ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ここでは,「9.2.1 RMI連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたRMI連携データ送信AP(RMI_SendTupleTest.java)の内容について説明します。
このアプリケーションは,ストリームs1,s2をSDPサーバに送信するアプリケーションです。データ送信では,StreamInputインタフェースを使用します。
ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません。また,一部コメントの説明は実際のサンプルプログラムと異なります)。
package samples; import jp.co.Hitachi.soft.sdp.common.data.StreamTuple; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException; import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException; import jp.co.Hitachi.soft.sdp.api.SDPConnector; import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory; import jp.co.Hitachi.soft.sdp.api.StreamInput; public class RMI_SendTupleTest{ //送信対象のストリーム名称を設定します private static final String TARGET_STREAM1 = "s1"; private static final String TARGET_STREAM2 = "s2"; private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest"; private static SDPConnector connector; private static StreamInput streamInput1; private static StreamInput streamInput2; private static long interval=1000; //【1】 public static void main(String[] args) { try { //SDPサーバに接続します connector = SDPConnectorFactory.connect(); execute(); } catch (SDPClientException sce) { System.err.println(sce.getMessage()); } } //〜【1】 // データ送信処理を実行します public static void execute() { try { //【2】 //送信対象のストリームに接続します streamInput1 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM1); streamInput2 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM2); //〜【2】 //データを送信します for(int i=0; i<10; i++){ try{ Thread.sleep(interval); } catch (InterruptedException e) { e.printStackTrace(); } //【3】 //タプルに設定するオブジェクトを生成します Object[] data1 = new Object[]{ new Integer(i), new String("data1:"+i) }; Object[] data2 = new Object[]{ new Integer(i), new String("data2:"+i) }; //オブジェクトをタプルに設定します StreamTuple tuple1 = new StreamTuple(data1); StreamTuple tuple2 = new StreamTuple(data2); //〜【3】 //【4】 //タプルを送信します streamInput1.put(tuple1); streamInput2.put(tuple2); } //データ送信完了を通知します streamInput1.putEnd(); streamInput2.putEnd(); //〜【4】 } catch (SDPClientQueryGroupStopException sce) { //クエリグループが停止しています(sdpcqlstopコマンド) System.err.println(sce.getMessage()); } catch (SDPClientQueryGroupHoldException sce) { //クエリグループが閉塞しています System.err.println(sce.getMessage()); } catch (SDPClientException sce) { System.err.println(sce.getMessage()); //【5】 } finally { if (!connector.isClosed()) { try { //入力ストリームと接続を閉じます streamInput1.close(); streamInput2.close(); // SDPサーバと接続を閉じます connector.close(); } catch (SDPClientException sce2) { System.err.println(sce2.getMessage()); } } } } //〜【5】 }
ソースコードの内容について説明します。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.