ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,インプロセス連携カスタムアダプターでのクエリ結果データの受信処理の基本的な流れについて,実装例を基に説明します。
インプロセス連携アダプターでのクエリ結果データの受信方式には,ポーリング方式とコールバック方式の2種類があります。それぞれの方式について説明します。
SDPサーバから能動的にストリームクエリ結果データを受信する方式です。
ポーリング方式によるクエリ結果データの受信処理について,実装例を次に示します。
public class Inpro_ReceiveSample1 implements StreamInprocessUP { // StreamInprocessUPの実装 public void execute(SDPConnector con) { try { // 1. 出力ストリームに接続 StreamOutput o = con.openStreamOutput("GROUP","QUERY1"); // 2. タプルを受信 try { while(true) { ArrayList tupleList = o.getAll(); } } catch (SDPClientEndOfStreamException e) { System.out.println("データ受信完了"); } // 3. 出力ストリームとの接続を切断 o.close(); // 4. SDPサーバとの接続を切断 con.close(); } catch (SDPClientException e) { System.err.println(e.getMessage()); } } }
それぞれの処理の意味について説明します。番号は実装例中のコメントの番号に対応しています。
StreamOutput o = con.openStreamOutput("GROUP","QUERY1");
ArrayList tupleList = o.getAll();
o.close();
con.close();
クエリ結果データ受信用のメソッドがSDPサーバによって呼び出される方式です。
コールバック方式によるクエリ結果データの受信処理について,実装例を次に示します。
コールバック方式によってクエリ結果データを受信する場合,次の二つのメソッドを実装する必要があります。
それぞれの実装例について説明します。
public class Inpro_ReceiveSample2 implements StreamInprocessUP { // StreamInprocessUPの実装 public void execute(SDPConnector con) { // AP起動時に実行される処理 // 1. 出力ストリームに接続 StreamOutput o = con.openStreamOutput("GROUP","QUERY2"); // 2. コールバック用リスナーオブジェクトの生成 CallBack notifiable = new CallBack(); // 3. コールバック用リスナーオブジェクトの登録 o.registerForNotification(notifiable); } public void stop() { // AP停止時に実行される処理 // 4. コールバック用リスナーオブジェクトの解除 o.unregisterForNotification(notifiable); // 5. SDPサーバとの接続を切断 con.close(); } }
public class CallBack implements StreamEventListener { // StreamEventListenerの実装 public void onEvent(StreamTuple tuple) { // 6. タプル受信時の処理 System.out.println("タプル受信:" + tuple); } }
それぞれの処理の意味について説明します。番号は実装例中のコメントの番号に対応しています。
StreamOutput o = con.openStreamOutput("GROUP","QUERY2");
CallBack notifiable = new CallBack();
o.registerForNotification(notifiable);
o.unregisterForNotification(notifiable);
con.close();
public void onEvent(StreamTuple tuple) { // 6. タプル受信時の処理 }
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.