ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.2.1 RMI連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたRMI連携データ受信AP(RMI_ReceiveTupleTest.java)の内容について説明します。
このアプリケーションは,ストリームs1,s2をJOINした結果を受け取るアプリケーションです。データ受信では,StreamOutputクラスを使用します。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません。また,一部コメントの説明は実際のサンプルプログラムと異なります)。
package samples; import java.util.Iterator; import java.util.List; import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory; import jp.co.Hitachi.soft.sdp.api.SDPConnector; import jp.co.Hitachi.soft.sdp.api.StreamOutput; import jp.co.Hitachi.soft.sdp.common.data.StreamTuple; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientCommunicationException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException; import jp.co.Hitachi.soft.sdp.common.util.StreamTime; /** * ストリームデータを取得し,表示します * */ public class RMI_ReceiveTupleTest { //取得するストリーム名称を設定します private static final String STREAM_NAME = "JOIN"; private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest"; private static SDPConnector connector; private StreamOutput streamOutput; private boolean run=true; private long interval=10; public static void main(String[] args) { //【1】 try { // SDPサーバに接続します connector = SDPConnectorFactory.connect(); //受信処理を開始します RMI_ReceiveTupleTest receiver = new RMI_ReceiveTupleTest(); receiver.execute(); } catch (SDPClientException sce) { System.err.println(sce.getMessage()); } //〜【1】 } //受信処理を実行します public void execute() { try { //【2】 //結果ストリームに接続します streamOutput = connector.openStreamOutput(TARGET_QUERYGROUP, STREAM_NAME); //〜【2】 while (run) { try { Thread.sleep(interval); } catch (InterruptedException e) { e.printStackTrace(); } //【3】 //タプルを受信します List<StreamTuple> tupleList = streamOutput.getAll(); if (tupleList.isEmpty()) continue; //〜【3】 for (Iterator iterator = tupleList.iterator(); iterator.hasNext();) { StreamTuple tuple = (StreamTuple) iterator.next(); //【4】 //受信したタプルを表示します Object[] data = tuple.getDataArray(); Integer id1 = (Integer)data[0]; String val1 = (String)data[1]; String val2 = (String)data[2]; StreamTime time = tuple.getSystemTime(); System.out.println("Get Data: ID1="+id1+ ", VAL1="+val1+ ", VAL2="+val2+ ", TIME="+time.toString()); //〜【4】 } } } catch (SDPClientEndOfStreamException see) { //送信ストリームデータが終了しました System.out.println(see.getMessage()); } catch (SDPClientQueryGroupStopException sce) { //クエリグループが停止しています(sdpcqlstopコマンド) System.err.println(sce.getMessage()); } catch (SDPClientQueryGroupHoldException sce) { //クエリグループが閉塞しています System.err.println(sce.getMessage()); } catch (SDPClientCommunicationException sce) { System.err.println(sce.getMessage()); } catch (SDPClientException sce) { System.err.println(sce.getMessage()); } finally { if (!connector.isClosed()) { //【5】 try { //出力ストリームと接続を閉じます streamOutput.close(); // SDPサーバと接続を閉じます connector.close(); } catch (SDPClientException sce) { System.err.println(sce.getMessage()); } //〜【5】 } } } }
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.