uCosminexus Stream Data Platform - Application Framework Application Development Guide
Queues are used to exchange tuples between the stream data processing engine and a custom adaptor. The queue for an input stream is called the input stream queue, and the queue for an output stream is called the output stream queue. The tuples in these queues are processed on a first-in, first-out (FIFO) basis.
The maximum number of tuples that can be in the input stream queue or the output stream queue is defined in one of the following files:
If more tuples than the specified value are sent, a queue overflow occurs.
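The overflow condition can be illustrated with a minimal sketch that uses a plain bounded FIFO queue from the Java standard library. This is only an analogy: `ArrayBlockingQueue` and the hypothetical `BoundedQueueDemo` class are not part of the Stream Data Platform API, and the capacity stands in for the maximum number of tuples specified in the definition file.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative analogy only: a standard bounded FIFO queue,
// not the stream queue of the stream data processing engine.
class BoundedQueueDemo {

    /** Offers `count` items to a queue of size `capacity` and returns how many did not fit. */
    static int countRejected(int capacity, int count) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false instead of blocking when the queue is full.
            if (!queue.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Sending 5 items to a queue that holds at most 3 leaves 2 that do not fit.
        System.out.println("rejected = " + countRejected(3, 5));
    }
}
```

In this analogy, each rejected item corresponds to a tuple that would cause a queue overflow because it exceeds the specified maximum.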
This section explains what happens when a queue overflow occurs in the input stream queue or the output stream queue, and an implementation method to prevent queue overflow.
What happens when a queue overflows depends on whether the overflow occurs in the input stream queue or in the output stream queue.
The following table shows what happens when the input stream queue overflows.
Table 6-1 What happens when the input stream queue overflows
| No. | Timestamp mode | Timestamp adjustment function | Action |
|---|---|---|---|
| 1 | Server mode | -- | |
| 2 | Data source mode | Valid | |
| 3 | Data source mode | Invalid | |
The table below shows what happens when the output stream queue overflows. The action varies depending on the specification of the querygroup.sleepOnOverStoreRetryCount parameter in the definition file (system_config.properties or the query group property file).
Table 6-2 What happens when the output stream queue overflows
| No. | Definition file specification | Action |
|---|---|---|
| 1 | If querygroup.sleepOnOverStoreRetryCount != 0 is specified | The number of output tuples to be registered by the query group is compared with the amount of free space in the output stream queue. If registering the tuples would overflow the output stream queue, execution of the query group is temporarily stopped (the thread executing the query is put to sleep). If registering the output tuples would still overflow the output stream queue after that, the query group is held. Because the query group is held, all input tuples registered in the input stream queue are discarded. This hold process, however, does not discard the tuples registered in the output stream queue. |
| 2 | If querygroup.sleepOnOverStoreRetryCount = 0 is specified | Holds the query group. Also, because the query group is held, all input tuples registered in the input stream queue are discarded. This hold process, however, does not discard the tuples registered in the output stream queue. |
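The two rows of Table 6-2 can be modeled with the following sketch. It is a simplified model and not the engine's implementation: the class `OutputQueueOverflowModel`, its method, and the assumption that the engine rechecks the free space once per retry, up to the retry count, are all hypothetical.

```java
import java.util.function.IntSupplier;

// Hypothetical model of Table 6-2; not part of the Stream Data Platform API.
class OutputQueueOverflowModel {

    enum Outcome { REGISTERED, HELD }

    /**
     * Decides what happens to `tuples` output tuples.
     * `freeSpace` reports the current free space in the output stream queue.
     * Assumption: the free space is rechecked once per retry, up to `retryCount` times.
     */
    static Outcome register(int tuples, IntSupplier freeSpace, int retryCount, long sleepMillis) {
        if (tuples <= freeSpace.getAsInt()) {
            return Outcome.REGISTERED;
        }
        // retryCount == 0 skips this loop, so the query group is held immediately (row 2).
        for (int attempt = 0; attempt < retryCount; attempt++) {
            try {
                Thread.sleep(sleepMillis); // the query thread is put to sleep (row 1)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            if (tuples <= freeSpace.getAsInt()) {
                return Outcome.REGISTERED;
            }
        }
        return Outcome.HELD;
    }

    public static void main(String[] args) {
        int[] free = {0};
        // The simulated consumer frees two more slots before every check.
        Outcome outcome = register(5, () -> free[0] += 2, 3, 10L);
        System.out.println(outcome);
    }
}
```

In this model, a nonzero retry count gives the consumer of the output stream time to drain the queue before the query group is held.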
This subsection explains how to implement a procedure to prevent the input stream queue from overflowing in a data transmission application.
The following figure provides an overview of a way to implement the data transmission application in order to prevent a queue overflow.
Figure 6-8 Coding the data transmission application to prevent the input stream queue from overflowing
The details of the processes shown in the figure are explained below. The numbers in the explanation correspond to the numbers in the figure.
KFSP42032-W The available space in the stream queue is now at or below the threshold. Stream name = ..., number of elements = ..., upper limit value = ..., threshold (%) = ...
An example of an implementation that prevents the input stream queue from overflowing is described below. In the implementation example, select either Process 1 or Process 2 according to the evaluation method used.
    :
    try {
        streamInput = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM);
        // Acquire the maximum queue size.
        int maxQueueSize = streamInput.getMaxQueueSize();
        for (int i = 0; i < 1000; i++) {
            Object[] data = new Object[] {
                Integer.valueOf(i),
                "data:" + i
            };
            StreamTuple tuple = new StreamTuple(data);
            try {
                streamInput.put(tuple);
            } catch (SDPClientFreeInputQueueSizeThresholdOverException sce) {
                // The free size in the queue has fallen to 20% or less.
                // (When stream.freeInputQueueSizeThreshold=20
                // is specified).
                // Keep only one of Process 1 and Process 2 below.
                while (true) {
                    int freeQueueSize = streamInput.getFreeQueueSize();
                    // Process 1: Check the free size in the queue to see
                    // if you can resume using the put method.
                    // When the free size in the queue reaches 500 or more,
                    // resume the put method.
                    if (freeQueueSize >= 500) {
                        break;
                    } else {
                        Thread.sleep(100);
                    }
                    // End of Process 1
                    // Process 2: Check the ratio of the free size in
                    // the queue to the maximum size to see if you can
                    // resume using the put method.
                    double freePerMax = ((double) freeQueueSize / (double) maxQueueSize) * 100;
                    // When the free size in the queue reaches 50% or more
                    // of the maximum size, resume the put method.
                    if (freePerMax >= 50) {
                        break;
                    } else {
                        Thread.sleep(100);
                    }
                    // End of Process 2
                }
            } catch (SDPClientFreeInputQueueSizeLackException sce2) {
                // There is no free size in the queue.
                // (The tuple sent using the put method was not accepted by
                // the stream data processing engine).
                // Therefore, the process is put to sleep before resending
                // the tuple.
                Thread.sleep(100);
                try {
                    // Resend the tuple.
                    streamInput.put(tuple);
                } catch (SDPClientFreeInputQueueSizeThresholdOverException sce3) {
                    // Since the tuple was successfully resent, ignore
                    // the threshold exception here.
                }
            }
        }
        streamInput.putEnd();
    } catch (SDPClientFreeInputQueueSizeLackException sce4) {
        // There is no free size in the queue.
        // (The tuple sent using the put method was not accepted by
        // the stream data processing engine).
        :(omitted)
        // Stop the transmission.
        streamInput.putEnd();
        :(omitted)
    } catch (...) {
        // Implement the processes that happen when other exceptions
        // are caught.
        :(omitted)
    }
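Because the listing above depends on the Stream Data Platform client classes, the two resume checks can also be tried in isolation. The following is a minimal sketch under stated assumptions: the class `ResumeCheck` and its method names are hypothetical, and an `IntSupplier` stands in for `streamInput.getFreeQueueSize()`.

```java
import java.util.function.IntSupplier;

// Hypothetical helper that isolates the two resume checks from the listing above.
class ResumeCheck {

    /** Process 1: resume when the absolute free size reaches `minFree`. */
    static boolean canResumeByCount(int freeQueueSize, int minFree) {
        return freeQueueSize >= minFree;
    }

    /** Process 2: resume when free size / maximum size reaches `minFreePercent`. */
    static boolean canResumeByRatio(int freeQueueSize, int maxQueueSize, double minFreePercent) {
        double freePerMax = ((double) freeQueueSize / (double) maxQueueSize) * 100;
        return freePerMax >= minFreePercent;
    }

    /**
     * Polls the free size until the Process 2 check passes.
     * Returns the number of times the free size was read.
     */
    static int waitUntilResumable(IntSupplier freeQueueSize, int maxQueueSize,
                                  double minFreePercent, long pollMillis) {
        int polls = 0;
        while (true) {
            polls++;
            if (canResumeByRatio(freeQueueSize.getAsInt(), maxQueueSize, minFreePercent)) {
                return polls;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        int[] free = {0};
        // Simulated queue of size 1000 that gains 200 free slots before each poll.
        int polls = waitUntilResumable(() -> free[0] += 200, 1000, 50.0, 10L);
        System.out.println("resumed after " + polls + " polls");
    }
}
```

Resuming at a level well above the 20% warning threshold (here, 50%) is a design choice that avoids triggering the threshold exception again immediately after sending resumes.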
To prevent the output stream queue from overflowing, specify the querygroup.sleepOnOverStoreRetryCount and querygroup.sleepOnOverStore parameters in the definition file. If these parameters are specified, execution of the query group is temporarily stopped (put to sleep) if the output stream queue is about to overflow.
Furthermore, if the implementation described in (2) Preventing the input stream queue from overflowing is used, sending of input tuples to the input stream queue can be suppressed when the execution of the query group is temporarily stopped. This allows the system's processing to return to normal without causing a queue overflow.
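As a rough illustration of the two parameters, the sketch below parses an example definition-file fragment with the standard `java.util.Properties` class. The values `3` and `100` are placeholders chosen for illustration, not recommended settings, and the unit of querygroup.sleepOnOverStore is not stated here. The class `OverStoreSettings` is hypothetical and is not how the engine reads its definition files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper; the property names come from the text above,
// the values are placeholders for illustration only.
class OverStoreSettings {

    static final String EXAMPLE_FRAGMENT =
            "querygroup.sleepOnOverStoreRetryCount=3\n"
          + "querygroup.sleepOnOverStore=100\n";

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Returns true if the retry count is nonzero (row 1 of Table 6-2), false if it is 0 (row 2). */
    static boolean sleepsBeforeHold(Properties props) {
        String value = props.getProperty("querygroup.sleepOnOverStoreRetryCount");
        if (value == null) {
            throw new IllegalArgumentException("querygroup.sleepOnOverStoreRetryCount is not specified");
        }
        return Integer.parseInt(value.trim()) != 0;
    }

    public static void main(String[] args) {
        Properties props = parse(EXAMPLE_FRAGMENT);
        System.out.println("sleeps before hold: " + sleepsBeforeHold(props));
    }
}
```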
The figure below provides an overview of a process that prevents the output stream queue from overflowing. Coding of the data transmission application is the same as in (2) Preventing the input stream queue from overflowing.
Figure 6-9 Overview of a process that prevents the output stream queue from overflowing
KFSP42033-W The system is retrying registration of a tuple in the stream queue. Query group name = ..., stream name = ..., sleep time until retry = ..., retry iterations = ...
All Rights Reserved. Copyright (C) 2011, Hitachi, Ltd.