This subsection explains the content of the in-process connection data reception application (Inprocess_Receiver.java) compiled in step 4 under 8.3.1 Procedure for executing the sample program for an in-process connection custom adaptor. In this program, the Inprocess_Receiver thread polls the SDP server at 100-millisecond intervals to acquire tuples. This thread is started by the Inprocess_Main class, which is a transmission/reception control program.
The source code is described below. Comments indicated by //[1], //...[1] and the like correspond to the numbers in the explanation provided below. Note that these comments are not written in the actual sample program.
Sample program content
package samples;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientEndOfStreamException;
import jp.co.Hitachi.soft.sdp.common.util.StreamTime;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamOutput;
public class Inprocess_Receiver extends Thread {
//Connector to the SDP server
private SDPConnector connector;
//[2]
// Thread's startup status
private volatile boolean running = true;
//...[2]
// Generate a reception thread.
public Inprocess_Receiver(SDPConnector c) {
this.connector = c;
}
public void run() {
final String groupName = "Inprocess_QueryGroupTest";
final String streamName = "FILTER1";
// Result stream object
StreamOutput so = null;
// Connect to the result stream.
try {
so = connector.openStreamOutput(groupName,streamName);
} catch (SDPClientException sce) {
System.err.println("Receiver : " + sce.getMessage());
running = false;
}
//[1]
// Start receiving tuples.
while(running){
//...[1]
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.err.println("Receiver : Thread is interrupted");
e.printStackTrace();
}
// Tuple object
StreamTuple tuple = null;
// Acquire tuples based on polling.
try {
tuple = so.get();
} catch (SDPClientEndOfStreamException see) {
//Transmission stream data has ended.
System.out.println("Receiver : " + see.getMessage());
break;
} catch (SDPClientQueryGroupStopException sce) {
//The query group is stopped. (sdpcqlstop command)
System.err.println("Receiver : " + sce.getMessage());
break;
} catch (SDPClientQueryGroupHoldException sce) {
//The query group is held.
System.err.println("Receiver : " + sce.getMessage());
break;
} catch (SDPClientException sce) {
System.err.println("Receiver : " + sce.getMessage());
break;
}
//Display the received tuples.
if (tuple != null) {
Object[] data = tuple.getDataArray();
String val = (String) data[0];
Long id = (Long) data[1];
StreamTime time = tuple.getSystemTime();
System.out.println("Receiver : Tuple Get on " + streamName
+" [ VAL="+val+", ID="+id+", TIME="+time.toString()+" ]");
}
}
try {
if (so != null) {
// Close the result stream.
so.close();
}
} catch (SDPClientException sce) {
System.err.println("Receiver : " + sce.getMessage());
}
}
//[2]
public void terminate() {
System.out.println"Receiver : terminate called");
this.running = false;
}
//...[2]
}
The content of the source code is explained as follows: