This subsection explains the content of the in-process connection data transmission application (Inprocess_Sender.java) compiled in step 4 under 8.3.1 Procedure for executing the sample program for an in-process connection custom adaptor.
The source code is shown below. Comments of the form //[1] and //...[1] mark the start and end of the code ranges that the numbered explanation after the listing refers to. Note that these comments are not written in the actual sample program.
Sample program content
package samples;
import java.util.ArrayList;
import jp.co.Hitachi.soft.sdp.common.data.StreamTuple;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupHoldException;
import jp.co.Hitachi.soft.sdp.common.exception.SDPClientQueryGroupStopException;
import jp.co.Hitachi.soft.sdp.api.SDPConnector;
import jp.co.Hitachi.soft.sdp.api.StreamInput;
public class Inprocess_Sender extends Thread{
    //[2]
    // Thread's startup status
    private volatile boolean running = true;
    //...[2]
    //Connector to the SDP server
    private SDPConnector connector;
    // Generate a transmission thread.
    public Inprocess_Sender(SDPConnector c) {
        this.connector = c;
    }
    ArrayList<StreamTuple> list = new ArrayList<StreamTuple>();
    public void run() {
        final String group_name = "Inprocess_QueryGroupTest";
        final String stream_name = "DATA0";
        // Transmission stream object
        StreamInput si = null;
        // Connect to the transmission stream.
        try {
            si = connector.openStreamInput(group_name,stream_name);
        } catch (SDPClientException sce) {
            System.err.println("Sender :" + sce.getMessage());
            running = false;
        }
        for(int i = 0; i < 50;i++){
            if(!running) {
                break;
            }
            // Generate the data to be transmitted.
            Object[] data = new Object[]{
                new String("ad"+i),
                new Long(i)
            };
            // Generate a tuple object.
            StreamTuple tuple = new StreamTuple(data);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.err.println("Sender : Thread is interrupted");
                e.printStackTrace();
            }
            // Send a single tuple.
            try {
                si.put(tuple);
            } catch (SDPClientQueryGroupStopException sce) {
                //The query group is stopped. (sdpcqlstop command)
                //No action takes place and the processing continues.
            } catch (SDPClientQueryGroupHoldException sce) {
                //The query group is held.
                //No action takes place and the processing continues.
            } catch (SDPClientException sce) {
                System.err.println("Sender :" + sce.getMessage());
                running = false;
            }
        }
        //[1]
        if(running) {
            try {
                si.putEnd();
                //...[1]
            } catch (SDPClientQueryGroupStopException sce) {
                //The query group is stopped. (sdpcqlstop command)
                //No action takes place and the processing continues.
            } catch (SDPClientQueryGroupHoldException sce) {
                //The query group is held.
                //No action takes place and the processing continues.
            } catch (SDPClientException sce) {
                System.err.println("Sender :" + sce.getMessage());
            }
        }
        try {
            if (si != null) {
                // Close the transmission stream.
                si.close();
            }
        } catch (SDPClientException sce) {
            System.err.println("Sender :" + sce.getMessage());
        }
    }
    //[2]
    public void terminate() {
        System.out.println("Sender : terminate called");
        this.running = false;
        //...[2]
    }
}
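The stop-flag mechanism marked by //[2] in the listing can be tried in isolation, without an SDP server. The following is a minimal sketch, not part of the sample program: the class names StopFlagDemo and Worker and the method sentCount() are hypothetical, and a counter stands in for the si.put(tuple) call. Unlike the sample, this sketch also treats an interrupt as a stop request, which is an assumption made here for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StopFlagDemo {

    // Hypothetical stand-in for the sender thread: it counts instead of sending tuples.
    static class Worker extends Thread {
        // volatile ensures that a write made by another thread is seen by the loop below.
        private volatile boolean running = true;
        private final AtomicInteger sent = new AtomicInteger();

        @Override
        public void run() {
            for (int i = 0; i < 50; i++) {
                // Check the flag at the top of every iteration, as the sample does.
                if (!running) {
                    break;
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Assumption for this sketch only: an interrupt also stops the loop.
                    Thread.currentThread().interrupt();
                    running = false;
                    break;
                }
                // Placeholder for the work done per iteration (si.put(tuple) in the sample).
                sent.incrementAndGet();
            }
        }

        // Called from another thread to request that the loop ends.
        public void terminate() {
            running = false;
        }

        public int sentCount() {
            return sent.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Worker worker = new Worker();
        worker.start();
        Thread.sleep(350);   // let a few iterations run
        worker.terminate();  // request a stop from the main thread
        worker.join();       // wait for the loop to notice the flag and exit
        System.out.println("Iterations completed before stop: " + worker.sentCount());
    }
}
```

Because the flag is checked only once per iteration, a stop request takes effect after the current iteration finishes, so the worker can complete one more unit of work after terminate() returns.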
The content of the source code is explained as follows: