uCosminexus Stream Data Platform - Application Framework Application Development Guide


8.3.5 Content of the in-process connection data reception application (polling method)

This subsection explains the content of the in-process connection data reception application (Inprocess_Receiver.java) compiled in step 4 under 8.3.1 Procedure for executing the sample program for an in-process connection custom adaptor. In this program, the Inprocess_Receiver thread polls the SDP server at 100-millisecond intervals to acquire tuples. This thread is started by the Inprocess_Main class, which is a transmission/reception control program.

The source code is shown below. Comment pairs such as //[1] and //...[1] mark the code that corresponds to the numbered items in the explanation that follows the listing. Note that these comments do not appear in the actual sample program.

Sample program content

 
package samples;
 
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
 
public class Inprocess_Receiver extends Thread {
 
  //Connector to the SDP server
  private SDPConnector connector;
 
  //[2]
  // Thread's startup status
  private volatile boolean running = true;
  //...[2]
 
  // Generate a reception thread.
  public Inprocess_Receiver(SDPConnector c) {
    this.connector = c;
  }
 
  public void run() {
 
    final String groupName = "Inprocess_QueryGroupTest";
    final String streamName = "FILTER1";
 
    // Result stream object
    StreamOutput so = null;
 
    // Connect to the result stream.
    try {
      so = connector.openStreamOutput(groupName,streamName);
    } catch (SDPClientException sce) {
      System.err.println("Receiver : " + sce.getMessage());
      running = false;
    }
 
    //[1]
    // Start receiving tuples.
    while(running){
    //...[1]
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        System.err.println("Receiver : Thread is interrupted");
        e.printStackTrace();
      }
 
      // Tuple object
      StreamTuple tuple = null;
 
      // Acquire tuples based on polling.
      try {
        tuple = so.get();
      } catch (SDPClientEndOfStreamException see) {
        //Transmission stream data has ended.
        System.out.println("Receiver : " + see.getMessage());
        break;
      } catch (SDPClientQueryGroupStopException sce) {
        //The query group is stopped. (sdpcqlstop command)
        System.err.println("Receiver : " + sce.getMessage());
        break;
      } catch (SDPClientQueryGroupHoldException sce) {
        //The query group is held.
        System.err.println("Receiver : " + sce.getMessage());
        break;
      } catch (SDPClientException sce) {
        System.err.println("Receiver : " + sce.getMessage());
        break;
      }
 
      //Display the received tuples.
      if (tuple != null) {
        Object[] data = tuple.getDataArray();
        String val = (String) data[0];
        Long id = (Long) data[1];
        StreamTime time = tuple.getSystemTime();
        System.out.println("Receiver : Tuple Get on " + streamName
          +" [ VAL="+val+", ID="+id+", TIME="+time.toString()+" ]");
      }
    }
 
    try {
      if (so != null) {
        // Close the result stream.
        so.close();
      }
    } catch (SDPClientException sce) {
      System.err.println("Receiver : " + sce.getMessage());
    }
 
  }
 
 
  //[2]
  public void terminate() {
    System.out.println("Receiver : terminate called");
    this.running = false;
  }
  //...[2]
}
 

The content of the source code is explained as follows:

  1. The data reception loop runs until the terminate method described in item 2 sets the termination condition. The loop also ends if so.get() throws an exception.
  2. The terminate method implements the termination condition for the Inprocess_Receiver thread, that is, the process for stopping the reception of tuples.
    Because the terminate method is called from the SDP server thread, the running field of the Inprocess_Receiver class can be accessed by two threads at the same time: the calling thread writes it while the Inprocess_Receiver thread reads it. The running field must therefore be declared with the volatile modifier so that the updated value is visible to the Inprocess_Receiver thread.
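
The stop-flag pattern described in item 2 can be tried without an SDP server. The following is a minimal sketch, not part of the sample program: the class name PollingReceiverSketch, the getReceived method, and the use of a java.util.Queue in place of the result stream are all assumptions made for illustration. As in the sample, the thread polls at 100-millisecond intervals, treats a null result as "no data yet", and stops when another thread calls terminate.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for Inprocess_Receiver; no SDP classes are used.
class PollingReceiverSketch extends Thread {

  // Stand-in for the result stream: poll() returns null when no data is available.
  private final Queue<String> source;

  // Items received so far; safe to read from other threads.
  private final List<String> received = new CopyOnWriteArrayList<>();

  // Written by the controlling thread, read by this thread.
  // volatile makes the write in terminate() visible to the loop in run().
  private volatile boolean running = true;

  PollingReceiverSketch(Queue<String> source) {
    this.source = source;
  }

  @Override
  public void run() {
    while (running) {
      try {
        Thread.sleep(100); // polling interval
      } catch (InterruptedException e) {
        // Restore the interrupt status and leave the loop.
        Thread.currentThread().interrupt();
        break;
      }
      String item = source.poll();
      if (item != null) {
        received.add(item);
      }
    }
  }

  // Called from another thread to end the polling loop.
  public void terminate() {
    running = false;
  }

  public List<String> getReceived() {
    return received;
  }
}
```

A controlling thread would typically create the receiver with a thread-safe queue such as ConcurrentLinkedQueue, call start, later call terminate, and then call join to wait for the loop to finish. Because the flag is checked once per iteration, the thread may take up to one polling interval to stop after terminate returns.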