ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

7.3.2 クエリ結果データの受信(インプロセス連携カスタムアダプター)

ここでは,インプロセス連携カスタムアダプターでのクエリ結果データの受信処理の基本的な流れについて,実装例を基に説明します。

インプロセス連携アダプターでのクエリ結果データの受信方式には,ポーリング方式とコールバック方式の2種類があります。それぞれの方式について説明します。

<この項の構成>
(1) ポーリング方式
(2) コールバック方式

(1) ポーリング方式

SDPサーバから能動的にストリームクエリ結果データを受信する方式です。

ポーリング方式によるクエリ結果データの受信処理について,実装例を次に示します。

実装例

public class Inpro_ReceiveSample1 implements StreamInprocessUP { 
  // StreamInprocessUPの実装
  public void execute(SDPConnector con) {
    try {
      // 1. 出力ストリームに接続
      StreamOutput o = con.openStreamOutput("GROUP","QUERY1");
 
      // 2. タプルを受信
      try {
        while(true) {
          ArrayList tupleList = o.getAll();
        }
      } catch (SDPClientEndOfStreamException e) {
        System.out.println("データ受信完了");
      }
 
      // 3. 出力ストリームとの接続を切断
      o.close();
 
      // 4. SDPサーバとの接続を切断
      con.close();
    } catch (SDPClientException e) {
      System.err.println(e.getMessage());
    }
  }
}
 

実装内容の説明

それぞれの処理の意味について説明します。番号は実装例中のコメントの番号に対応しています。

  1. executeメソッドのパラメーターで渡されたSDPConnector型オブジェクト(con)を使用して,クエリグループ名が"GROUP",クエリ名が"QUERY1"の出力ストリームに接続し,StreamOutput型オブジェクト(o)を取得します。
    StreamOutput o = con.openStreamOutput("GROUP","QUERY1");
  2. StreamOutput型オブジェクト(o)を使用して,クエリ結果データ(タプル)を取得します。
    ArrayList tupleList = o.getAll();
  3. クエリ結果データの受信完了後,StreamOutput型オブジェクト(o)を使用して,出力ストリームとの接続を切断します。
    o.close();
  4. SDPConnector型オブジェクト(con)を使用して,SDPサーバとの接続を切断します。
    con.close();
     

(2) コールバック方式

クエリ結果データ受信用のメソッドがSDPサーバによって呼び出される方式です。

コールバック方式によるクエリ結果データの受信処理について,実装例を次に示します。

コールバック方式によってクエリ結果データを受信する場合,次の二つのメソッドを実装する必要があります。

それぞれの実装例について説明します。

実装例(executeメソッドの実装)

public class Inpro_ReceiveSample2 implements StreamInprocessUP { 
  // StreamInprocessUPの実装
  public void execute(SDPConnector con) {  // AP起動時に実行される処理
    // 1. 出力ストリームに接続
    StreamOutput o = con.openStreamOutput("GROUP","QUERY2");
 
    // 2. コールバック用リスナーオブジェクトの生成
    CallBack notifiable = new CallBack();
 
    // 3. コールバック用リスナーオブジェクトの登録
    o.registerForNotification(notifiable);
  }
  public void stop() {  // AP停止時に実行される処理
    // 4. コールバック用リスナーオブジェクトの解除
    o.unregisterForNotification(notifiable);
 
    // 5. SDPサーバとの接続を切断
    con.close();
  }
}
 

実装例(onEventメソッドの実装)

public class CallBack implements StreamEventListener {
  // StreamEventListenerの実装
  public void onEvent(StreamTuple tuple) {
    // 6. タプル受信時の処理
    System.out.println("タプル受信:" + tuple); 
  }
}
 

実装内容の説明

それぞれの処理の意味について説明します。番号は実装例中のコメントの番号に対応しています。

  1. executeメソッドのパラメーターに指定されたSDPConnector型オブジェクト(con)を使用して,クエリグループ名が"GROUP",クエリ名が"QUERY2"の出力ストリームに接続し,StreamOutput型オブジェクト(o)を取得します。
    StreamOutput o = con.openStreamOutput("GROUP","QUERY2");
  2. StreamEventListenerインタフェースの実装クラス(CallBack)のオブジェクト(notifiable)を生成します。
    CallBack notifiable = new CallBack();
  3. StreamOutput型オブジェクト(o)を使用して,手順2.で作成したコールバック用リスナーオブジェクトを登録します。
    o.registerForNotification(notifiable);
  4. カスタムアダプターを停止するときに,StreamOutput型オブジェクト(o)を使用して,コールバック用リスナーオブジェクト(notifiable)を解除します。
    o.unregisterForNotification(notifiable);
  5. カスタムアダプターを停止するときに,SDPConnector型オブジェクト(con)を使用して,SDPサーバとの接続を切断します。
    con.close();
  6. 出力ストリームにクエリ結果データのタプルが到着したときに,StreamEventListenerインタフェースのonEventメソッドに記述した内容が実行されます。
    public void onEvent(StreamTuple tuple) {
    // 6. タプル受信時の処理
      }