ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.3.1 インプロセス連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたインプロセス連携データ受信AP(Inprocess_Receiver.java)の内容について説明します。このプログラムでは,Inprocess_Receiverスレッドが100ミリ秒間隔でSDPサーバに対してポーリング方式でタプル取得処理を実行します。このスレッドは,送受信制御プログラムであるInprocess_Mainクラスによって起動されます。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません)。
package samples; import jp.co.Hitachi.soft.sdp.common.data.StreamTuple; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException; import jp.co.Hitachi.soft.sdp.common.util.StreamTime; import jp.co.Hitachi.soft.sdp.api.SDPConnector; import jp.co.Hitachi.soft.sdp.api.StreamOutput; public class Inprocess_Receiver extends Thread { // SDPサーバとのコネクター private SDPConnector connector; //【2】 // スレッドの起動状態 private volatile boolean running = true; //〜【2】 // 受信スレッドの生成処理 public Inprocess_Receiver(SDPConnector c) { this.connector = c; } public void run() { final String groupName = "Inprocess_QueryGroupTest"; final String streamName = "FILTER1"; // 結果ストリームオブジェクト StreamOutput so = null; // 結果ストリームに接続します try { so = connector.openStreamOutput(groupName,streamName); } catch (SDPClientException sce) { System.err.println("Receiver : " + sce.getMessage()); running = false; } //【1】 // タプルの受信処理を開始します while(running){ //〜【1】 try { Thread.sleep(100); } catch (InterruptedException e) { System.err.println("Receiver : Thread is interrupted"); e.printStackTrace(); } // タプルオブジェクト StreamTuple tuple = null; // タプルをポーリングで取得します try { tuple = so.get(); } catch (SDPClientEndOfStreamException see) { //送信ストリームデータが終了しました System.out.println("Receiver : " + see.getMessage()); break; } catch (SDPClientQueryGroupStopException sce) { //クエリグループが停止しています(sdpcqlstopコマンド) System.err.println("Receiver : " + sce.getMessage()); break; } catch (SDPClientQueryGroupHoldException sce) { //クエリグループが閉塞しています System.err.println("Receiver : " + sce.getMessage()); break; } catch (SDPClientException sce) { System.err.println("Receiver : " + sce.getMessage()); break; } //受信したタプルを表示します if (tuple != null) { Object[] data = tuple.getDataArray(); String val = (String) data[0]; Long id = (Long) data[1]; StreamTime time = tuple.getSystemTime(); System.out.println("Receiver : Tuple Get on " + streamName +" [ VAL="+val+", ID="+id+", TIME="+time.toString()+" ]"); } } try { if (so != null) { // 結果ストリームを閉じます so.close(); } } catch (SDPClientException sce) { System.err.println("Receiver : " + sce.getMessage()); } } //【2】 public void terminate() { System.out.println"Receiver : terminate called"); this.running = false; } //〜【2】 }
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.