2.12.3 Kafkaインバウンドアダプタの機能
KafkaインバウンドアダプタはKafkaからメッセージを受信するアダプタです。サービスプラットフォームからKafkaに接続し,Brokerのトピックからメッセージとしてレコードを取得して,Kafka受付を呼び出します。
Kafkaインバウンドアダプタが接続できるApache Kafkaのバージョンは,2.6.0〜2.8.0です。また,Kafkaインバウンドアダプタと連携できるアプリケーションは,サービスプラットフォームが提供するKafka受付だけです。
- 〈この項の構成〉
(1) メッセージの受信機能
メッセージの受信機能は,Brokerからメッセージを受信するための機能です。Consumerとも呼ばれます。
メッセージの受信機能(Consumer)の機能概要を次の図に示します。
Consumerは,Kafka受付の起動の延長で開始します。
(a) パーティションの割り当て
パーティションの割り当ては,Consumerグループのグループリーダー(グループ内の最初のConsumer)によって動的に行われます。Consumerに手動でパーティションを割り当てることはできません。
Consumerグループについては,「(5) KafkaインバウンドアダプタのConsumerグループ」を参照してください。
(b) サブスクライブ
Consumerが,どのトピックからレコードを受信するか設定することをサブスクライブといいます。Consumerのサブスクライブ対象のトピックを次に示します。
-
ユーザが作成したトピック
-
Kafkaが作成した内部トピック
サブスクライブ対象のトピックが存在しない場合でも,Broker側ではトピックは自動生成されません。この場合,Consumerの処理の実行中にトピックを作成することで,トピック作成後のポーリング処理でデータが読み込まれます。
(c) データのポーリング
データのポーリングでは,Consumerに割り当てられたトピック内のパーティションからデータを読み込みます。データの取得単位は,レコードバッチ単位です。ポーリング処理は,この機能が停止しない限り繰り返し行われます。
- クラスタの初期接続時のポーリング
-
Consumerは,Kafkaクラスタの情報を取得するために,データのポーリング実行時にKafkaクラスタの初期接続を実施します。Kafkaクラスタへの初期接続を確立するために使用するホスト/ポートペアのリストのどれかのBrokerと初期接続を行い,接続できたBrokerからKafkaクラスタの情報を取得します。ポーリング処理中に接続できた場合,以降のポーリングでは初期接続は必要ありません。
初期接続ができない場合,ポーリングの待ち時間内に繰り返し初期接続を行います。
ポーリングの待ち時間内に初期接続ができなかった場合,データのポーリング実行時にエラーは発生しません。
- Brokerの接続時のポーリング
-
初期接続が行えた場合は,次に示すBrokerに接続します。
-
Consumerグループの,グループコーディネータのBroker
-
パーティションのリーダーであるBroker
ポーリングの待ち時間内に接続ができなかった場合は,ポーリング実行時にエラーは発生しません。
-
- レコードのデシリアライズ
-
Consumerは,Brokerから取得したバイト配列のレコードをデシリアライズします。デシリアライズに使用するデシリアライザをKafka受付定義ファイルで設定できます。レコードのデシリアライズに関するプロパティを次に示します。
表2‒28 レコードのデシリアライズに関するプロパティ プロパティ
説明
consumer.key.deserializer
レコードのキーのデシリアライザを指定します。
consumer.value.deserializer
レコードの値のデシリアライザを指定します。
- 取得するメッセージの順序
-
Consumerが取得するメッセージは,単一のパーティション内のメッセージに対してだけ順序が保証されます。トピックが複数のパーティションで構成される場合,取り出されるメッセージが,送信された順序で取得できるとは限りません。
- 受信するレコード数の上限
-
ポーリングごとにBrokerから受信するレコード数の上限を設定できます。レコード数の上限は,Kafka受付定義ファイルのconsumer.max.poll.recordsで設定します。
- フェッチ要求のレコードのデータサイズ
-
Consumerからのデータのフェッチ要求に対して,Brokerが返す必要のあるデータサイズを設定できます。データサイズとは,1回のポーリングでConsumerが送信する,すべてのレコードデータのサイズです。
フェッチ要求のレコードのデータサイズについては,「(7) レコードのデータサイズ」を参照してください。
(d) 受信できるメッセージの圧縮形式
Consumerでは,次に示す形式で圧縮されたメッセージが受信できます。Consumer はメッセージを伸張して処理を行います。Consumerが受信できない形式で圧縮されたレコードを受信した場合の動作は保証されません。
圧縮方式 |
受信可否 |
---|---|
gzip |
○ |
lz4 |
× |
snappy |
× |
zstd |
× |
圧縮していないデータ |
○ |
(f) セキュリティ
Kafkaインバウンドアダプタでは,TLSを使用した暗号化と認証,ホスト名の検証をサポートしています。TLSで使用するセキュリティ通信用プロトコルについては,Kafkaの公式ドキュメントを参照してください。
TLSで使用できるTLS暗号スイートの制限はありません。ただし,使用する暗号スイートを設定することはできません。
ホスト名の検証は,証明書内のホスト名が接続先のホスト名と一致するかどうかをチェックします。ホスト名の検証を無効にすることはできません。
(g) レコード情報の作成
Brokerから取得したレコードを基に,Kafka受付に渡すレコード情報を作成します。レコード情報の作成は,1回のポーリングで受信したレコードごとに行います。レコード情報が作成されるのは,Brokerからメッセージが取得できた場合だけです。
取得したメッセージから作成されるレコード情報を次に示します。
レコードの項目 |
説明 |
---|---|
レコードのキー |
レコードのキーです。 |
レコードの値 |
レコードの値です。 |
オフセット |
対応するパーティション内のレコードの位置です。 |
パーティション |
レコードの受信元のパーティションです。 |
タイムスタンプ |
メッセージ送信時のレコードのタイムスタンプです。 |
トピック |
レコードを受け取ったトピックです。 |
ヘッダ |
レコードのヘッダ情報です。 |
(2) Kafka受付呼び出し機能
Kafka受付の呼び出しを実行することで,取得したレコード情報をビジネスプロセスで処理できるようになります。
Kafka受付の呼び出しは,ポーリングで取得したメッセージごとに行われます。そのため,ポーリングで複数のメッセージを受信した場合でも,Kafka受付に渡されるメッセージは1つです。また,Kafka受付の呼び出しはBrokerからメッセージが受信できた場合だけ行われます。
(3) Kafkaインバウンドアダプタの実行
メッセージの受信機能およびKafka受付呼び出し機能は,J2EEサーバのリソースアダプタのワーク管理を使用して実行します。リソースアダプタのワーク管理については,マニュアル「アプリケーションサーバ 機能解説 基本・開発編(コンテナ共通機能)」の「3.16.2 リソースアダプタのワーク管理」を参照してください。
リソースアダプタのワーク管理では,ワークに割り当てるスレッドをスレッドプールで管理します。スレッドプールの動作は,Connector属性ファイルで設定します。Kafkaインバウンドアダプタのスレッドプールに関するプロパティを次に示します。
プロパティ |
説明 |
---|---|
MaxTPoolSize |
スレッドプールで同時に実行される最大スレッド数 |
MinTPoolSize |
スレッドプールにプールする最小スレッド数 |
TPoolKeepalive |
スレッドプールのスレッド解放までのタイムアウト値 |
スレッドプールに関する設定については,マニュアル「サービスプラットフォーム システム構築・運用ガイド」の「3.1.2(11) Kafkaインバウンドアダプタのセットアップ」を参照してください。
(4) 配備できるリソースアダプタ数
1つのHCSCサーバに配備できるリソースアダプタ数は,1つです。リソースアダプタを複数配備した場合の動作は保証されません。
(5) KafkaインバウンドアダプタのConsumerグループ
Consumerは,Consumerグループと呼ばれるグループに属することができます。Consumerは,初回のポーリングまたはConsumerグループのリバランス時に,ConsumerグループのコーディネータであるBrokerに,グループへの参加要求を行います。
Consumerグループのメッセージの受信例を次に示します。
この例の場合,「Group1」「Group2」で同一のメッセージが受信されます。
同一グループ内に複数のConsumerがある場合,グループ内のどれか1つのConsumerが一意のメッセージを受信できます。
パーティションはBrokerから各Consumerに割り振られますが,Consumerグループ内の各Consumerに同一のパーティションが割り当てられることはありません。
レコードは各パーティションのどれかに格納されるため,Consumerグループ内のConsumerが同一のメッセージを受信することはありません。
Consumerグループが異なる場合は,Consumerグループ単位で同一のメッセージを受信することができます。
(a) Consumerグループのメンバーへのパーティションの割り当て
パーティション数に応じて,Consumerグループのメンバーへのパーティションは次のように割り当てられます。
パーティション数とConsumerグループのメンバーの関係 |
パーティションの割り当て |
---|---|
パーティション数 < グループ内のメンバー |
パーティション数がグループ内のメンバーより少ない場合,パーティションが割り当てられないメンバーが存在します。 |
パーティション数 = グループ内のメンバー |
パーティション数がグループ内のメンバーと等しい場合,均等に割り当てられます。 |
パーティション数 > グループ内のメンバー |
パーティション数がグループ内のメンバーより多い場合,割り当てに偏りが出ます。 |
- パーティション数 < グループ内のメンバーの場合
-
パーティション数がグループ内のメンバーより少ない場合,パーティションが割り当てられないConsumerが存在します。パーティションが割り当てられないConsumerは,リバランスでパーティションが割り当てられない限りメッセージを受信しません。
パーティション数がグループ内のメンバーより少ない場合の割り当て例を次に示します。
パーティション数 < グループ内のメンバーの場合の割り当て例
Consumer2に割り当てるパーティションがないため,Consumer2はメッセージを受信しません。
- パーティション数 = グループ内のメンバーの場合
-
パーティション数がグループ内のメンバーと等しい場合,グループ内のすべてのConsumerがメッセージを受信できる状態となります。
パーティション数がグループ内のメンバーと等しい場合の割り当て例を次に示します。
パーティション数 = グループ内のメンバーの場合の割り当て例
グループ内のすべてのConsumerにパーティションが割り当たります。
- パーティション数 > グループ内のメンバーの場合
-
パーティション数がグループ内のメンバーより多い場合,グループ内のどれかのConsumerに,多くのパーティションが割り当てられます。
パーティション数がグループ内のメンバーより多い場合の割り当て例を次に示します。
パーティション数 > グループ内のメンバーの場合の割り当て例
グループ内にパーティションを均等に割り当てることができないため,Consumer1に2つのパーティションが割り当てられます。
(b) パーティションの範囲割り当て
Kafkaでは,Consumerがグループ構成になっている場合,トピック内のパーティションをグループのメンバーに分散して割り当てます。これを範囲割り当てといいます。
範囲割り当ては,次の順序で行われます。
-
トピックごとに使用できるパーティションを番号順に,Consumerを辞書式順序でレイアウトします。
-
パーティションの数をConsumerの総数で割り,各Consumerに割り当てるパーティションの数を決定します。
均等に分割できない場合,最初のConsumerから順にパーティションを1つ追加で割り当てます。
- 範囲割り当ての例
-
2つのConsumerC0とC1,2つのトピックt0とt1があり,各トピックに3つのパーティションp0,p1,p2がある場合,パーティションはt0p0,t0p1,t0p2,t1p0,t1p1,およびt1p2の6つとなります。
トピックt0のパーティションは3つあり,これらのパーティションを2つのConsumerで分割します。均等に分割できないため,C0にパーティションが1つ追加で割り当てられます。トピックt1のパーティションも同様に割り当てが行われます。
そのため,Consumerごとのパーティションの割り当ては次のようになります。
-
C0:t0p0,t0p1,t1p0,t1p1
-
C1:t0p2,t1p2
-
(c) Consumerグループのメンバーシップ
Consumerグループのメンバーシップは動的メンバーシップとなります。静的メンバーシップに変更することはできません。
(d) Consumerグループのリバランス
ConsumerグループへのConsumerの加入や除外などを契機として,Brokerがグループ内のリバランスを行います。Consumerグループのリバランス時は,ポーリングが停止し,リバランス完了後にポーリングが再開します。
(e) 冗長構成
パーティションはConsumerごとに割り当てられるため,冗長構成にすることができます。Consumerグループがクラスタ内の同一のKafka受付を使用した冗長構成の例を次の図に示します。
ただし,Consumerごとにパーティションが割り当てられるため,冗長構成にしても特定のHCSCサーバの負荷を分散することはできません。
(f) 注意事項
Consumerグループについての注意事項を次に示します。
-
Consumerグループに対する操作は,Consumerグループの情報表示に関する操作,およびオフセットのリセットだけ許可されています。それ以外の操作を行った場合の動作は保証されません。
-
サービスプラットフォームで構成したConsumerグループに参加できるのは,サービスプラットフォームのConsumerだけです。その他の製品のConsumerがグループに参加した場合の動作は保証されません。
(6) コミットのタイミング
Consumerがコミット処理を行うことで,どこまでメッセージを読み込んだかBrokerに通知されます。メッセージをどこまで読んだかを管理する単位をオフセットといいます。
オフセットには,次の2つがあります。
-
現在のオフセット
Consumerが読み取る次のメッセージのオフセットを示します。
-
コミットされたオフセット
パーティションの最後にコミットされたオフセットを示します。
オフセットが初期化されている場合,初回のポーリングでは割り当てられたパーティションの「コミットされたオフセット」以降のメッセージを受信します。2回目以降のポーリングでは「現在のオフセット」以降のメッセージを受信します。オフセットが初期化されていない場合,または「現在のオフセット」がサーバ上にない場合は,オフセットの自動リセットを行います。Consumerは,パーティション内のメッセージの読み込み位置を指定することはできません。
オフセットされたコミットの例を次に示します。
オフセットの初期位置が0の場合,5通のメッセージを受信して,コミットを行うと,コミットされたオフセットは「5」となります。
コミットのタイミングは,Consumer単位で制御できます。コミットのタイミングは,次のどちらかを選択できます。
-
Brokerからのメッセージ受信後
-
メッセージ処理後(Kafka受付からの応答後)
コミットのタイミングの違いによる動作の違いを次に示します。
コミットのタイミング |
一度のポーリングで受信できるメッセージ数 |
同一メッセージを受信するケース |
ポーリングの最大遅延時間内にメッセージ処理が終了しない場合 |
同一メッセージの再送制限 |
---|---|---|---|---|
メッセージ受信後 |
1つ以上 |
コミットが完了するまでにエラーが発生した場合,またはコミットが実行されなかった場合※1は,リバランスでパーティションを引き継いだConsumerが同一メッセージを受信します。 リバランスが発生して該当するパーティションがグループ内のConsumerに引き継がれた場合は,パーティションを引き継いだConsumerはコミットされたオフセットからメッセージを受信するため,同一のメッセージを受信します。 |
処理を続行します。 Consumerはグループから除外されます。この場合,次回のポーリング処理で再度グループに加入するようにリバランスされます。 |
制限できない |
メッセージ処理後 |
1つだけ |
メッセージ受信後にエラーが発生した場合,またはコミットが実行されなかった場合※1は,再送の上限数まで同一のConsumerが受信します。 また,このあとにリバランスが発生した場合は,パーティションを引き継いだConsumerが同一メッセージを受信します。 |
Consumerを停止します。Consumerはグループから除外されます。グループから除外された場合,割り当てられていたパーティションがグループ内の他のConsumerに割り当てられます。※2 |
制限できる※3 |
パーティションを引き継いだ場合の例を次に示します。
図中の1.〜4.について説明します。
-
Consumer1がポーリング処理を行います。
現在のオフセットが「1」のため,オフセット「1」のメッセージを受信します。
-
ポーリングの最大遅延時間内に次のポーリング処理を呼べなかった場合,Consumer1がグループから除外されます。
-
リバランス処理によってConsumer2にパーティションが引き継がれます。
コミットされたオフセットが「1」のため,Consumer2ではオフセット「1」のメッセージを受信します。
-
Consumer1がメッセージをコミットします。
Consumer1は割り当てられていないパーティションに対してコミットするため,コミットが失敗します。
(a) メッセージ受信後のコミット
メッセージの処理順序を考慮する必要がない場合に設定します。この場合,ビジネスプロセスのメッセージ処理が失敗したときに,該当するメッセージ処理の回復処理を行わなくても,該当するメッセージよりあとのオフセットのメッセージが処理できます。
Kafka受付からの応答が正常か,異常かに関わらず,「現在のオフセット」および「コミットされたオフセット」が更新されるため,次回のポーリングでは,前回受信したメッセージの次のオフセットのメッセージが受信されます。
図中の1.〜3.について説明します。
-
ポーリング処理でメッセージを受信します。
受信するメッセージ数は1つです。現在のオフセットは「1」であるため,オフセット「1」のメッセージを受信します。メッセージ受信後に,現在のオフセットは「2」になります。
-
メッセージをコミットします。
コミット実行後のオフセット(コミットされたオフセット)は「2」となります。
-
オフセット「1」のメッセージをKafka受付に送信します。
Kafka受付からの応答が正常か,異常かに関わらず,現在のオフセットは「2」のため,次回のポーリングではオフセット「2」のメッセージを受信します。
また,コミット後にリバランスが発生して,該当するパーティションがグループ内のConsumerに引き継がれた場合は,パーティションを引き継いだConsumerは2.でコミットされたオフセットからメッセージを受信します。
(b) メッセージ処理後のコミット
ビジネスプロセスのメッセージ処理が失敗した場合,該当するメッセージ処理の回復処理を行ってから,次のオフセットのメッセージを処理する必要があります。この場合,該当するメッセージは再送されます。
メッセージ処理後のコミットでは,Kafka受付からの応答が正常な場合だけコミットを行います。メッセージ受信後の処理が失敗した場合や,Kafka受付からエラー応答が返ってきた場合はコミットしません。また,次回のポーリングでは前回受信したメッセージを再度受信します。
図中の1.〜2.について説明します。
-
ポーリング処理を行い,メッセージを受信します。
現在のオフセットは「1」のため,オフセット「1」のメッセージを受信します。メッセージ受信後に現在のオフセットは「2」となります。
-
オフセット「1」のメッセージをKafka受付に送信します。
Kafka受付からエラー応答があった場合,コミット処理が省略されるため,コミットされたオフセットは更新されません。
また,次回のポーリングで同一のメッセージを受信するために,現在のオフセットをエラーが発生したメッセージのオフセットに戻します。
(c) オフセットの自動リセット
オフセットが初期化されていない場合,または「現在のオフセット」がサーバ上にない場合は,オフセットの自動リセットを行います。自動リセットは,Kafka受付定義ファイルのconsumer.auto.offset.resetで設定します。
自動リセットの方法に応じて取得されるデータを次に示します。
プロパティ |
設定値 |
説明 |
---|---|---|
consumer.auto.offset.reset |
earliest |
最も早いオフセットに自動的にリセットします。パーティションの先頭からすべてのメッセージを取得します。 |
latest |
自動的に最新のオフセットにリセットします。サブスクライブのあとに送信されたメッセージを取得します。 |
自動リセットの設定に応じた自動リセットで取得できるメッセージの例を次に示します。
earliest(最も早いオフセットにリセット)を設定した場合,オフセットが0にリセットされるため,オフセット0以降のメッセージを受信できます。
latest(最新のオフセットにリセット)を設定した場合,サブスクライブのあとに送信されたメッセージであるオフセット4以降のメッセージが受信できます。サブスクライブ前にパーティションに存在していたオフセット0〜3のメッセージは受信しません。
(7) レコードのデータサイズ
レコードのデータサイズはKafka受付定義ファイルで設定します。データは,レコードバッチ単位で取得します。
レコードのデータサイズを設定するプロパティを次に示します。
プロパティ |
説明 |
---|---|
consumer.fetch.min.bytes※ |
サーバがフェッチ要求に対して返す必要のあるデータの最小量を指定します。 |
consumer.fetch.max.bytes |
サーバがフェッチ要求に対して返す必要のあるデータの最大量を指定します。 |
consumer.max.partition.fetch.bytes |
サーバが返すパーティションごとのデータの最大量を指定します。 |
- 注※
-
利用できるデータに満たない場合,Brokerはその量のデータが蓄積されるまで待ってからConsumerに応答します。データが蓄積されるまでの待ち時間は,consumer.fetch.max.wait.ms(フェッチ要求のブロック時間)で設定します。この待ち時間を経過した場合,最小サイズに満たなくてもBrokerから応答が返却されます。
- 利用できるデータを満たす場合
-
最小サイズを20バイトと設定し,次に示すパーティションの,オフセット0のメッセージから受信する場合の例を次に示します。データの取得単位は,1レコード1バッチです。
図2‒102 利用できるデータを満たす場合の例 最小サイズを満たすデータがあるため,最大サイズまでのデータが受信できます。
- 利用できるデータに満たない場合
-
最小サイズを50バイトと設定し,次に示すパーティションの,オフセット0のメッセージから受信する場合の例を次に示します。データの取得単位は,1レコード1バッチです。
図2‒103 利用できるデータに満たない場合の例 最小サイズに満たないため,データが蓄積されるまで待ちます。
待ち時間内に最小サイズを満たすデータが蓄積されない場合,オフセット0〜3のメッセージが受信されます。
待ち時間内に最小サイズを満たすデータが蓄積された場合,オフセット4のメッセージまで受信できます。