ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

9.2.3 RMI連携データ送信APの内容

ここでは,「9.2.1 RMI連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたRMI連携データ送信AP(RMI_SendTupleTest.java)の内容について説明します。

このアプリケーションは,ストリームs1,s2をSDPサーバに送信するアプリケーションです。データ送信では,StreamInputインタフェースを使用します。

ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません。また,一部コメントの説明は実際のサンプルプログラムと異なります)。

サンプルプログラムの内容

 
package samples;
 
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.StreamInput;
 
public class RMI_SendTupleTest{
  //送信対象のストリーム名称を設定します
  private static final String TARGET_STREAM1 = "s1";
  private static final String TARGET_STREAM2 = "s2";
  private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";
 
  private static SDPConnector connector;
  private static StreamInput streamInput1;
  private static StreamInput streamInput2;
  private static long interval=1000;
 
  //【1】
  public static void main(String[] args) {
    try {
      //SDPサーバに接続します
      connector = SDPConnectorFactory.connect();
      execute();
    } catch (SDPClientException sce) {
      System.err.println(sce.getMessage());
    }
  }
  //〜【1】
  
  // データ送信処理を実行します
  public static void execute() {
    try {
      //【2】
      //送信対象のストリームに接続します
      streamInput1 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM1);
      streamInput2 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM2);
      //〜【2】
  
      //データを送信します
      for(int i=0; i<10; i++){
        try{
          Thread.sleep(interval);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
  
      //【3】
      //タプルに設定するオブジェクトを生成します
      Object[] data1 = new Object[]{
        new Integer(i),
        new String("data1:"+i)
      };
      Object[] data2 = new Object[]{
        new Integer(i),
        new String("data2:"+i)
      };
  
      //オブジェクトをタプルに設定します
      StreamTuple tuple1 = new StreamTuple(data1);
      StreamTuple tuple2 = new StreamTuple(data2);
      //〜【3】
  
      //【4】
      //タプルを送信します
      streamInput1.put(tuple1);
      streamInput2.put(tuple2);
    }
    //データ送信完了を通知します
    streamInput1.putEnd();
    streamInput2.putEnd();
    //〜【4】
    } catch (SDPClientQueryGroupStopException sce) {
      //クエリグループが停止しています(sdpcqlstopコマンド)
      System.err.println(sce.getMessage());
    } catch (SDPClientQueryGroupHoldException sce) {
      //クエリグループが閉塞しています
      System.err.println(sce.getMessage());
    } catch (SDPClientException sce) {
      System.err.println(sce.getMessage());
  
    //【5】
    } finally {
        if (!connector.isClosed()) {
          try {
            //入力ストリームと接続を閉じます
            streamInput1.close();
            streamInput2.close();
            // SDPサーバと接続を閉じます
            connector.close();
          } catch (SDPClientException sce2) {
            System.err.println(sce2.getMessage());
          }
        }
      }
    }
    //〜【5】
  }
 

ソースコードの内容について説明します。

  1. SDPConnectorFactory.connectメソッドでSDPサーバに接続して,SDPConnector型オブジェクトを取得します。
  2. SDPConnector型オブジェクトを使用して,openStreamInputメソッドを呼び出します。このメソッドによって,送信先ストリームの入力ストリームに接続して,StreamInput型オブジェクトを取得します。
  3. 送信データをオブジェクトとして生成して,タプルに設定します。
  4. StreamInput型オブジェクトを使用してputメソッドを呼び出し,2.で接続した入力ストリームに対してデータ(タプル)を送信します。すべてのデータの送信が完了したあと,putEndメソッドで送信完了を通知します。
  5. StreamInput型オブジェクトを使用してcloseメソッドを呼び出して,入力ストリームとの接続を切断します。その後,SDPConnector型オブジェクトを使用してcloseメソッドを呼び出して,SDPConnectorを閉じます。