Hitachi

Cosminexus V11 BPM/ESB基盤 サービスプラットフォーム リファレンス


3.13.1 Kafkaアダプタ定義ファイル

〈この項の構成〉

(1) 形式

[adpkafka.xml2json={true|false}]
 
[adpkafka.request.record.binary.decoding={hexBinary|base64Binary}]
 
 producer.key.serializer={ByteArraySerializer|StringSerializer}
 producer.value.serializer={ByteArraySerializer|StringSerializer}
 
[producer.acks=Apache Kafkaが書き込み完了通知を返すタイミング]
 
[producer.compression.type={none|gzip}]
 
[producer.retries=メッセージの再送信回数]
 
[producer.client.id=送信リクエストの実行時にBrokerに渡すid文字列]
 
[producer.connections.max.idle.ms=アイドル状態の接続タイムアウト時間]
[producer.delivery.timeout.ms=メッセージ送信の応答のタイムアウト時間]
[producer.max.block.ms=送信リクエストの呼び出しのタイムアウト時間]
[producer.request.timeout.ms=送信要求の応答のタイムアウト時間]
[producer.socket.connection.setup.timeout.max.ms=ソケット接続が確立されるまでクライアントが待機する最大時間]
[producer.socket.connection.setup.timeout.ms=ソケット接続が確立されるまでクライアントが待機する時間]

(2) 機能

Kafkaアダプタの動作情報を設定します。設定した内容はKafkaアダプタの開始時に有効になります。

Kafkaアダプタ定義ファイルは「adpkafka.properties」というファイル名で作成します。

(3) ファイルの格納先

<サービスプラットフォームのインストールディレクトリ>\CSC\custom-adapter\Kafka\config\templates\adpkafka.properties
注意事項

ここに格納されているファイルはテンプレートファイルです。このテンプレートファイルを任意のディレクトリにコピーしたあとファイルを編集し,定義ファイルを作成してください。

(4) 設定できるプロパティ

(a) 拡張機能

adpkafka.xml2json={true|false} 〜<<false>>

ボディ要求電文のJSON-XML変換を実施するかどうかを指定します。大文字と小文字は区別されます。

  • true

    JSON-XML変換を実施します。

  • false

    JSON-XML変換を実施しません。

(b) デコード形式

adpkafka.request.record.binary.decoding={hexBinary|base64Binary} 〜<<hexBinary>>

要求電文中の要素値をバイナリデータへデコードする際のデコード形式を指定します。

  • hexBinary

    hexBinary形式でデコードします。

  • base64Binary

    base64Binary形式でデコードします。

producer.key.serializerプロパティにByteArraySerializerを指定した場合,ヘッダ要求電文のメッセージのkey要素に指定した値は,ここで指定したデコード形式でデコードされてApache Kafkaへ送信されます。そのため,key要素の値はデコード形式に合わせて指定する必要があります。形式が異なる場合の動作は保証できません。

(c) シリアライザクラス

producer.key.serializer={ByteArraySerializer|StringSerializer}

メッセージのキーのシリアライザクラスを指定します。指定は省略できません。

  • ByteArraySerializer

    メッセージのキーのデータ型をByte[]として扱います。

    これを指定した場合,ヘッダ要求電文のメッセージのkey要素に指定した値は,adpkafka.request.record.binary.decodingプロパティに指定したデコード形式でデコードされてApache Kafkaへ送信されます。

  • StringSerializer

    メッセージのキーのデータ型をjava.lang.Stringとして扱います。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「key.serializer」に設定されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

producer.value.serializer={ByteArraySerializer|StringSerializer}

メッセージの値のシリアライザクラスを指定します。指定は省略できません。

JSON-XML変換を実施(adpkafka.xml2json=trueを指定)する場合は,StringSerializerを指定してください。

  • ByteArraySerializer

    メッセージの値のデータ型をByte[]として扱います。

  • StringSerializer

    メッセージの値のデータ型をjava.lang.Stringとして扱います。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「value.serializer」に設定されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

(d) 書き込み完了通知のタイミング

producer.acks=Apache Kafkaが書き込み完了通知を返すタイミング 〜<<1>>

Kafkaアダプタからの送信リクエストに対し,Apache Kafkaが書き込み完了通知(ack)を返すタイミングを指定します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「acks」に設定されます。設定値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

(e) 圧縮タイプ

producer.compression.type={none|gzip} 〜<<none>>

メッセージの圧縮タイプを指定します。

  • none

    圧縮しません。

  • gzip

    gzipを使用します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「compression.type」に設定されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

(f) 再試行回数

producer.retries=メッセージの再送信回数 〜<<2147483647>>

一時的なエラーで送信が失敗したメッセージの再送信回数を指定します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「retries」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。

(g) クライアントID

producer.client.id=送信リクエストの実行時にBrokerに渡すid文字列 〜<<空文字列>>

送信リクエストの実行時にBrokerに渡すid文字列を指定します。デフォルト値で動作させる場合は,このプロパティを定義しないでください。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「client.id」に設定されます。設定値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

(h) タイムアウト時間

producer.connections.max.idle.ms=アイドル状態の接続タイムアウト時間 〜<<540000>>

アイドル状態の接続を閉じるまでの最大待機時間をミリ秒単位で指定します。

指定した時間を過ぎると,アイドル状態となっている接続を閉じますが,メッセージ送信機能は続行します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「connections.max.idle.ms」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。

producer.delivery.timeout.ms=メッセージ送信の応答のタイムアウト時間 〜<<120000>>

Kafkaアダプタからの送信リクエストの呼び出しが返されてから,Apache Kafkaへメッセージ送信の成功または失敗を応答するまでの最大待機時間をミリ秒単位で指定します。producer.request.timeout.msプロパティの指定値以上の値を指定してください。

指定した時間を過ぎると,エラーメッセージを出力してメッセージ送信機能を停止します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「delivery.timeout.ms」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。

producer.max.block.ms=送信リクエストの呼び出しのタイムアウト時間 〜<<60000>>

送信リクエストの呼び出しまでの最大待機時間をミリ秒単位で指定します。

指定した時間を過ぎると,エラーメッセージを出力してメッセージ送信機能を停止します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「max.block.ms」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。

producer.request.timeout.ms=送信要求の応答のタイムアウト時間 〜<<30000>>

送信要求の応答までの最大待機時間をミリ秒単位で指定します。

指定した時間を過ぎると,次のように対処されます。

  • メッセージを再送できる場合は,リクエストが再試行されます。

  • メッセージを再送できない場合は,エラーメッセージを出力してメッセージ送信機能を停止します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「request.timeout.ms」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。

producer.socket.connection.setup.timeout.max.ms=ソケット接続が確立されるまでクライアントが待機する最大時間 〜<<30000>>

ソケット接続が確立されるまでのクライアントの最大待機時間をミリ秒単位で指定します。

指定した時間を過ぎると,ほかのノードに接続を切り替えます。メッセージ送信機能は続行します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「socket.connection.setup.timeout.max.ms」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。

producer.socket.connection.setup.timeout.ms=ソケット接続が確立されるまでクライアントが待機する時間 〜<<10000>>

ソケット接続が確立されるまでのクライアントの待機時間をミリ秒単位で指定します。

指定した時間を過ぎると,ほかのノードに接続を切り替えます。メッセージ送信機能は続行します。

ここで設定した値は,Apache Kafkaへのメッセージ送信に使用されるProducer APIのプロパティ「socket.connection.setup.timeout.ms」に設定されます。設定できる値の範囲などの詳細はApache Kafkaの公式ドキュメントを参照してください。