8.3.5 Content of the in-process connection data reception application (polling method)

This subsection explains the content of the in-process connection data reception application (Inprocess_Receiver.java) compiled in step 4 under 8.3.1 Procedure for executing the sample program for an in-process connection custom adaptor. In this program, the Inprocess_Receiver thread polls the SDP server at 100-millisecond intervals to acquire tuples. This thread is started by the Inprocess_Main class, which is a transmission/reception control program.

The source code is described below. Comments indicated by //[1], //...[1] and the like correspond to the numbers in the explanation provided below. Note that these comments are not written in the actual sample program.

Sample program content

package samples;

import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;

public class Inprocess_Receiver extends Thread {

 //Connector to the SDP server
 private SDPConnector connector;

 //[2]
 // Thread's startup status
 private volatile boolean running = true;
 //...[2]

 // Generate a reception thread.
 public Inprocess_Receiver(SDPConnector c) {
   this.connector = c;
 }

 public void run() {

   final String groupName = "Inprocess_QueryGroupTest";
   final String streamName = "FILTER1";

   // Result stream object
   StreamOutput so = null;

   // Connect to the result stream.
   try {
     so = connector.openStreamOutput(groupName,streamName);
   } catch (SDPClientException sce) {
     System.err.println("Receiver : " + sce.getMessage());
     running = false;
   }

   //[1]
   // Start receiving tuples.
   while(running){
   //...[1]
     try {
       Thread.sleep(100);
     } catch (InterruptedException e) {
       System.err.println("Receiver : Thread is interrupted");
       e.printStackTrace();
     }

     // Tuple object
     StreamTuple tuple = null;

     // Acquire tuples based on polling.
     try {
       tuple = so.get();
     } catch (SDPClientEndOfStreamException see) {
       //Transmission stream data has ended.
       System.out.println("Receiver : " + see.getMessage());
       break;
     } catch (SDPClientQueryGroupStopException sce) {
       //The query group is stopped. (sdpcqlstop command)
       System.err.println("Receiver : " + sce.getMessage());
       break;
     } catch (SDPClientQueryGroupHoldException sce) {
       //The query group is held.
       System.err.println("Receiver : " + sce.getMessage());
       break;
     } catch (SDPClientException sce) {
       System.err.println("Receiver : " + sce.getMessage());
       break;
     }

     //Display the received tuples.
     if (tuple != null) {
       Object[] data = tuple.getDataArray();
       String val = (String) data[0];
       Long id = (Long) data[1];
       StreamTime time = tuple.getSystemTime();
       System.out.println("Receiver : Tuple Get on " + streamName
         +" [ VAL="+val+", ID="+id+", TIME="+time.toString()+" ]");
     }
   }

   try {
     if (so != null) {
       // Close the result stream.
       so.close();
     }
   } catch (SDPClientException sce) {
     System.err.println("Receiver : " + sce.getMessage());
   }

 }


 //[2]
 public void terminate() {
   System.out.println("Receiver : terminate called");
   this.running = false;
 }
 //...[2]
}

The content of the source code is explained as follows:

  1. Tuples are received repeatedly while the running field is true, that is, until the terminate method described in item 2 sets the termination condition or an exception thrown during tuple acquisition ends the loop.
  2. The terminate method defines the termination condition for the Inprocess_Receiver thread, that is, the process for stopping the reception of tuples.
    Because the terminate method is called from the SDP server thread, the running field of the Inprocess_Receiver class can be written by one thread while another thread is reading it. The running field must therefore be declared with the volatile modifier, so that the value set by the terminate method becomes visible to the reception thread.
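
The stop mechanism described in items 1 and 2 can be tried in isolation with the following minimal sketch. It is not part of the sample program and does not use the SDP API: the class names PollingStopSketch and PollingWorker, the runDemo method, and the in-memory queue that stands in for the result stream are all assumptions made for illustration. The sketch only shows a polling loop at 100-millisecond intervals that ends when another thread sets a volatile flag.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-alone sketch; none of these names exist in the SDP sample.
final class PollingStopSketch {

    // Plays the role of Inprocess_Receiver, polling a queue instead of a result stream.
    static final class PollingWorker extends Thread {
        private final Queue<String> source;
        private final List<String> received = new CopyOnWriteArrayList<>();

        // volatile: written by the controlling thread, read by this thread.
        private volatile boolean running = true;

        PollingWorker(Queue<String> source) {
            this.source = source;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(100); // polling interval
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                String item = source.poll(); // returns null when nothing is available
                if (item != null) {
                    received.add(item);
                }
            }
        }

        // Called from another thread to end the polling loop.
        void terminate() {
            running = false;
        }

        int receivedCount() {
            return received.size();
        }
    }

    // Starts a worker, waits until the queue is drained, stops the worker,
    // and returns the number of items the worker received.
    static int runDemo() {
        Queue<String> source =
            new ConcurrentLinkedQueue<>(Arrays.asList("a", "b", "c"));
        PollingWorker worker = new PollingWorker(source);
        worker.start();
        try {
            while (!source.isEmpty()) {
                Thread.sleep(20);
            }
            worker.terminate();
            worker.join(); // returns once the loop has observed running == false
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return worker.receivedCount();
    }

    public static void main(String[] args) {
        System.out.println("Received items: " + runDemo());
    }
}
```

In this sketch the controlling thread plays the role that the SDP server thread plays in the sample program: it calls terminate, and the worker leaves its loop at the next check of the running field. Without the volatile modifier, the Java memory model would not guarantee that the worker ever observes the updated value.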