8.3.4 Content of the in-process connection data transmission application

This subsection explains the content of the in-process connection data transmission application (Inprocess_Sender.java) compiled in step 4 under 8.3.1 Procedure for executing the sample program for an in-process connection custom adaptor.

Reference note
The sample program defines a thread class dedicated to transmission. The client application's main class (the Inprocess_Main class, which implements the StreamInprocessUP interface) creates this thread to perform the transmission. Alternatively, the Inprocess_Main class itself could transmit tuples to the SDP server.

The source code is shown below. Comments such as //[1] and //...[1] correspond to the numbered items in the explanation that follows the code. Note that these comments do not appear in the actual sample program.

Sample program content

package samples;

import java.util.ArrayList;

import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamInput;

public class Inprocess_Sender extends Thread{

 //[2]
 // Thread's startup status
 private volatile boolean running = true;
 //...[2]

 //Connector to the SDP server
 private SDPConnector connector;

 // Generate a transmission thread.
 public Inprocess_Sender(SDPConnector c) {
   this.connector = c;
 }

 ArrayList<StreamTuple> list = new ArrayList<StreamTuple>();

 public void run()  {

   final String group_name = "Inprocess_QueryGroupTest";
   final String stream_name = "DATA0";

   // Transmission stream object
   StreamInput si = null;

   // Connect to the transmission stream.
   try {
     si = connector.openStreamInput(group_name,stream_name);
   } catch (SDPClientException sce) {
     System.err.println("Sender   :" + sce.getMessage());
     running = false;
   }

   for(int i = 0; i < 50;i++){
     if(!running) {
       break;
     }

     // Generate the data to be transmitted.
     Object[] data = new Object[]{
       new String("ad"+i),
       new Long(i)
     };

     // Generate a tuple object.
     StreamTuple tuple = new StreamTuple(data);

     try {
       Thread.sleep(100);
     } catch (InterruptedException e) {
       System.err.println("Sender   : Thread is interrupted");
       e.printStackTrace();
     }

     // Send a single tuple.
     try {
       si.put(tuple);
     } catch (SDPClientQueryGroupStopException sce) {
       //The query group is stopped. (sdpcqlstop command)
       //No action takes place and the processing continues.
     } catch (SDPClientQueryGroupHoldException sce) {
       //The query group is held.
       //No action takes place and the processing continues.
     } catch (SDPClientException sce) {
       System.err.println("Sender   :" + sce.getMessage());
       running = false;
     }
   }

   //[1]
   if(running) {
     try {
       si.putEnd();
   //...[1]
     } catch (SDPClientQueryGroupStopException sce) {
       //The query group is stopped. (sdpcqlstop command)
       //No action takes place and the processing continues.
     } catch (SDPClientQueryGroupHoldException sce) {
       //The query group is held.
       //No action takes place and the processing continues.
     } catch (SDPClientException sce) {
       System.err.println("Sender   :" + sce.getMessage());
     }
   }

   try {
     if (si != null) {
       // Close the transmission stream.
       si.close();
     }
   } catch (SDPClientException sce) {
     System.err.println("Sender   :" + sce.getMessage());
   }

 }
 //[2]
 public void terminate() {
   System.out.println("Sender   : terminate called");
   this.running = false;
 //...[2]
 }
}

The content of the source code is explained as follows:

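  1. The data transmission method is the same as for an RMI connection. However, the data transmission completion notification (the putEnd method) is sent only if the running field is still true, that is, only if the terminate method has not been called and no SDPClientException has stopped the transmission.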
  2. The process for stopping the transmission of tuples, that is, the termination condition for the Inprocess_Sender thread, is described in the terminate method.
    Because the terminate method is called from an SDP server thread, the running field of the Inprocess_Sender class might be accessed by two threads at the same time: the SDP server thread writes it while the transmission thread reads it. Therefore, the running field must be declared volatile.
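
The stop-flag behavior described in items 1 and 2 can be sketched without the SDP API. The following is a minimal illustration only, not part of the sample program: the StoppableSender and StopFlagDemo classes and the sentCount and completionSent methods are hypothetical names, a list append stands in for si.put, and a boolean stands in for si.putEnd. Unlike the sample, this sketch stops when the thread is interrupted during sleep.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the transmission thread; it uses no SDP classes.
class StoppableSender extends Thread {

    // volatile: written by the thread that calls terminate(), read by run().
    private volatile boolean running = true;

    // Set to true only when the completion notification is "sent".
    private volatile boolean completionSent = false;

    private final int total;
    private final long delayMillis;
    private final List<String> sent =
        Collections.synchronizedList(new ArrayList<String>());

    StoppableSender(int total, long delayMillis) {
        this.total = total;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        for (int i = 0; i < total; i++) {
            if (!running) {
                break; // terminate() was called by another thread
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Assumption of this sketch: treat an interrupt as a stop request.
                Thread.currentThread().interrupt();
                running = false;
                break;
            }
            sent.add("ad" + i); // stands in for si.put(tuple)
        }
        // Send the completion notification only if nobody asked us to stop.
        if (running) {
            completionSent = true; // stands in for si.putEnd()
        }
    }

    // Called from a different thread to request that transmission stop.
    public void terminate() {
        running = false;
    }

    int sentCount() {
        return sent.size();
    }

    boolean completionSent() {
        return completionSent;
    }
}

class StopFlagDemo {
    public static void main(String[] args) throws InterruptedException {
        // Case 1: runs to the end, so the completion notification is sent.
        StoppableSender full = new StoppableSender(5, 10);
        full.start();
        full.join();
        System.out.println(full.sentCount() + " " + full.completionSent()); // prints "5 true"

        // Case 2: terminated before any item is sent, so no notification.
        StoppableSender stopped = new StoppableSender(5, 10);
        stopped.terminate();
        stopped.start();
        stopped.join();
        System.out.println(stopped.sentCount() + " " + stopped.completionSent()); // prints "0 false"
    }
}
```

Without volatile, the Java memory model does not guarantee that a write to running made in terminate becomes visible to the loop in run, so the transmission thread could keep sending after a stop request.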