ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

9.2.4 RMI連携データ受信APの内容

ここでは,「9.2.1 RMI連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたRMI連携データ受信AP(RMI_ReceiveTupleTest.java)の内容について説明します。

このアプリケーションは,ストリームs1,s2をJOINした結果を受け取るアプリケーションです。データ受信では,StreamOutputクラスを使用します。

ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません。また,一部コメントの説明は実際のサンプルプログラムと異なります)。

サンプルプログラムの内容

 
package samples;
 
import java.util.Iterator;
import java.util.List;
 
import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientCommunicationException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
/**
 * ストリームデータを取得し,表示します
 *
 */
 
public class RMI_ReceiveTupleTest {
  //取得するストリーム名称を設定します
  private static final String STREAM_NAME = "JOIN";
  private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";
  
  private static SDPConnector connector;
  private StreamOutput streamOutput;
  private boolean run=true;
  private long interval=10;
  public static void main(String[] args) {
 
    //【1】
    try {
      // SDPサーバに接続します
      connector = SDPConnectorFactory.connect();
      
      //受信処理を開始します
      RMI_ReceiveTupleTest receiver = new RMI_ReceiveTupleTest();
      receiver.execute();
    } catch (SDPClientException sce) {
      System.err.println(sce.getMessage());
    }
    //〜【1】
  }
  
  //受信処理を実行します
  public void execute() {
    try {
      //【2】
      //結果ストリームに接続します
      streamOutput = connector.openStreamOutput(TARGET_QUERYGROUP, STREAM_NAME);
      //〜【2】
      
      while (run) {
        try {
          Thread.sleep(interval);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
    
      //【3】
      //タプルを受信します
      List<StreamTuple> tupleList = streamOutput.getAll();
      if (tupleList.isEmpty()) continue;
      //〜【3】
  
      for (Iterator iterator = tupleList.iterator(); iterator.hasNext();) {
      StreamTuple tuple = (StreamTuple) iterator.next();
      
      //【4】
      //受信したタプルを表示します
      Object[] data = tuple.getDataArray();
      Integer id1 = (Integer)data[0];
      String val1 = (String)data[1];
      String val2 = (String)data[2];
      StreamTime time = tuple.getSystemTime();
      System.out.println("Get Data: ID1="+id1+
        ", VAL1="+val1+
        ", VAL2="+val2+
        ", TIME="+time.toString());
      //〜【4】
      }
    }
  } catch (SDPClientEndOfStreamException see) {
    //送信ストリームデータが終了しました
    System.out.println(see.getMessage());
  } catch (SDPClientQueryGroupStopException sce) {
    //クエリグループが停止しています(sdpcqlstopコマンド)
    System.err.println(sce.getMessage());
  } catch (SDPClientQueryGroupHoldException sce) {
    //クエリグループが閉塞しています
    System.err.println(sce.getMessage());
  } catch (SDPClientCommunicationException sce) {
    System.err.println(sce.getMessage());
  } catch (SDPClientException sce) {
    System.err.println(sce.getMessage());
  } finally {
      if (!connector.isClosed()) {
        //【5】
        try {
          //出力ストリームと接続を閉じます
          streamOutput.close();
          // SDPサーバと接続を閉じます
          connector.close();
        } catch (SDPClientException sce) {
          System.err.println(sce.getMessage());
        }
        //〜【5】
      }
    }
  }
}
 

ソースコードの内容について説明します。

  1. SDPConnectorFactory.connectメソッドでSDPサーバに接続して,SDPConnector型オブジェクトを取得します。
  2. SDPConnector型オブジェクトを使用して,openStreamOutputメソッドを呼び出します。このメソッドによって,受信元ストリームの出力ストリームに接続して,StreamOutput型オブジェクトを取得します。
  3. StreamOutput型オブジェクトを使用してgetAllメソッドを呼び出し,2.で接続した出力ストリームからStreamTuple型オブジェクトのタプルをまとめて受信します。
  4. StreamTuple型オブジェクトを使用してgetDataArrayメソッドを呼び出し,タプルのオブジェクト配列を取得します。このサンプルプログラムでは,タプルのデータをInteger型およびString型で取り出して,標準出力に出力しています。
  5. StreamOutput型オブジェクトのcloseメソッドで出力ストリームとの接続を切断します。その後,SDPConnector型オブジェクトのcloseメソッドでSDPConnectorを閉じます。