ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.2.1 RMI連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたRMI連携データ受信AP(RMI_ReceiveTupleTest.java)の内容について説明します。
このアプリケーションは,ストリームs1,s2をJOINした結果を受け取るアプリケーションです。データ受信では,StreamOutputクラスを使用します。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません。また,一部コメントの説明は実際のサンプルプログラムと異なります)。
package samples;
import java.util.Iterator;
import java.util.List;
import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientCommunicationException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
/**
* ストリームデータを取得し,表示します
*
*/
public class RMI_ReceiveTupleTest {
//取得するストリーム名称を設定します
private static final String STREAM_NAME = "JOIN";
private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";
private static SDPConnector connector;
private StreamOutput streamOutput;
private boolean run=true;
private long interval=10;
public static void main(String[] args) {
//【1】
try {
// SDPサーバに接続します
connector = SDPConnectorFactory.connect();
//受信処理を開始します
RMI_ReceiveTupleTest receiver = new RMI_ReceiveTupleTest();
receiver.execute();
} catch (SDPClientException sce) {
System.err.println(sce.getMessage());
}
//〜【1】
}
//受信処理を実行します
public void execute() {
try {
//【2】
//結果ストリームに接続します
streamOutput = connector.openStreamOutput(TARGET_QUERYGROUP, STREAM_NAME);
//〜【2】
while (run) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
//【3】
//タプルを受信します
List<StreamTuple> tupleList = streamOutput.getAll();
if (tupleList.isEmpty()) continue;
//〜【3】
for (Iterator iterator = tupleList.iterator(); iterator.hasNext();) {
StreamTuple tuple = (StreamTuple) iterator.next();
//【4】
//受信したタプルを表示します
Object[] data = tuple.getDataArray();
Integer id1 = (Integer)data[0];
String val1 = (String)data[1];
String val2 = (String)data[2];
StreamTime time = tuple.getSystemTime();
System.out.println("Get Data: ID1="+id1+
", VAL1="+val1+
", VAL2="+val2+
", TIME="+time.toString());
//〜【4】
}
}
} catch (SDPClientEndOfStreamException see) {
//送信ストリームデータが終了しました
System.out.println(see.getMessage());
} catch (SDPClientQueryGroupStopException sce) {
//クエリグループが停止しています(sdpcqlstopコマンド)
System.err.println(sce.getMessage());
} catch (SDPClientQueryGroupHoldException sce) {
//クエリグループが閉塞しています
System.err.println(sce.getMessage());
} catch (SDPClientCommunicationException sce) {
System.err.println(sce.getMessage());
} catch (SDPClientException sce) {
System.err.println(sce.getMessage());
} finally {
if (!connector.isClosed()) {
//【5】
try {
//出力ストリームと接続を閉じます
streamOutput.close();
// SDPサーバと接続を閉じます
connector.close();
} catch (SDPClientException sce) {
System.err.println(sce.getMessage());
}
//〜【5】
}
}
}
}
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.