ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

9.3.5 インプロセス連携データ受信APの内容(ポーリング方式)

ここでは,「9.3.1 インプロセス連携カスタムアダプターのサンプルプログラムの実行手順」の手順4.でコンパイルしたインプロセス連携データ受信AP(Inprocess_Receiver.java)の内容について説明します。このプログラムでは,Inprocess_Receiverスレッドが100ミリ秒間隔でSDPサーバに対してポーリング方式でタプル取得処理を実行します。このスレッドは,送受信制御プログラムであるInprocess_Mainクラスによって起動されます。

ソースコードを次に示します。なお,「//【1】」「//〜【1】」などのコメントは,以降の説明の番号と対応しています(これらのコメントは,実際のサンプルプログラムには記載されていません)。

サンプルプログラムの内容

 
package samples;
 
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
 
public class Inprocess_Receiver extends Thread {
 
  // SDPサーバとのコネクター
  private SDPConnector connector;
  
  //【2】
  // スレッドの起動状態
  private volatile boolean running = true;
  //〜【2】
  
  // 受信スレッドの生成処理
  public Inprocess_Receiver(SDPConnector c) {
    this.connector = c;
  }
  
  public void run() {
  
    final String groupName = "Inprocess_QueryGroupTest";
    final String streamName = "FILTER1";
  
    // 結果ストリームオブジェクト
    StreamOutput so = null;
  
    // 結果ストリームに接続します
    try {
      so = connector.openStreamOutput(groupName,streamName);
    } catch (SDPClientException sce) {
      System.err.println("Receiver : " + sce.getMessage());
      running = false; 
    }
  
    //【1】
    // タプルの受信処理を開始します
    while(running){
    //〜【1】
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        System.err.println("Receiver : Thread is interrupted");
        e.printStackTrace();
      }
  
      // タプルオブジェクト
      StreamTuple tuple = null;
  
      // タプルをポーリングで取得します
      try {
        tuple = so.get();
      } catch (SDPClientEndOfStreamException see) {
        //送信ストリームデータが終了しました
        System.out.println("Receiver : " + see.getMessage());
        break;
      } catch (SDPClientQueryGroupStopException sce) {
        //クエリグループが停止しています(sdpcqlstopコマンド)
        System.err.println("Receiver : " + sce.getMessage());
        break;
      } catch (SDPClientQueryGroupHoldException sce) {
        //クエリグループが閉塞しています
        System.err.println("Receiver : " + sce.getMessage());
        break;
      } catch (SDPClientException sce) {
        System.err.println("Receiver : " + sce.getMessage());
        break;
      }
  
      //受信したタプルを表示します
      if (tuple != null) {
        Object[] data = tuple.getDataArray();
        String val = (String) data[0];
        Long id = (Long) data[1];
        StreamTime time = tuple.getSystemTime();
        System.out.println("Receiver : Tuple Get on " + streamName
          +" [ VAL="+val+", ID="+id+", TIME="+time.toString()+" ]");
      }
    }
  
    try {
      if (so != null) {
        // 結果ストリームを閉じます
        so.close();
      }
    } catch (SDPClientException sce) {
      System.err.println("Receiver : " + sce.getMessage());
    }
  
  }
  
  
  //【2】
  public void terminate() {
      System.out.println"Receiver : terminate called");
    this.running = false;
  }
  //〜【2】
}
 

ソースコードの内容について説明します。

  1. データ受信処理は,2.のterminateメソッドによって終了条件が設定されるまで,実行されます。
  2. terminateメソッドには,タプルの受信を停止させる処理,つまりInprocess_Receiverスレッドの終了条件を記述しています。
    terminateメソッドはSDPサーバのスレッドから呼び出されるため,Inprocess_Receiverクラスのrunningフィールドは二つのスレッドから同時に読み取られる場合があります。このため,runningフィールドは,volatile属性にする必要があります。