ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

9.3.4 インプロセス連携データ送信APの内容

ここでは,「9.3.1 インプロセス連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたインプロセス連携データ送信AP(Inprocess_Sender.java)の内容について説明します。

参考
このサンプルプログラムでは,送信専用のスレッドクラスを別に定義して,クライアントAPのメインクラス(StreamInprocessUPインタフェースを実装したInprocess_Mainクラス)側からそのスレッドを生成して送信を委譲する例を示しています。これ以外の方法として,Inprocess_Mainクラス自身がSDPサーバにタプルを送信する処理もできます。

ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません)。

サンプルプログラムの内容

 
package samples;
  
import java.util.ArrayList;
  
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamInput;
  
public class Inprocess_Sender extends Thread{
  
  //【2】
  // スレッドの起動状態
  private volatile boolean running = true;
  //〜【2】
  
  // SDPサーバとのコネクター
  private SDPConnector connector;
  
  // 送信スレッドの生成処理
  public Inprocess_Sender(SDPConnector c) {
    this.connector = c;
  }
  
  ArrayList<StreamTuple> list = new ArrayList<StreamTuple>();
  
  public void run()  {
  
    final String group_name = "Inprocess_QueryGroupTest";
    final String stream_name = "DATA0"; 
  
    // 送信ストリームオブジェクト
    StreamInput si = null;
  
    // 送信ストリームに接続します
    try {
      si = connector.openStreamInput(group_name,stream_name);
    } catch (SDPClientException sce) {
      System.err.println("Sender   :" + sce.getMessage());
      running = false;
    }
  
    for(int i = 0; i < 50;i++){
      if(!running) {
        break;
      }
  
      // 送信データを生成します
      Object[] data = new Object[]{
        new String("ad"+i),
        new Long(i)
      };
  
      // タプルオブジェクトを生成します
      StreamTuple tuple = new StreamTuple(data);
  
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        System.err.println("Sender   : Thread is interrupted");
        e.printStackTrace();
      }
  
      // 単一のタプルを送信します
      try {
        si.put(tuple);
      } catch (SDPClientQueryGroupStopException sce) {
        //クエリグループが停止しています(sdpcqlstopコマンド)
        //何もせずに処理を続行します
      } catch (SDPClientQueryGroupHoldException sce) {
        //クエリグループが閉塞しています
        //何もせずに処理を続行します
      } catch (SDPClientException sce) {
        System.err.println("Sender   :" + sce.getMessage()); 
        running = false;
      }
    }
 
    //【1】
    if(running) {
      try {
        si.putEnd();
    //〜【1】
      } catch (SDPClientQueryGroupStopException sce) {
        //クエリグループが停止しています(sdpcqlstopコマンド)
        //何もせずに処理を続行します
      } catch (SDPClientQueryGroupHoldException sce) {
        //クエリグループが閉塞しています
        //何もせずに処理を続行します
      } catch (SDPClientException sce) {
        System.err.println("Sender   :" + sce.getMessage());  
      }
    }
 
    try {
      if (si != null) {
        // 送信ストリームを閉じます
        si.close();
      }
    } catch (SDPClientException sce) {
      System.err.println("Sender   :" + sce.getMessage());
    }
  
  }
  //【2】
  public void terminate() {
    System.out.println("Sender   : terminate called");
    this.running = false;
  //〜【2】
  }
}
 

ソースコードの内容について説明します。

  1. データの送信方法は,RMI連携の場合と同様です。ただし,データ送信完了通知は,terminateメソッドによって終了条件が設定されていない場合に実行します。
  2. terminateメソッドにはタプルの送信を停止させる処理,つまりInprocess_Senderスレッドの終了条件を記述しています。
    terminateメソッドは,SDPサーバのスレッドから呼び出されるため,Inprocess_Receiverクラスのrunningフィールドは二つのスレッドから同時に読み取られる場合があります。このため,runningフィールドは,volatile属性にする必要があります。