ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.2.1 RMI連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたRMI連携データ送信AP(RMI_SendTupleTest.java)の内容について説明します。
このアプリケーションは,ストリームs1,s2をSDPサーバに送信するアプリケーションです。データ送信では,StreamInputインタフェースを使用します。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません。また,一部コメントの説明は実際のサンプルプログラムと異なります)。
package samples;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.StreamInput;
public class RMI_SendTupleTest{
//送信対象のストリーム名称を設定します
private static final String TARGET_STREAM1 = "s1";
private static final String TARGET_STREAM2 = "s2";
private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";
private static SDPConnector connector;
private static StreamInput streamInput1;
private static StreamInput streamInput2;
private static long interval=1000;
//【1】
public static void main(String[] args) {
try {
//SDPサーバに接続します
connector = SDPConnectorFactory.connect();
execute();
} catch (SDPClientException sce) {
System.err.println(sce.getMessage());
}
}
//〜【1】
// データ送信処理を実行します
public static void execute() {
try {
//【2】
//送信対象のストリームに接続します
streamInput1 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM1);
streamInput2 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM2);
//〜【2】
//データを送信します
for(int i=0; i<10; i++){
try{
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
//【3】
//タプルに設定するオブジェクトを生成します
Object[] data1 = new Object[]{
new Integer(i),
new String("data1:"+i)
};
Object[] data2 = new Object[]{
new Integer(i),
new String("data2:"+i)
};
//オブジェクトをタプルに設定します
StreamTuple tuple1 = new StreamTuple(data1);
StreamTuple tuple2 = new StreamTuple(data2);
//〜【3】
//【4】
//タプルを送信します
streamInput1.put(tuple1);
streamInput2.put(tuple2);
}
//データ送信完了を通知します
streamInput1.putEnd();
streamInput2.putEnd();
//〜【4】
} catch (SDPClientQueryGroupStopException sce) {
//クエリグループが停止しています(sdpcqlstopコマンド)
System.err.println(sce.getMessage());
} catch (SDPClientQueryGroupHoldException sce) {
//クエリグループが閉塞しています
System.err.println(sce.getMessage());
} catch (SDPClientException sce) {
System.err.println(sce.getMessage());
//【5】
} finally {
if (!connector.isClosed()) {
try {
//入力ストリームと接続を閉じます
streamInput1.close();
streamInput2.close();
// SDPサーバと接続を閉じます
connector.close();
} catch (SDPClientException sce2) {
System.err.println(sce2.getMessage());
}
}
}
}
//〜【5】
}
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.