ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.3.1 インプロセス連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたインプロセス連携データ送信AP(Inprocess_Sender.java)の内容について説明します。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません)。
package samples; import java.util.ArrayList; import jp.co.Hitachi.soft.sdp.common.data.StreamTuple; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException; import jp.co.Hitachi.soft.sdp.api.SDPConnector; import jp.co.Hitachi.soft.sdp.api.StreamInput; public class Inprocess_Sender extends Thread{ //【2】 // スレッドの起動状態 private volatile boolean running = true; //〜【2】 // SDPサーバとのコネクター private SDPConnector connector; // 送信スレッドの生成処理 public Inprocess_Sender(SDPConnector c) { this.connector = c; } ArrayList<StreamTuple> list = new ArrayList<StreamTuple>(); public void run() { final String group_name = "Inprocess_QueryGroupTest"; final String stream_name = "DATA0"; // 送信ストリームオブジェクト StreamInput si = null; // 送信ストリームに接続します try { si = connector.openStreamInput(group_name,stream_name); } catch (SDPClientException sce) { System.err.println("Sender :" + sce.getMessage()); running = false; } for(int i = 0; i < 50;i++){ if(!running) { break; } // 送信データを生成します Object[] data = new Object[]{ new String("ad"+i), new Long(i) }; // タプルオブジェクトを生成します StreamTuple tuple = new StreamTuple(data); try { Thread.sleep(100); } catch (InterruptedException e) { System.err.println("Sender : Thread is interrupted"); e.printStackTrace(); } // 単一のタプルを送信します try { si.put(tuple); } catch (SDPClientQueryGroupStopException sce) { //クエリグループが停止しています(sdpcqlstopコマンド) //何もせずに処理を続行します } catch (SDPClientQueryGroupHoldException sce) { //クエリグループが閉塞しています //何もせずに処理を続行します } catch (SDPClientException sce) { System.err.println("Sender :" + sce.getMessage()); running = false; } } //【1】 if(running) { try { si.putEnd(); //〜【1】 } catch (SDPClientQueryGroupStopException sce) { //クエリグループが停止しています(sdpcqlstopコマンド) //何もせずに処理を続行します } catch (SDPClientQueryGroupHoldException sce) { //クエリグループが閉塞しています //何もせずに処理を続行します } catch (SDPClientException sce) { System.err.println("Sender :" + sce.getMessage()); } } try { if (si != null) { // 送信ストリームを閉じます si.close(); } } catch (SDPClientException sce) { System.err.println("Sender :" + sce.getMessage()); } } //【2】 public void terminate() { System.out.println("Sender : terminate called"); this.running = false; //〜【2】 } }
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.