2.12.5 Kafkaアダプタの機能
Kafkaアダプタは,ビジネスプロセスからのメッセージ送信要求に従い,Kafkaにメッセージを送信するためのサービスアダプタです。Kafkaアダプタが利用できるApache Kafkaのバージョンは2.6.0以降です。
Kafkaアダプタを利用したメッセージの送信の例を次に示します。
図中の1.〜3.について説明します。
-
ビジネスプロセスから要求電文が送信されます。
-
Kafkaアダプタで要求電文をメッセージに変換します。
メッセージの変換については,「(3) メッセージの変換」を参照してください。
-
KafkaアダプタからKafkaにメッセージが送信されます。
メッセージの送信については,「(4) メッセージの送信」を参照してください。
- 〈この項の構成〉
(1) KafkaアダプタとApache Kafkaとの関係
KafkaアダプタとApache Kafkaとの関係を次の図に示します。
- Apache Kafka
-
複数台のサーバで大量のデータを処理する分散メッセージングシステムです。Kafkaとも表記します。
- Broker
-
Kafkaの構成要素の一つで,データを受信,および配信するサービスです。
- メッセージ
-
Kafka内で扱うデータの単位です。レコードともいいます。メッセージはキーバリュー形式で構成されています。
- トピック
-
メッセージを種別ごとに管理するためのストレージです。Broker上に配置され,管理されます。Brokerで設定されたauto.create.topics.enableがtrueで,かつ送信先のBrokerに指定したトピックがない場合,Apache Kafkaは,自動でトピックを生成します。このときもKafkaアダプタでは,メッセージ送信処理を続行します。トピックの自動生成を無効にするには,Brokerのauto.create.topics.enableをfalseに設定してください。Brokerで設定するプロパティについてはKafkaの公式ドキュメントを参照してください。
- パーティション
-
Broker上のデータの読み書きを行う単位です。トピックは複数のパーティションで構成されます。パーティションの切り分けは次の順に決定します。
-
メッセージのpartition要素に有効なパーティションIDが指定されている場合,そのパーティションIDを持つパーティションに送信されます。ヘッダ要求電文の<kafka-header-send-request>-<record>-<partition>で設定します。
-
メッセージのpartition要素にパーティションIDが指定されていない場合,メッセージのキーを指定することで,キーのハッシュ値に応じて送信先のパーティションに送信されます。同一のキーを持つメッセージは同一のパーティションIDを持つパーティションに送信されます。メッセージのキーはヘッダ要求電文の<kafka-header-send-request>-<record>-<key>で設定します。
-
メッセージのキーに何も指定されていない場合,デフォルトの設定に応じて複数のパーティションにメッセージが送信されます。
-
- Kafkaクラスタ
-
複数のBrokerによって構成されたクラスタシステムです。
Kafkaはトピックと呼ばれる単位で,種別ごとにメッセージを管理しています。Kafkaアダプタは,ビジネスプロセスから送られる要求電文からメッセージを作成し,定義ファイルで設定した内容を基に指定したApache Kafka内にメッセージを格納します。メッセージのキーと値の型の構成が同一,かつ,同じKafkaクラスタ内にメッセージを送信する場合,1つのKafkaアダプタでApache Kafkaにメッセージを送信できます。また,1つのKafkaアダプタで同じKafkaクラスタ内の異なるトピックにメッセージを送信することもできます。
KafkaアダプタとKafkaは,ローカル環境でもリモート環境でも接続できます。
(2) Kafkaアダプタで使用できる通信モデル
Kafkaアダプタの通信モデルは,同期通信です。通信モデルに「非同期」を設定した場合は,Kafkaアダプタの呼び出し時にエラーになります。
(3) メッセージの変換
ビジネスプロセスから渡された要求電文をKafkaアダプタでメッセージに変換します。Kafkaアダプタの要求電文の形式については,マニュアル「サービスプラットフォーム 開発ガイド 受付・アダプタ定義編」の「3.3.16 Kafkaアダプタを定義する」を参照してください。
Kafkaでは,メッセージのキーおよび値はシリアライズされています。Kafkaで使用できる形式のメッセージを作成するため,Kafkaアダプタではメッセージのシリアライズを行います。シリアライズ形式には,バイナリ形式と文字列形式が使用できます。メッセージのシリアライズ形式の指定は,Kafkaアダプタ定義ファイルの次のプロパティで指定します。
プロパティ |
説明 |
---|---|
producer.key.serializer |
メッセージのキーのシリアライザクラスを指定します。 |
producer.value.serializer |
メッセージの値のシリアライザクラスを指定します。 |
(4) メッセージの送信
Kafkaアダプタで変換したメッセージをKafkaに送信します。Kafkaアダプタでは,要求電文から生成されたメッセージを,1つのメッセージごとにKafkaに送信します。ただし,マルチスレッドでの送信のため,メッセージの送信順序は保証されません。
Kafkaは,Kafkaアダプタから送られてくるメッセージを受信するとメッセージ受信完了通知(Ack)をKafkaアダプタに返します。Kafkaアダプタは,Ackを受信すると,メッセージ送信の完了処理を行います。
メッセージの送信では,メッセージの配信保証レベル,再送,圧縮をKafkaアダプタ定義ファイルのプロパティで設定できます。
(a) メッセージの配信保証レベル
メッセージ配信の保証レベルを設定できます。メッセージ配信の保証レベルはKafkaアダプタ定義ファイルのproducer.acksで設定します。
Kafkaアダプタで保証しているメッセージ配信の種類を次に示します。
保証レベルの種類 |
説明 |
再送 |
重複除去 |
---|---|---|---|
At Most Once |
一回はメッセージの配信を試みます。重複メッセージは発生しません。メッセージがロストする場合があります。 |
× |
× |
At Least Once |
少なくとも一回はメッセージを配信します。重複メッセージが発生する場合があります。デフォルトではAt Least Onceが設定されています。 |
○ |
× |
- 注意事項
-
Kafkaにメッセージ送信リクエストを実行して完了通知を受け取る間にクラッシュした場合,Kafkaアダプタはそのメッセージの送信完了前という状態になり,再度ビジネスプロセスから送信要求を実行した場合に同じメッセージを再送することがあります。そのため,メッセージ配信の保証の設定に関わらず,メッセージの重複を前提として設計してください。
(b) メッセージの再送
一時的なエラーで送信に失敗した場合,メッセージを再送できます。メッセージの再送は,Kafkaアダプタ定義ファイルの次のプロパティで設定します。
プロパティ |
説明 |
---|---|
producer.acks |
Kafkaアダプタからの送信リクエストに,Kafkaのメッセージ受信完了通知(Ack)を返すタイミングを指定します。 |
producer.retries |
一時的なエラーで送信に失敗したメッセージを再送信する場合の再試行回数を指定します。 |
producer.delivery.timeout.ms |
Kafkaアダプタからの送信リクエストの呼び出しが戻ったあと,Kafkaへのメッセージ送信の成功または失敗を応答する時間の上限を指定します。 |
ネットワークエラーが発生した場合,メッセージを再送しようとすると重複が発生するときがあります。メッセージが送信される前にネットワークエラーが発生した場合は,重複は発生しません。ただし,メッセージがKafkaに格納されたあと応答を送信者に配信する前にネットワークエラーが発生した場合は,Kafkaアダプタはエラーを検出できません。
(c) メッセージの圧縮
送信するメッセージを指定した圧縮方式で圧縮できます。メッセージの圧縮は,Kafkaアダプタ定義ファイルのproducer.compression.typeで設定します。
(5) Kafkaアダプタのセキュリティ設定
Kafkaアダプタでは,TLSを使用した暗号化と認証,ホスト名の検証をサポートしています。TLSで使用するセキュリティ通信用プロトコルについては,Kafkaの公式ドキュメントを参照してください。
TLSを使用した暗号化と認証に関する定義を設定するには,Kafkaアダプタ実行環境(共通)プロパティファイルに次のプロパティを設定します。
プロパティ |
説明 |
---|---|
producer.security.protocol |
Brokerとの通信に使用されるプロトコルの指定 |
producer.ssl.key.password |
キーストアファイル内の秘密鍵のパスワード |
producer.ssl.keystore.key |
キーストアファイルの秘密キー |
producer.ssl.keystore.location |
キーストアファイルの格納場所 |
producer.ssl.keystore.password |
キーストアファイルのストアパスワード |
producer.ssl.truststore.location |
トラストストアファイルの格納場所 |
producer.ssl.truststore.password |
トラストストアファイルのパスワード |
producer.ssl.enabled.protocols |
SSL接続が有効になっているプロトコルのリスト |
producer.ssl.keystore.type |
キーストアファイルのファイル形式 |
producer.ssl.protocol |
SSLコンテキストの生成に使用されるSSLプロトコル |
producer.ssl.provider |
SSL接続に使用されるセキュリティプロバイダの名前 |
producer.ssl.truststore.type |
トラストストアファイルのファイル形式 |
TLSで使用できるTLS暗号スイートの制限はありません。ただし,使用する暗号スイートを設定することはできません。
KafkaのTLSを使用した暗号化と認証の設定については,Kafkaの公式ドキュメントを参照してください。