ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.3.1 インプロセス連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたインプロセス連携データ送信AP(Inprocess_Sender.java)の内容について説明します。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません)。
package samples;
import java.util.ArrayList;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamInput;
public class Inprocess_Sender extends Thread{
//【2】
// スレッドの起動状態
private volatile boolean running = true;
//〜【2】
// SDPサーバとのコネクター
private SDPConnector connector;
// 送信スレッドの生成処理
public Inprocess_Sender(SDPConnector c) {
this.connector = c;
}
ArrayList<StreamTuple> list = new ArrayList<StreamTuple>();
public void run() {
final String group_name = "Inprocess_QueryGroupTest";
final String stream_name = "DATA0";
// 送信ストリームオブジェクト
StreamInput si = null;
// 送信ストリームに接続します
try {
si = connector.openStreamInput(group_name,stream_name);
} catch (SDPClientException sce) {
System.err.println("Sender :" + sce.getMessage());
running = false;
}
for(int i = 0; i < 50;i++){
if(!running) {
break;
}
// 送信データを生成します
Object[] data = new Object[]{
new String("ad"+i),
new Long(i)
};
// タプルオブジェクトを生成します
StreamTuple tuple = new StreamTuple(data);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.err.println("Sender : Thread is interrupted");
e.printStackTrace();
}
// 単一のタプルを送信します
try {
si.put(tuple);
} catch (SDPClientQueryGroupStopException sce) {
//クエリグループが停止しています(sdpcqlstopコマンド)
//何もせずに処理を続行します
} catch (SDPClientQueryGroupHoldException sce) {
//クエリグループが閉塞しています
//何もせずに処理を続行します
} catch (SDPClientException sce) {
System.err.println("Sender :" + sce.getMessage());
running = false;
}
}
//【1】
if(running) {
try {
si.putEnd();
//〜【1】
} catch (SDPClientQueryGroupStopException sce) {
//クエリグループが停止しています(sdpcqlstopコマンド)
//何もせずに処理を続行します
} catch (SDPClientQueryGroupHoldException sce) {
//クエリグループが閉塞しています
//何もせずに処理を続行します
} catch (SDPClientException sce) {
System.err.println("Sender :" + sce.getMessage());
}
}
try {
if (si != null) {
// 送信ストリームを閉じます
si.close();
}
} catch (SDPClientException sce) {
System.err.println("Sender :" + sce.getMessage());
}
}
//【2】
public void terminate() {
System.out.println("Sender : terminate called");
this.running = false;
//〜【2】
}
}
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.