This subsection explains the content of the RMI connection data transmission application (RMI_SendTupleTest.java) compiled in step 4 under 8.2.1 Procedure for executing the sample program for an RMI connection custom adaptor.
This application sends ten tuples, at one-second intervals, to each of the streams s1 and s2 in the query group RMI_QueryGroupTest on the SDP server. The StreamInput interface is used for sending data.
The source code is shown below. Paired comments such as //[1] and //...[1] mark the start and end of the code range that corresponds to each number in the explanation that follows. Note that these numbered comments are not in the actual sample program. Note also that some comments in the code below may differ from the comments in the actual sample program.
Sample program content
package samples;

import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.StreamInput;

public class RMI_SendTupleTest{
    //Specify the name of the stream to be sent.
    private static final String TARGET_STREAM1 = "s1";
    private static final String TARGET_STREAM2 = "s2";
    private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";
    private static SDPConnector connector;
    private static StreamInput streamInput1;
    private static StreamInput streamInput2;
    private static long interval=1000;

    //[1]
    public static void main(String[] args) {
        try {
            //Connect to the SDP server.
            connector = SDPConnectorFactory.connect();
            execute();
        } catch (SDPClientException sce) {
            System.err.println(sce.getMessage());
        }
    }
    //...[1]

    //Perform data transmission.
    public static void execute() {
        try {
            //[2]
            //Connect to the stream to be sent.
            streamInput1 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM1);
            streamInput2 = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM2);
            //...[2]

            //Send data.
            for(int i=0; i<10; i++){
                try{
                    Thread.sleep(interval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //[3]
                //Generate an object to be specified for the tuple.
                Object[] data1 = new Object[]{
                    new Integer(i),
                    new String("data1:"+i)
                };
                Object[] data2 = new Object[]{
                    new Integer(i),
                    new String("data2:"+i)
                };
                //Set the object in the tuple.
                StreamTuple tuple1 = new StreamTuple(data1);
                StreamTuple tuple2 = new StreamTuple(data2);
                //...[3]
                //[4]
                //Send the tuple.
                streamInput1.put(tuple1);
                streamInput2.put(tuple2);
            }
            //Send a data transmission completion notification.
            streamInput1.putEnd();
            streamInput2.putEnd();
            //...[4]
        } catch (SDPClientQueryGroupStopException sce) {
            //The query group is stopped (sdpcqlstop command)
            System.err.println(sce.getMessage());
        } catch (SDPClientQueryGroupHoldException sce) {
            //The query group is held.
            System.err.println(sce.getMessage());
        } catch (SDPClientException sce) {
            System.err.println(sce.getMessage());
        //[5]
        } finally {
            if (!connector.isClosed()) {
                try {
                    //Close the connection to the input stream.
                    streamInput1.close();
                    streamInput2.close();
                    //Close the connection to the SDP server.
                    connector.close();
                } catch (SDPClientException sce2) {
                    System.err.println(sce2.getMessage());
                }
            }
        }
    }
    //...[5]
}
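Note on the cleanup in the finally block: as the listing is written, a failure of openStreamInput leaves streamInput1 or streamInput2 set to null, so the subsequent close() call would raise a NullPointerException before connector.close() is reached. Likewise, an exception from the first close() call skips the remaining ones. The following is a minimal sketch of one way to close each resource independently. It deliberately does not use the SDP API: the Closable interface and the closeAll method are hypothetical stand-ins introduced only for illustration, and the same pattern would need to be adapted to StreamInput and SDPConnector.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeCloseSketch {

    /** Hypothetical stand-in for any resource whose close() can fail. */
    interface Closable {
        void close() throws Exception;
    }

    /**
     * Closes each resource in the order given. A null entry (a resource
     * that was never opened) is skipped, and a failure while closing one
     * resource does not prevent the remaining ones from being closed.
     * Returns the messages of the exceptions raised while closing.
     */
    static List<String> closeAll(Closable... resources) {
        List<String> errors = new ArrayList<>();
        for (Closable r : resources) {
            if (r == null) {
                continue; // never opened, nothing to release
            }
            try {
                r.close();
            } catch (Exception e) {
                errors.add(e.getMessage());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        // Simulates an input stream that could not be opened.
        Closable streamInput1 = null;
        // Simulates an input stream whose close() fails.
        Closable streamInput2 = () -> { throw new Exception("close failed"); };
        // Simulates the server connection, which must still be closed.
        Closable connector = () -> closed.add("connector");

        List<String> errors = closeAll(streamInput1, streamInput2, connector);
        // prints: closed=[connector] errors=[close failed]
        System.out.println("closed=" + closed + " errors=" + errors);
    }
}
```

In this sketch the connection is closed even though one stream was never opened and the other failed to close, which is the behavior the finally block is intended to guarantee.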
The content of the source code is explained as follows: