ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,インプロセス連携カスタムアダプターでのクエリ結果データの受信処理の基本的な流れについて,実装例を基に説明します。
インプロセス連携アダプターでのクエリ結果データの受信方式には,ポーリング方式とコールバック方式の2種類があります。それぞれの方式について説明します。
SDPサーバから能動的にストリームクエリ結果データを受信する方式です。
ポーリング方式によるクエリ結果データの受信処理について,実装例を次に示します。
public class Inpro_ReceiveSample1 implements StreamInprocessUP {
// StreamInprocessUPの実装
public void execute(SDPConnector con) {
try {
// 1. 出力ストリームに接続
StreamOutput o = con.openStreamOutput("GROUP","QUERY1");
// 2. タプルを受信
try {
while(true) {
ArrayList tupleList = o.getAll();
}
} catch (SDPClientEndOfStreamException e) {
System.out.println("データ受信完了");
}
// 3. 出力ストリームとの接続を切断
o.close();
// 4. SDPサーバとの接続を切断
con.close();
} catch (SDPClientException e) {
System.err.println(e.getMessage());
}
}
}
それぞれの処理の意味について説明します。番号は実装例中のコメントの番号に対応しています。
StreamOutput o = con.openStreamOutput("GROUP","QUERY1");ArrayList tupleList = o.getAll();
o.close();
con.close();
クエリ結果データ受信用のメソッドがSDPサーバによって呼び出される方式です。
コールバック方式によるクエリ結果データの受信処理について,実装例を次に示します。
コールバック方式によってクエリ結果データを受信する場合,次の二つのメソッドを実装する必要があります。
それぞれの実装例について説明します。
public class Inpro_ReceiveSample2 implements StreamInprocessUP {
// StreamInprocessUPの実装
public void execute(SDPConnector con) { // AP起動時に実行される処理
// 1. 出力ストリームに接続
StreamOutput o = con.openStreamOutput("GROUP","QUERY2");
// 2. コールバック用リスナーオブジェクトの生成
CallBack notifiable = new CallBack();
// 3. コールバック用リスナーオブジェクトの登録
o.registerForNotification(notifiable);
}
public void stop() { // AP停止時に実行される処理
// 4. コールバック用リスナーオブジェクトの解除
o.unregisterForNotification(notifiable);
// 5. SDPサーバとの接続を切断
con.close();
}
}
public class CallBack implements StreamEventListener {
// StreamEventListenerの実装
public void onEvent(StreamTuple tuple) {
// 6. タプル受信時の処理
System.out.println("タプル受信:" + tuple);
}
}
それぞれの処理の意味について説明します。番号は実装例中のコメントの番号に対応しています。
StreamOutput o = con.openStreamOutput("GROUP","QUERY2");CallBack notifiable = new CallBack();
o.registerForNotification(notifiable);
o.unregisterForNotification(notifiable);
con.close();
public void onEvent(StreamTuple tuple) {
// 6. タプル受信時の処理
}All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.