2.12.4 Kafkaインバウンドアダプタのタイムアウト監視
Kafkaインバウンドアダプタのタイムアウトを監視するポイントを次に示します。
- (凡例)
-
-
アイドルタイムアウト※
-
ポーリングの最大遅延時間
-
メタデータの更新を強制するまでの期間※
-
ポーリングのブロック時間
-
コミットのタイムアウト
-
最初のソケット接続が確立されるまでクライアントが待機する最大時間
-
最初のソケット接続が確立されるまで待機する時間
-
繰り返し接続に失敗したBrokerに再接続するときに待機する最大時間
-
フェッチ要求のブロック時間
-
パーティションへの要求の待機時間
-
特定のホストへの再接続を試行する前に待機する基本時間
- 注※
-
フェッチ要求の延長で,タイムアウトが経過したか監視しています。
-
タイムアウトは次に示す関係で設定することを推奨します。
1. > 3. > 4. > 6. > 7. > 8. > 9. > 10. > 11. 1. > 2. > 5.
また,次の関係で設定することを推奨します。
4. > (6. + 7. + 8. + 9. + 10. + 11.)
- 〈この項の構成〉
(1) Kafkaのグループ管理機能のタイムアウト
Kafkaのグループ管理機能は,BrokerがConsumerグループを管理するための機能です。Kafkaクラスタのサーバにあるグループコーディネータが,Consumerグループのメンバーが生存しているか把握できるように,Consumerから定期的にハートビートを送信しています。Consumerがグループコーディネータにハートビートを一定期間送信できなかった場合,グループコーディネータはハートビートが送信できなかったConsumerをグループから除外します。
グループ管理機能に関するタイムアウトについて,次の表に示します。
タイムアウト項目 |
説明 |
該当するKafka受付定義ファイルのプロパティ |
---|---|---|
セッションタイムアウト |
Consumerセッションのタイムアウトです。 |
consumer.session.timeout.ms |
ハートビート間隔 |
ハートビートの送信間隔です。 |
consumer.heartbeat.interval.ms |
- タイムアウト発生後の動作
-
セッションタイムアウトの期間内に,ハートビートがBrokerに送信されなかった場合は,ポーリング処理がエラーとなります。エラーメッセージをメッセージログに出力して,Consumerを停止します。
(2) ポーリング処理のタイムアウト
ポーリング処理に関するタイムアウトについて,次の表に示します。
タイムアウト項目 |
説明 |
該当するKafka受付定義ファイルのプロパティ |
タイムアウト発生後の動作 |
---|---|---|---|
ポーリングのブロック時間 |
ポーリングの待ち時間です。 |
urecp-kafka.poll-timeout.ms |
空のレコードセットを返却し,ポーリング処理を続行します。 |
ポーリングの最大遅延時間 |
ポーリング実行後に,次のポーリングが行われるまでの時間です。 |
consumer.max.poll.interval.ms |
ポーリングの最大遅延時間のタイムアウト発生後の動作は,「2.12.3(6) コミットのタイミング」を参照してください。 |
フェッチ要求のブロック時間 |
フェッチ要求でサーバが返却する最小データ量(fetch.min.bytes)を,すぐに満たすのに十分なデータがない場合に,サーバがフェッチ要求に応答する前にブロックする最大時間です。 |
consumer.fetch.max.wait.ms |
フェッチ要求が終了し,ポーリング処理を続行します。 |
パーティションへの要求の待機時間 |
パーティションへの要求が失敗した場合に,失敗した要求を再試行する前に待機する時間です。 |
consumer.retry.backoff.ms |
失敗した要求を再試行し,ポーリング処理を続行します。 |
メタデータの更新を強制するまでの期間 |
メタデータを強制リフレッシュするための最大待ち時間です。Brokerやパーティションの追加を行った場合,この時間経過後にBrokerやパーティションが検出されます。 |
consumer.metadata.max.age.ms |
メタデータを強制リフレッシュし,ポーリング処理を続行します。 |
ポーリング処理のタイムアウトの範囲を次の図に示します。
- (凡例)
-
-
ポーリングのブロック時間
-
ポーリングの最大遅延時間
-
フェッチ要求のブロック時間
-
パーティションへの要求の待機時間
-
メタデータの更新を強制するまでの期間
-
(3) コミットのタイムアウト
コミットのタイムアウトは,オフセットコミットの完了を待機する最大時間です。Kafka受付定義ファイルのurecp-kafka.commit-timeout.msで設定します。
タイムアウトの範囲を次の図に示します。コミット時間を設定する場合,ポーリングの最大遅延時間を考慮する必要があります。
コミット処理はポーリングの後に行われるため,ポーリングの最大遅延時間内に含まれます。そのため,ポーリングの最大遅延時間よりコミットのタイムアウトが長い場合は,ポーリングの最大遅延時間が経過したあとにコミットが行われるときがあります。このときは,ポーリングの最大遅延時間が経過したあとにConsumerがグループから除外されるため,コミットが失敗するおそれがあります。コミットのタイムアウトは,ポーリングの最大遅延時間を超えないように設定してください。
- タイムアウト発生後の動作
-
オフセットコミットが正常に完了する前にタイムアウトが切れた場合は,Consumerを停止し,エラーメッセージをメッセージログに出力して,処理を終了します。
(4) アイドルタイムアウト
アイドルタイムアウトは,アイドル状態の接続を閉じる際に使用されるタイムアウトです。Kafka受付定義ファイルのconsumer.connections.max.idle.msで設定します。
- タイムアウト発生後の動作
-
アイドル状態の接続を閉じ,Consumerを続行します。
(5) Consumerインスタンスのクローズタイムアウト
Consumerインスタンスをクローズするときに使用されるタイムアウトです。タイムアウト値は,30秒です。Consumerインスタンスのクローズタイムアウトのタイムアウト値は変更できません。
- タイムアウト発生後の動作
-
エラーメッセージを出力して,処理を続行します。
(6) Broker接続
Broker接続のタイムアウトについて,次の表に示します。Broker接続はポーリング時に行われるため,これらのタイムアウトが,ポーリングの最大遅延時間を超えないように設定する必要があります。
タイムアウト項目 |
説明 |
該当するKafka受付定義ファイルのプロパティ |
タイムアウト発生後の動作 |
---|---|---|---|
最初のソケット接続が確立されるまで待機する時間 |
最初のソケット接続が確立されるまで待機する時間です。 |
consumer.socket.connection.setup.timeout.ms |
最大時間が経過していない場合,Brokerに再接続して,Consumerを続行します。 |
最初のソケット接続が確立されるまでクライアントが待機する最大時間 |
最初のソケット接続が確立されるまでクライアントが待機する最大時間です。 |
consumer.socket.connection.setup.timeout.max.ms |
最大時間が経過していない場合,Brokerに再接続して,Consumerを続行します。 |
再接続を試行する前に待機する基本時間 |
特定のホストへの再接続を試行する前に待機する基本時間です。 |
consumer.reconnect.backoff.ms |
最大時間が経過していない場合,Brokerに再接続して,Consumerを続行します。 |
再接続するときに待機する最大時間 |
繰り返し接続に失敗したBrokerに再接続するときに待機する最大時間です。 |
consumer.reconnect.backoff.max.ms |
最大時間が経過していない場合,Brokerに再接続して,Consumerを続行します。 |