8.2.4 Content of the RMI connection data reception application

This subsection explains the RMI connection data reception application (RMI_ReceiveTupleTest.java), which is compiled in step 4 of 8.2.1 Procedure for executing the sample program for an RMI connection custom adaptor.

This application receives the result of joining streams s1 and s2. It uses the StreamOutput class to receive the data from the output stream JOIN of the query group RMI_QueryGroupTest.

The source code is shown below. A comment of the form //[1] marks the start of a numbered part of the code and //...[1] marks its end; the numbers correspond to the numbered items in the explanation that follows the code. Note that these numbered comments are not in the actual sample program. Note also that some comments in the code below may differ from the comments in the actual sample program.

Sample program content

package samples;

import java.util.Iterator;
import java.util.List;

import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientCommunicationException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
/**
* Acquiring and Displaying Stream Data.
*
*/

public class RMI_ReceiveTupleTest {
 //Specify the name of the stream to be acquired.
 private static final String STREAM_NAME = "JOIN";
 private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";

 private static SDPConnector connector;
 private StreamOutput streamOutput;
 //The receive loop continues while this flag is true.
 private boolean run=true;
 //Polling interval in milliseconds.
 private long interval=10;

 public static void main(String[] args) {

   //[1]
   try {
     //Connect to the SDP server.
     connector = SDPConnectorFactory.connect();

     //Begin the receiving process.
     RMI_ReceiveTupleTest receiver = new RMI_ReceiveTupleTest();
     receiver.execute();
   } catch (SDPClientException sce) {
     System.err.println(sce.getMessage());
   }
   //...[1]
 }

 //Perform the receiving process.
 public void execute() {
   try {
     //[2]
     //Connect to the result stream.
     streamOutput = connector.openStreamOutput(TARGET_QUERYGROUP, STREAM_NAME);
     //...[2]

     while (run) {
       try {
         Thread.sleep(interval);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }

       //[3]
       //Receive tuples.
       List<StreamTuple> tupleList = streamOutput.getAll();
       if (tupleList.isEmpty()) continue;
       //...[3]

       for (Iterator<StreamTuple> iterator = tupleList.iterator(); iterator.hasNext();) {
         StreamTuple tuple = iterator.next();

         //[4]
         //Display the received tuples.
         Object[] data = tuple.getDataArray();
         Integer id1 = (Integer)data[0];
         String val1 = (String)data[1];
         String val2 = (String)data[2];
         StreamTime time = tuple.getSystemTime();
         System.out.println("Get Data: ID1="+id1+
           ", VAL1="+val1+
           ", VAL2="+val2+
           ", TIME="+time.toString());
         //...[4]
       }
     }
   } catch (SDPClientEndOfStreamException see) {
     //The transmission stream data has ended.
     System.out.println(see.getMessage());
   } catch (SDPClientQueryGroupStopException sce) {
     //The query group is stopped. (sdpcqlstop command)
     System.err.println(sce.getMessage());
   } catch (SDPClientQueryGroupHoldException sce) {
     //The query group is held.
     System.err.println(sce.getMessage());
   } catch (SDPClientCommunicationException sce) {
     System.err.println(sce.getMessage());
   } catch (SDPClientException sce) {
     System.err.println(sce.getMessage());
   } finally {
     if (!connector.isClosed()) {
       //[5]
       try {
         //Close the connection to the output stream, if one was opened.
         if (streamOutput != null) {
           streamOutput.close();
         }
         //Close the connection to the SDP server.
         connector.close();
       } catch (SDPClientException sce) {
         System.err.println(sce.getMessage());
       }
       //...[5]
     }
   }
 }
}

The source code is explained as follows:

  1. The SDPConnectorFactory.connect method connects to the SDP server and returns an SDPConnector object.
  2. The openStreamOutput method of the SDPConnector object connects to the output stream from which data is to be received (in the sample program, the JOIN stream of the RMI_QueryGroupTest query group) and returns a StreamOutput object.
  3. The getAll method of the StreamOutput object receives all of the tuples, as a list of StreamTuple objects, from the output stream connected to in step 2. If the list is empty, the sample program waits for the next polling interval and tries again.
  4. The getDataArray method of each StreamTuple object returns the tuple's data as an object array. The sample program casts the elements of the array to the Integer and String types, acquires the tuple's time by using the getSystemTime method, and writes these values to standard output.
  5. The close method of the StreamOutput object disconnects from the output stream. Then, the close method of the SDPConnector object closes the connection to the SDP server.
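
The receive loop described in steps 2 through 5 can be tried out without an SDP server by using stand-in types. The following is a minimal sketch under stated assumptions, not part of the sample program: Tuple, Output, EndOfStream, and FakeOutput are hypothetical stand-ins for StreamTuple, StreamOutput, and SDPClientEndOfStreamException, and they imitate only the calls used above (getAll, getDataArray, and close). The sketch polls at a fixed interval, skips empty batches, casts each tuple's columns to Integer and String, and closes the output in a finally block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReceiveLoopSketch {

    /** Hypothetical stand-in for StreamTuple: exposes the column values as an Object[]. */
    interface Tuple {
        Object[] getDataArray();
    }

    /** Hypothetical stand-in for SDPClientEndOfStreamException. */
    static class EndOfStream extends Exception { }

    /** Hypothetical stand-in for StreamOutput. */
    interface Output extends AutoCloseable {
        List<Tuple> getAll() throws EndOfStream;
        @Override void close();
    }

    /** In-memory output that returns pre-loaded batches and then signals end of stream. */
    static class FakeOutput implements Output {
        private final List<List<Tuple>> batches;
        private int next = 0;
        boolean closed = false;

        FakeOutput(List<List<Tuple>> batches) { this.batches = batches; }

        @Override public List<Tuple> getAll() throws EndOfStream {
            if (next >= batches.size()) throw new EndOfStream();
            return batches.get(next++);
        }

        @Override public void close() { closed = true; }
    }

    /** Convenience factory for a tuple with the given column values. */
    static Tuple tuple(Object... values) { return () -> values; }

    /** Polls the output until end of stream and returns one formatted line per tuple. */
    static List<String> receive(Output out, long intervalMillis) {
        List<String> lines = new ArrayList<>();
        try {
            while (true) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling.
                    Thread.currentThread().interrupt();
                    break;
                }
                List<Tuple> batch = out.getAll();
                if (batch.isEmpty()) continue;   // nothing arrived in this interval
                for (Tuple t : batch) {
                    Object[] data = t.getDataArray();
                    Integer id1 = (Integer) data[0];
                    String val1 = (String) data[1];
                    String val2 = (String) data[2];
                    lines.add("ID1=" + id1 + ", VAL1=" + val1 + ", VAL2=" + val2);
                }
            }
        } catch (EndOfStream end) {
            // Normal termination: the stream has no more data.
        } finally {
            out.close();   // always release the output, as in step 5
        }
        return lines;
    }

    public static void main(String[] args) {
        FakeOutput out = new FakeOutput(Arrays.asList(
            Arrays.asList(tuple(1, "a", "x")),
            Collections.<Tuple>emptyList(),
            Arrays.asList(tuple(2, "b", "y"), tuple(3, "c", "z"))));
        for (String line : receive(out, 1)) {
            System.out.println(line);
        }
    }
}
```

Running main prints one line for each of the three tuples; the empty second batch produces no output. Unlike the sample program, this sketch stops polling when the thread is interrupted. That is a design choice of the sketch, not behavior of the SDP API.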