ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.3.1 インプロセス連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたインプロセス連携データ受信AP(Inprocess_Receiver.java)の内容について説明します。このプログラムでは,Inprocess_Receiverスレッドが100ミリ秒間隔でSDPサーバに対してポーリング方式でタプル取得処理を実行します。このスレッドは,送受信制御プログラムであるInprocess_Mainクラスによって起動されます。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません)。
package samples;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
public class Inprocess_Receiver extends Thread {
// SDPサーバとのコネクター
private SDPConnector connector;
//【2】
// スレッドの起動状態
private volatile boolean running = true;
//〜【2】
// 受信スレッドの生成処理
public Inprocess_Receiver(SDPConnector c) {
this.connector = c;
}
public void run() {
final String groupName = "Inprocess_QueryGroupTest";
final String streamName = "FILTER1";
// 結果ストリームオブジェクト
StreamOutput so = null;
// 結果ストリームに接続します
try {
so = connector.openStreamOutput(groupName,streamName);
} catch (SDPClientException sce) {
System.err.println("Receiver : " + sce.getMessage());
running = false;
}
//【1】
// タプルの受信処理を開始します
while(running){
//〜【1】
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.err.println("Receiver : Thread is interrupted");
e.printStackTrace();
}
// タプルオブジェクト
StreamTuple tuple = null;
// タプルをポーリングで取得します
try {
tuple = so.get();
} catch (SDPClientEndOfStreamException see) {
//送信ストリームデータが終了しました
System.out.println("Receiver : " + see.getMessage());
break;
} catch (SDPClientQueryGroupStopException sce) {
//クエリグループが停止しています(sdpcqlstopコマンド)
System.err.println("Receiver : " + sce.getMessage());
break;
} catch (SDPClientQueryGroupHoldException sce) {
//クエリグループが閉塞しています
System.err.println("Receiver : " + sce.getMessage());
break;
} catch (SDPClientException sce) {
System.err.println("Receiver : " + sce.getMessage());
break;
}
//受信したタプルを表示します
if (tuple != null) {
Object[] data = tuple.getDataArray();
String val = (String) data[0];
Long id = (Long) data[1];
StreamTime time = tuple.getSystemTime();
System.out.println("Receiver : Tuple Get on " + streamName
+" [ VAL="+val+", ID="+id+", TIME="+time.toString()+" ]");
}
}
try {
if (so != null) {
// 結果ストリームを閉じます
so.close();
}
} catch (SDPClientException sce) {
System.err.println("Receiver : " + sce.getMessage());
}
}
//【2】
public void terminate() {
System.out.println"Receiver : terminate called");
this.running = false;
}
//〜【2】
}
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.