This subsection explains the RMI connection data reception application (RMI_ReceiveTupleTest.java), which is compiled in step 4 of 8.2.1 Procedure for executing the sample program for an RMI connection custom adaptor.
This application receives the result of joining streams s1 and s2. It uses the StreamOutput class to receive the data.
The source code is shown below. Comments such as //[1] and //...[1] mark the start and end of the code ranges that correspond to the numbered items in the explanation that follows the code. These numbered comments do not appear in the actual sample program, and some other comments in the code below may also differ from those in the actual sample program.
Sample program content
package samples;

import java.util.Iterator;
import java.util.List;

import jp.co.Hitachi.soft.sdp.api.SDPConnectorFactory;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientCommunicationException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;

/**
 * Acquiring and Displaying Stream Data.
 *
 */
public class RMI_ReceiveTupleTest {
    //Specify the name of the stream to be acquired.
    private static final String STREAM_NAME = "JOIN";
    private static final String TARGET_QUERYGROUP = "RMI_QueryGroupTest";
    private static SDPConnector connector;
    private StreamOutput streamOutput;
    private boolean run=true;
    private long interval=10;

    public static void main(String[] args) {
        //[1]
        try {
            //Connect to the SDP server.
            connector = SDPConnectorFactory.connect();
            //Begin the receiving process.
            RMI_ReceiveTupleTest receiver = new RMI_ReceiveTupleTest();
            receiver.execute();
        } catch (SDPClientException sce) {
            System.err.println(sce.getMessage());
        }
        //...[1]
    }

    //Perform the receiving process.
    public void execute() {
        try {
            //[2]
            //Connect to the result stream.
            streamOutput = connector.openStreamOutput(TARGET_QUERYGROUP, STREAM_NAME);
            //...[2]
            while (run) {
                try {
                    Thread.sleep(interval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //[3]
                //Receive tuples.
                List<StreamTuple> tupleList = streamOutput.getAll();
                if (tupleList.isEmpty()) continue;
                //...[3]
                for (Iterator iterator = tupleList.iterator(); iterator.hasNext();) {
                    StreamTuple tuple = (StreamTuple) iterator.next();
                    //[4]
                    //Display the received tuples.
                    Object[] data = tuple.getDataArray();
                    Integer id1 = (Integer)data[0];
                    String val1 = (String)data[1];
                    String val2 = (String)data[2];
                    StreamTime time = tuple.getSystemTime();
                    System.out.println("Get Data: ID1="+id1+
                            ", VAL1="+val1+
                            ", VAL2="+val2+
                            ", TIME="+time.toString());
                    //...[4]
                }
            }
        } catch (SDPClientEndOfStreamException see) {
            //The transmission stream data has ended.
            System.out.println(see.getMessage());
        } catch (SDPClientQueryGroupStopException sce) {
            //The query group is stopped. (sdpcqlstop command)
            System.err.println(sce.getMessage());
        } catch (SDPClientQueryGroupHoldException sce) {
            //The query group is held.
            System.err.println(sce.getMessage());
        } catch (SDPClientCommunicationException sce) {
            System.err.println(sce.getMessage());
        } catch (SDPClientException sce) {
            System.err.println(sce.getMessage());
        } finally {
            if (!connector.isClosed()) {
                //[5]
                try {
                    //Close the connection to the output stream.
                    streamOutput.close();
                    //Close the connection to the SDP server.
                    connector.close();
                } catch (SDPClientException sce) {
                    System.err.println(sce.getMessage());
                }
                //...[5]
            }
        }
    }
}
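The receive loop in the ranges marked [3] and [4] can be tried without an SDP server by replacing StreamOutput with an in-memory stand-in. The following is a minimal sketch under that assumption. The names PollingReceiverSketch, TupleSource, queuedSource, drain, and format are hypothetical and introduced here only for illustration; they are not part of the product API. Each tuple is represented directly by the Object[] that the sample obtains from getDataArray(), the system time is omitted, and the loop runs for a fixed number of polls instead of while (run). Only the pattern itself (sleep, fetch all pending tuples, skip empty batches, cast each column) is taken from the sample program.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

final class PollingReceiverSketch {

    /** Hypothetical stand-in for StreamOutput: returns all tuples that are currently pending. */
    interface TupleSource {
        List<Object[]> getAll();
    }

    /** Builds an in-memory source that hands out one prepared batch per call, then empty lists. */
    static TupleSource queuedSource(List<List<Object[]>> batches) {
        Deque<List<Object[]>> queue = new ArrayDeque<>(batches);
        return () -> queue.isEmpty() ? Collections.<Object[]>emptyList() : queue.poll();
    }

    /** Casts the three columns in the same order as the sample program (ID1, VAL1, VAL2). */
    static String format(Object[] data) {
        Integer id1 = (Integer) data[0];
        String val1 = (String) data[1];
        String val2 = (String) data[2];
        return "Get Data: ID1=" + id1 + ", VAL1=" + val1 + ", VAL2=" + val2;
    }

    /** Polls the source maxPolls times, sleeping between polls and skipping empty batches. */
    static List<String> drain(TupleSource source, int maxPolls, long intervalMillis) {
        List<String> lines = new ArrayList<>();
        for (int poll = 0; poll < maxPolls; poll++) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                break;
            }
            List<Object[]> batch = source.getAll();
            if (batch.isEmpty()) {
                continue;
            }
            for (Object[] data : batch) {
                lines.add(format(data));
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // First poll finds nothing; second poll finds two joined tuples.
        List<Object[]> second = new ArrayList<>();
        second.add(new Object[] {1, "a", "x"});
        second.add(new Object[] {2, "b", "y"});

        List<List<Object[]>> batches = new ArrayList<>();
        batches.add(new ArrayList<Object[]>());
        batches.add(second);

        for (String line : drain(queuedSource(batches), 3, 10L)) {
            System.out.println(line);
        }
    }
}
```

The casts in format mirror the sample program and assume the same column order and types as the joined result stream. If the query definition differs, a cast throws ClassCastException, so the column layout should be checked against the query before reusing this pattern.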
The source code of the sample program is explained as follows: