8.2.3 Content of the RMI connection data transmission application

This subsection explains the content of the RMI connection data transmission application (RMI_SendTupleTest.java) compiled in step 4 under 8.2.1 Procedure for executing the sample program for an RMI connection custom adaptor.

This application sends tuples to the input streams s1 and s2 on the SDP server. The StreamInput interface is used for sending the data.

The source code is shown below. Numbered comments such as //[1] and //...[1] mark the beginning and end of the code that corresponds to the same-numbered item in the explanation after the listing. These numbered comments do not appear in the actual sample program, and some of the other comments in the listing might also differ from those in the actual sample program.

Sample program content

package samples;

import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.StreamInput;

public class RMI_SendTupleTest{
 //Specify the names of the streams to send to and of the query group.
 private static final String TARGET_STREAM1 = "s1";
 private static final String TARGET_STREAM2 = "s2";
 private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";

 private static SDPConnector connector;
 private static StreamInput streamInput1;
 private static StreamInput streamInput2;
 private static long interval=1000;

 //[1]
 public static void main(String[] args) {
   try {
     //Connect to the SDP server.
     connector = SDPConnectorFactory.connect();
     execute();
   } catch (SDPClientException sce) {
     System.err.println(sce.getMessage());
   }
 }
 //...[1]

 //Perform data transmission.
 public static void execute() {
   try {
     //[2]
     //Connect to the stream to be sent.
     streamInput1 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM1);
     streamInput2 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM2);
     //...[2]

     //Send data.
     for(int i=0; i<10; i++){
       try{
         Thread.sleep(interval);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }

       //[3]
       //Generate an object to be specified for the tuple.
       Object[] data1 = new Object[]{
         new Integer(i),
         new String("data1:"+i)
       };
       Object[] data2 = new Object[]{
         new Integer(i),
         new String("data2:"+i)
       };

       //Set the object in the tuple.
       StreamTuple tuple1 = new StreamTuple(data1);
       StreamTuple tuple2 = new StreamTuple(data2);
       //...[3]

       //[4]
       //Send the tuple.
       streamInput1.put(tuple1);
       streamInput2.put(tuple2);
     }
     //Send a data transmission completion notification.
     streamInput1.putEnd();
     streamInput2.putEnd();
     //...[4]
   } catch (SDPClientQueryGroupStopException sce) {
     //The query group is stopped (sdpcqlstop command)
     System.err.println(sce.getMessage());
   } catch (SDPClientQueryGroupHoldException sce) {
     //The query group is held.
     System.err.println(sce.getMessage());
   } catch (SDPClientException sce) {
     System.err.println(sce.getMessage());

   //[5]
   } finally {
     if (!connector.isClosed()) {
       try {
         //Close the connection to the input stream.
         streamInput1.close();
         streamInput2.close();
         //Close the connection to the SDP server.
         connector.close();
       } catch (SDPClientException sce2) {
         System.err.println(sce2.getMessage());
       }
     }
   }
 }
 //...[5]
}

The content of the source code is explained as follows:

  1. The SDPConnectorFactory.connect method is called to connect to the SDP server and acquire an SDPConnector type object.
  2. The openStreamInput method of the SDPConnector type object is called with the query group name and the stream name. This method connects to the input stream to which the data is to be sent and returns a StreamInput type object.
  3. An Object array is generated from the data to be sent and is then set in a tuple (a StreamTuple type object).
  4. The put method of the StreamInput type object is called to send the data (tuples) to the input stream that was connected to in step 2. After all of the data has been sent, the putEnd method is called to send a transmission completion notification.
  5. The close method of each StreamInput type object is called to disconnect from the input stream. Afterwards, the close method of the SDPConnector type object is called to disconnect from the SDP server.
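
The five steps above can also be sketched without the product libraries. The following is only an illustration under stated assumptions: DemoStreamInput, DemoConnector, RecordingConnector, and SendLifecycleSketch are hypothetical stand-ins that mirror the method names used in the sample program, not the actual SDP client API, and the behavior of the real classes might differ. The sketch also shows one possible variation on step 5. In the listing above, if openStreamInput throws an exception before both StreamInput fields are assigned, calling close on the unassigned field raises a NullPointerException, so the sketch checks each reference for null before closing it. It also uses Integer.valueOf instead of the Integer constructor, which has been deprecated since Java 9.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamInput (only the methods used in the sample).
interface DemoStreamInput {
    void put(Object[] tuple) throws Exception;
    void putEnd() throws Exception;
    void close() throws Exception;
}

// Hypothetical stand-in for SDPConnector.
interface DemoConnector {
    DemoStreamInput openStreamInput(String queryGroup, String stream) throws Exception;
    boolean isClosed();
    void close() throws Exception;
}

// In-memory connector that records every call instead of contacting a server.
final class RecordingConnector implements DemoConnector {
    final List<String> log = new ArrayList<>();
    private final String failOnStream; // stream whose open fails, or null for no failure
    private boolean closed = false;

    RecordingConnector(String failOnStream) {
        this.failOnStream = failOnStream;
    }

    @Override
    public DemoStreamInput openStreamInput(String queryGroup, String stream) throws Exception {
        if (stream.equals(failOnStream)) {
            throw new Exception("cannot open " + stream);
        }
        log.add("open " + stream);
        return new DemoStreamInput() {
            @Override public void put(Object[] tuple) { log.add(stream + " put " + tuple[1]); }
            @Override public void putEnd() { log.add(stream + " putEnd"); }
            @Override public void close() { log.add(stream + " close"); }
        };
    }

    @Override public boolean isClosed() { return closed; }
    @Override public void close() { closed = true; log.add("connector close"); }
}

final class SendLifecycleSketch {
    // Steps 2 to 5: open both streams, send the tuples, send the end notification, clean up.
    static void send(DemoConnector connector, int count) {
        DemoStreamInput in1 = null;
        DemoStreamInput in2 = null;
        try {
            in1 = connector.openStreamInput("RMI_QueryGroupTest", "s1");
            in2 = connector.openStreamInput("RMI_QueryGroupTest", "s2");
            for (int i = 0; i < count; i++) {
                in1.put(new Object[] { Integer.valueOf(i), "data1:" + i });
                in2.put(new Object[] { Integer.valueOf(i), "data2:" + i });
            }
            in1.putEnd();
            in2.putEnd();
        } catch (Exception e) {
            System.err.println(e.getMessage());
        } finally {
            closeIfOpened(in1);
            closeIfOpened(in2);
            try {
                if (!connector.isClosed()) {
                    connector.close();
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            }
        }
    }

    // Closes a stream only if it was actually opened; a failure to close is reported, not rethrown.
    private static void closeIfOpened(DemoStreamInput in) {
        if (in == null) {
            return;
        }
        try {
            in.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }
}
```

For example, calling SendLifecycleSketch.send(new RecordingConnector("s2"), 1) simulates a failure while opening s2. In that case the sketch still closes s1 and the connector, and does not attempt to close the stream that was never opened.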