Hitachi

Cosminexus V11 BPM/ESB基盤 サービスプラットフォーム リファレンス


3.14.1 Kafka受付定義ファイル

〈この項の構成〉

(1) 形式

 consumer.key.deserializer={StringDeserializer|ByteArrayDeserializer}
 consumer.value.deserializer={StringDeserializer|ByteArrayDeserializer}
 consumer.bootstrap.servers=初期接続時のホスト/ポート
[consumer.fetch.min.bytes=フェッチ要求への返却データの最小量]
 consumer.group.id=Consumerグループの識別文字列
[consumer.heartbeat.interval.ms=ハートビート間の予想時間]
[consumer.max.partition.fetch.bytes=パーティションごとのデータの最大量]
[consumer.session.timeout.ms=クライアントの障害検出用のタイムアウト時間]
[consumer.ssl.key.password=キーストアファイル内の秘密鍵のパスワード]
[consumer.ssl.keystore.key=キーストアファイルの秘密キー]
[consumer.ssl.keystore.location=キーストアファイルの格納場所]
[consumer.ssl.keystore.password=キーストアファイルのストアパスワード]
[consumer.ssl.truststore.location=トラストストアファイルの格納場所]
[consumer.ssl.truststore.password=トラストストアファイルのパスワード]
[consumer.auto.offset.reset={earliest|latest}]
[consumer.client.dns.lookup={use_all_dns_ips|resolve_canonical_bootstrap_servers_only}]
[consumer.connections.max.idle.ms=アイドル状態の接続タイムアウト時間]
[consumer.fetch.max.bytes=フェッチ要求への返却データの最大量]
[consumer.max.poll.interval.ms=ポーリングの最大遅延時間]
[consumer.max.poll.records=1回のポーリングの呼び出しで返されるレコードの最大数]
[consumer.receive.buffer.bytes=TCP受信ソケットバッファのサイズ]
[consumer.request.timeout.ms=要求の応答を待機する最大時間]
[consumer.security.protocol={PLAINTEXT|SSL}]
[consumer.send.buffer.bytes=TCP送信ソケットバッファのサイズ]
[consumer.socket.connection.setup.timeout.max.ms=ソケット接続が確立されるまでクライアントが待機する最大時間]
[consumer.socket.connection.setup.timeout.ms=ソケット接続が確立されるまでクライアントが待機する時間]
[consumer.ssl.enabled.protocols=SSL接続が有効なプロトコルのリスト]
[consumer.ssl.keystore.type=キーストアファイルのファイル形式]
[consumer.ssl.protocol=SSLContextの生成に使用するSSLプロトコル]
[consumer.ssl.provider=SSL接続に使用するセキュリティプロバイダの名称]
[consumer.ssl.truststore.type=トラストストアファイルのファイル形式]
[consumer.client.id=リクエスト時にサーバに渡すID文字列]
[consumer.fetch.max.wait.ms=フェッチ要求への応答のブロック時間]
[consumer.metadata.max.age.ms=メタデータの強制更新までの時間]
[consumer.reconnect.backoff.max.ms=Brokerへ再接続するまでの最大待機時間]
[consumer.reconnect.backoff.ms=特定のホストへの再接続を試行する前に待機する基本時間]
[consumer.retry.backoff.ms=特定のパーティションへの再要求までの待機時間]
 
[urecp-kafka.commit-processing.control={msg-rcv-after|msg-proc-after}]
[urecp-kafka.commit-timeout.ms=オフセットのコミットの最大待機時間]
[urecp-kafka.poll-timeout.ms=ポーリング時の待ち時間]
 urecp-kafka.topic-name=サブスクライブするトピック
[urecp-kafka.err-message.retry-count=Kafka受付の呼び出しのリトライ回数]
[urecp-kafka.record-binary.encoding={hexBinary|base64Binary}]
[urecp-kafka.resource-adapter=Kafkaインバウンドアダプタの表示名]
[urecp-kafka.ejb-transaction-timeout=EJBトランザクションのタイムアウト時間]
[urecp-kafka.json2xml={true|false}]

(2) 機能

Kafkaインバウンドアダプタ内で動作するConsumerの動作や,Kafka受付の動作を設定します。設定した内容は,Kafka受付のデプロイ時,Kafka受付の起動時,またはcsccmctlコマンドの実行時に読み込まれます。

Kafka受付定義ファイルは「cscurecpkafka.properties」というファイル名で作成してください。

(3) ファイルの格納先

<サービスプラットフォームのインストールディレクトリ>\CSC\custom-reception\kafka\config\templates\cscurecpkafka.properties
注意事項

ここに格納されているファイルはテンプレートファイルです。Kafka受付定義ファイルを編集する場合は,Kafka受付の追加時にユーザ定義受付定義画面(詳細)で[独自定義ファイル]の[編集]ボタンをクリックし,定義ファイルの内容を修正してください。

(4) 設定できるプロパティ

(a) Apache Kafkaの定義

consumer.key.deserializer={StringDeserializer|ByteArrayDeserializer}

メッセージのキーのデシリアライザを指定します。指定は省略できません。

  • StringDeserializer

    デシリアライザとしてStringDeserializerを使用します。

  • ByteArrayDeserializer

    デシリアライザとしてByteArrayDeserializerを使用します。

ここで設定した値はApache KafkaのConsumer定義「key.deserializer」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.value.deserializer={StringDeserializer|ByteArrayDeserializer}

メッセージの値のデシリアライザを指定します。指定は省略できません。

  • StringDeserializer

    デシリアライザとしてStringDeserializerを使用します。

  • ByteArrayDeserializer

    デシリアライザとしてByteArrayDeserializerを使用します。

ここで設定した値はApache KafkaのConsumer定義「value.deserializer」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.bootstrap.servers=初期接続時のホスト/ポート

Kafkaクラスタへの初期接続を確立するために使用するホスト/ポートを指定します。指定は省略できません。

ここで設定した値はApache KafkaのConsumer定義「bootstrap.servers」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.fetch.min.bytes=フェッチ要求への返却データの最小量 〜<<1>>

サーバがフェッチ要求に対して返す必要のあるデータの最小量を指定します。

ここで設定した値はApache KafkaのConsumer定義「fetch.min.bytes」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.group.id=Consumerグループの識別文字列

Consumerが属するConsumerグループを識別するための,一意の文字列を指定します。指定は省略できません。

ここで設定した値はApache KafkaのConsumer定義「group.id」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.heartbeat.interval.ms=ハートビート間の予想時間 〜<<3000>>

Consumerコーディネータへのハートビート間の予想時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「heartbeat.interval.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.max.partition.fetch.bytes=パーティションごとのデータの最大量 〜<<1048576>>

サーバが返すパーティションごとのデータの最大量を指定します。

ここで設定した値はApache KafkaのConsumer定義「max.partition.fetch.bytes」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.session.timeout.ms=クライアントの障害検出用のタイムアウト時間 〜<<10000>>

Apache Kafkaのグループ管理機能の使用時のタイムアウト時間をミリ秒で指定します。クライアントの障害を検出するために使用されます。

ここで設定した値はApache KafkaのConsumer定義「session.timeout.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.key.password=キーストアファイル内の秘密鍵のパスワード

キーストアファイル内の秘密鍵のパスワードを指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.key.password」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.keystore.key=キーストアファイルの秘密キー

キーストアファイルの秘密キーを指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.keystore.key」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.keystore.location=キーストアファイルの格納場所

キーストアファイルの格納場所を指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.keystore.location」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.keystore.password=キーストアファイルのストアパスワード

キーストアファイルのストアパスワードを指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.keystore.password」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.truststore.location=トラストストアファイルの格納場所

トラストストアファイルの格納場所を指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.truststore.location」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.truststore.password=トラストストアファイルのパスワード

トラストストアファイルのパスワードを指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.truststore.password」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.auto.offset.reset={earliest|latest} 〜<<latest>>

Apache Kafkaに初期オフセットがない場合や,現在のオフセットがサーバ上に存在しない場合の対処方法を次のどちらかで指定します。

  • earliest

    最も早いオフセットに自動的にリセットします。

  • latest

    自動的に最新のオフセットにリセットします。

ここで設定した値はApache KafkaのConsumer定義「auto.offset.reset」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.client.dns.lookup={use_all_dns_ips|resolve_canonical_bootstrap_servers_only} 〜<<use_all_dns_ips>>

クライアントがDNSルックアップを使用する方法を次のどちらかで指定します。

  • use_all_dns_ips

    接続が正常に確立されるまで,返された各IPアドレスに順番に接続します。すべてのIPアドレスが一度使用されると,クライアントはホスト名からIPアドレスを再度名前解決します(ただし,JVMとOSキャッシュの両方のDNS名のルックアップ)。

  • resolve_canonical_bootstrap_servers_only

    各ブートストラップアドレスを正規名のリストで名前解決します。

    ブートストラップフェーズのあとは,use_all_dns_ipsと同じ動作となります。

ここで設定した値はApache KafkaのConsumer定義「client.dns.lookup」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.connections.max.idle.ms=アイドル状態の接続タイムアウト時間 〜<<540000>>

アイドル状態の接続を閉じるまでのタイムアウト時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「connections.max.idle.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.fetch.max.bytes=フェッチ要求への返却データの最大量 〜<<52428800>>

サーバがフェッチ要求に対して返す必要のあるデータの最大量を指定します。

ここで設定した値はApache KafkaのConsumer定義「fetch.max.bytes」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.max.poll.interval.ms=ポーリングの最大遅延時間 〜<<300000>>

ポーリングの最大遅延時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「max.poll.interval.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.max.poll.records=1回のポーリングの呼び出しで返されるレコードの最大数 〜<<500>>

1回のポーリングの呼び出しで返されるレコードの最大数を指定します。

ここで設定した値はApache KafkaのConsumer定義「max.poll.records」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.receive.buffer.bytes=TCP受信ソケットバッファのサイズ 〜<<65536>>

TCP受信ソケットバッファのサイズを指定します。

ここで設定した値はApache KafkaのConsumer定義「receive.buffer.bytes」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.request.timeout.ms=要求の応答を待機する最大時間 〜<<30000>>

要求に対する応答を待機する最大時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「request.timeout.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.security.protocol={PLAINTEXT|SSL} 〜<<PLAINTEXT>>

Brokerとの通信に使用するプロトコルを次のどちらかで指定します。

  • PLAINTEXT

    PLAINTEXTを使用します。

  • SSL

    SSLを使用します。

ここで設定した値はApache KafkaのConsumer定義「security.protocol」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.send.buffer.bytes=TCP送信ソケットバッファのサイズ 〜<<131072>>

TCP送信ソケットバッファのサイズを指定します。

ここで設定した値はApache KafkaのConsumer定義「send.buffer.bytes」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.socket.connection.setup.timeout.max.ms=ソケット接続が確立されるまでクライアントが待機する最大時間 〜<<30000>>

ソケット接続が確立されるまでクライアントが待機する時間の最大値をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「socket.connection.setup.timeout.max.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.socket.connection.setup.timeout.ms=ソケット接続が確立されるまでクライアントが待機する時間 〜<<10000>>

ソケット接続が確立されるまでクライアントが待機する時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「socket.connection.setup.timeout.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.enabled.protocols=SSL接続が有効なプロトコルのリスト 〜<<TLSv1.2>>

SSL接続が有効になっているプロトコルのリストを指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.enabled.protocols」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.keystore.type=キーストアファイルのファイル形式 〜<<JKS>>

キーストアファイルのファイル形式を指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.keystore.type」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.protocol=SSLContextの生成に使用するSSLプロトコル 〜<<TLSv1.2>>

SSLContextの生成に使用するSSLプロトコルを指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.protocol」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.provider=SSL接続に使用するセキュリティプロバイダの名称 〜<<null>>

SSL接続に使用するセキュリティプロバイダの名前を指定します。デフォルト値で動作させる場合は,このプロパティを定義しないでください。

ここで設定した値はApache KafkaのConsumer定義「ssl.provider」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.ssl.truststore.type=トラストストアファイルのファイル形式 〜<<JKS>>

トラストストアファイルのファイル形式を指定します。

ここで設定した値はApache KafkaのConsumer定義「ssl.truststore.type」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.client.id=リクエスト時にサーバに渡すID文字列 〜<<空文字列>>

リクエスト時にサーバに渡すID文字列を指定します。デフォルト値で動作させる場合は,このプロパティを定義しないでください。

ここで設定した値はApache KafkaのConsumer定義「client.id」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.fetch.max.wait.ms=フェッチ要求への応答のブロック時間 〜<<500>>

Consumer定義「fetch.min.bytes」で指定されたデータ最小量に達するデータがない場合,サーバがフェッチ要求に応答する前にブロックする時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「fetch.max.wait.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.metadata.max.age.ms=メタデータの強制更新までの時間 〜<<300000>>

メタデータの強制更新までの時間をミリ秒単位で指定します。

ここで設定した値はApache KafkaのConsumer定義「metadata.max.age.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.reconnect.backoff.max.ms=Brokerへ再接続するまでの最大待機時間 〜<<1000>>

接続に失敗したBrokerに再接続するまでの待機時間をミリ秒単位で指定します。

ここで設定した値はApache KafkaのConsumer定義「reconnect.backoff.max.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.reconnect.backoff.ms=特定のホストへの再接続を試行する前に待機する基本時間 〜<<50>>

特定のホストへの再接続を試行する前に待機する基本時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「reconnect.backoff.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

consumer.retry.backoff.ms=特定のパーティションへの再要求までの待機時間 〜<<100>>

要求が失敗した特定のトピックのパーティションに対して,要求を再試行するまでの待機時間をミリ秒で指定します。

ここで設定した値はApache KafkaのConsumer定義「retry.backoff.ms」に設定され,KafkaインバウンドアダプタのConsumer機能に使用されます。設定できる値などの詳細はApache Kafkaの公式ドキュメントを参照してください。

(b) Kafka受付独自の定義

urecp-kafka.commit-processing.control={msg-rcv-after|msg-proc-after} 〜<<msg-proc-after>>

KafkaインバウンドアダプタからBrokerへオフセットをコミットするタイミングを指定します。

  • msg-rcv-after

    Apache Kafkaからメッセージを受信した直後にコミットします。

  • msg-proc-after

    Kafka受付から応答が返ったあとでコミットします。

urecp-kafka.commit-timeout.ms=オフセットのコミットの最大待機時間 〜<数字>((1−2147483647))<<60000>>

KafkaインバウンドアダプタからBrokerへコミットする際に,オフセットのコミットを待機する最大時間をミリ秒で指定します。

urecp-kafka.poll-timeout.ms=ポーリング時の待ち時間 〜<数字>((1−2147483647))<<60000>>

Kafkaインバウンドアダプタがポーリングを実行するまでの待ち時間をミリ秒で指定します。

urecp-kafka.topic-name=サブスクライブするトピック 〜<半角英数字,アンダーバー(_),ハイフン(-),ピリオド(.)>((1トピックあたり1−249文字))

Kafkaインバウンドアダプタがサブスクライブするトピックの名称を指定します。指定は省略できません。

コマンド制御(MBean)の実行時は設定値が適切かチェックされます。csccmctlコマンドの実行時はチェックされません。

トピックを複数指定する場合は,値をコンマ区切りで指定してください。

urecp-kafka.err-message.retry-count=Kafka受付の呼び出しのリトライ回数 〜<数字>((0−1024))<<1>>

Kafka受付がKafkaインバウンドアダプタへエラー応答をした際に,Kafka受付の呼び出しをリトライする回数を指定します。値に0を指定した場合はリトライを実行しません。

urecp-kafka.record-binary.encoding={hexBinary|base64Binary} 〜<<hexBinary>>

Kafkaインバウンドアダプタから渡されたレコードデータをビジネスプロセスに渡す要求電文に関して,要素値に設定するレコードデータのエンコード形式を指定します。

  • hexBinary

    レコードデータをhexBinary形式でエンコードします。

  • base64Binary

    レコードデータをbase64Binary形式でエンコードします。

urecp-kafka.resource-adapter=Kafkaインバウンドアダプタの表示名 〜<英数字(a〜z,A〜Z,0〜9)およびアンダーバー(_)>((1−240文字))<<Kafka_Inbound_Resource_Adapter>>

受付処理が使用するKafkaインバウンドアダプタの表示名(Connector属性ファイルのdisplay-nameタグの設定値)を指定します。

urecp-kafka.ejb-transaction-timeout=EJBトランザクションのタイムアウト時間 〜<数字>((0−2147483647))<<0>>

受付処理およびカスタム受付フレームワークのEJBトランザクションのタイムアウト時間(秒)を指定します。

0を指定した場合,J2EEサーバに設定されたデフォルトのタイムアウト値で動作します。

urecp-kafka.json2xml={true|false} 〜<<false>>

Kafka受付のリクエスト処理時にJSON-XML変換をするかどうかを指定します。大文字と小文字は区別されます。

  • true

    JSON-XML変換機能を使用して,レコードの値をXMLに変換します。

  • false

    JSON-XML変換機能を使用しません。