6.4.3 Preventing queue overflow
Queues are used to exchange tuples between the stream data processing engine and a custom adaptor. The queue for an input stream is called the input stream queue, and the queue for an output stream is called the output stream queue. Tuples in these queues are processed in FIFO (first-in, first-out) order.
The maximum number of tuples that can be in the input stream queue or the output stream queue is defined in one of the following files:
- system_config.properties
- Query group property file
- Stream property file
If more tuples are sent than this maximum allows, a queue overflow occurs.
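To illustrate these rules (FIFO order, a fixed maximum number of tuples, and an overflow when that maximum would be exceeded), the following Java sketch models a single stream queue. The class BoundedTupleQueue and its methods are hypothetical and are not part of the SDP client API; the real queues are managed by the stream data processing engine, and their maximum sizes come from the definition files listed above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a stream queue; not part of the SDP client API.
class BoundedTupleQueue {
    private final Deque<Object> tuples = new ArrayDeque<>();
    private final int maxSize; // plays the role of the configured maximum number of tuples

    BoundedTupleQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    int maxSize() { return maxSize; }

    int freeSize() { return maxSize - tuples.size(); }

    // Appends a tuple at the tail. Returns false instead of exceeding the
    // maximum, which is the point at which a real queue would overflow.
    boolean offer(Object tuple) {
        if (tuples.size() >= maxSize) {
            return false;
        }
        tuples.addLast(tuple);
        return true;
    }

    // Removes and returns the oldest tuple (FIFO), or null if the queue is empty.
    Object poll() {
        return tuples.pollFirst();
    }

    public static void main(String[] args) {
        BoundedTupleQueue q = new BoundedTupleQueue(2);
        System.out.println(q.offer("t1")); // true
        System.out.println(q.offer("t2")); // true
        System.out.println(q.offer("t3")); // false: the queue would overflow
        System.out.println(q.poll());      // t1 (first in, first out)
    }
}
```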
This subsection explains what happens when the input stream queue or the output stream queue overflows, and how to implement applications that prevent queue overflow.
- Organization of this subsection
- (1) What happens when a queue overflows
- (2) Preventing the input stream queue from overflowing
- (3) Preventing the output stream queue from overflowing
(1) What happens when a queue overflows
What happens when a queue overflows depends on whether the overflow occurs in the input stream queue or in the output stream queue.
The following table shows what happens when the input stream queue overflows.
Table 6-1 What happens when the input stream queue overflows
No. | Timestamp mode | Timestamp adjustment function | Action |
---|---|---|---|
1 | Server mode | -- | The exception SDPClientFreeInputQueueSizeLackException is thrown when the data transmission application calls the put method. The stream data processing engine compares the number of tuples sent from the data transmission application with the free size in the input stream queue and, if registering them would overflow the queue, registers none of the sent tuples. The stream data processing engine does not, however, stop processing the query group. |
2 | Data source mode | Valid | The exception SDPClientException is thrown when the data transmission application calls the put method. The stream data processing engine stops processing the query group. Because the query group is no longer processed, all input tuples in the input stream queue are discarded. Tuples already in the output stream queue, however, are not discarded. |
3 | Data source mode | Invalid | The exception SDPClientFreeInputQueueSizeLackException is thrown when the data transmission application calls the put method. The stream data processing engine compares the number of tuples sent from the data transmission application with the free size in the input stream queue and, if registering them would overflow the queue, registers none of the sent tuples. The stream data processing engine does not, however, stop processing the query group. |
- Legend:
- --: Not applicable
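The all-or-nothing registration described in rows 1 and 3 of Table 6-1 can be sketched as follows. This is a simplified model, not the engine's implementation: InputQueueAdmission and QueueSizeLackException are hypothetical names (the latter merely stands in for SDPClientFreeInputQueueSizeLackException), and the input stream queue is represented by a plain java.util.Deque.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for SDPClientFreeInputQueueSizeLackException.
class QueueSizeLackException extends Exception {
    QueueSizeLackException(String message) { super(message); }
}

class InputQueueAdmission {
    // Registers every tuple in the batch, or none of them if the batch
    // does not fit in the free space of the queue.
    static void registerAll(Deque<Object> queue, int maxSize, List<?> batch)
            throws QueueSizeLackException {
        int freeSize = maxSize - queue.size();
        if (batch.size() > freeSize) {
            // Nothing is registered; in rows 1 and 3 the query group keeps running.
            throw new QueueSizeLackException(
                "batch of " + batch.size() + " exceeds free size " + freeSize);
        }
        queue.addAll(batch);
    }

    public static void main(String[] args) {
        Deque<Object> queue = new ArrayDeque<>();
        try {
            registerAll(queue, 3, List.of("t1", "t2")); // 2 tuples, 3 free: accepted
            registerAll(queue, 3, List.of("t3", "t4")); // 2 tuples, 1 free: rejected
        } catch (QueueSizeLackException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(queue); // [t1, t2]
    }
}
```

Because the check happens before any tuple is added, a rejected batch leaves the queue exactly as it was, so the sender can safely resend the whole batch later.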
The table below shows what happens when the output stream queue overflows. The action depends on the value of the querygroup.sleepOnOverStoreRetryCount parameter in the definition file (system_config.properties or the query group property file).
Table 6-2 What happens when the output stream queue overflows
No. | Definition file specification | Action |
---|---|---|
1 | querygroup.sleepOnOverStoreRetryCount is a value other than 0 | The stream data processing engine compares the number of output tuples to be registered by the query group with the amount of free space in the output stream queue. If registering them would overflow the output stream queue, execution of the query group is temporarily stopped (the thread executing the query is put to sleep). If, after that, registering the output tuples would still overflow the output stream queue, the query group is held. Because the query group is held, all input tuples registered in the input stream queue are discarded. Tuples already registered in the output stream queue, however, are not discarded. |
2 | querygroup.sleepOnOverStoreRetryCount is 0 | The query group is held. Because the query group is held, all input tuples registered in the input stream queue are discarded. Tuples already registered in the output stream queue, however, are not discarded. |
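The decision in Table 6-2 can be modelled with the following Java sketch. It rests on assumptions that the table does not confirm: that the engine re-checks the free space after each sleep, that it retries up to the number of times given by querygroup.sleepOnOverStoreRetryCount, and that querygroup.sleepOnOverStore is a sleep time in milliseconds. OutputStoreRetry is a hypothetical class, not engine code.

```java
import java.util.function.IntSupplier;

// Hypothetical model of the decision in Table 6-2; not engine code.
class OutputStoreRetry {
    enum Result { REGISTERED, HELD }

    // outputTupleCount: output tuples the query group wants to register.
    // freeSize: reports the current free space in the output stream queue.
    // retryCount / sleepMillis: assumed stand-ins for
    // querygroup.sleepOnOverStoreRetryCount and querygroup.sleepOnOverStore.
    static Result store(int outputTupleCount, IntSupplier freeSize,
                        int retryCount, long sleepMillis) throws InterruptedException {
        if (outputTupleCount <= freeSize.getAsInt()) {
            return Result.REGISTERED;
        }
        // With retryCount == 0 this loop is skipped and the group is held at once.
        for (int retry = 1; retry <= retryCount; retry++) {
            Thread.sleep(sleepMillis); // the query thread sleeps
            if (outputTupleCount <= freeSize.getAsInt()) {
                return Result.REGISTERED; // space was freed while sleeping
            }
        }
        // Still no room: the query group is held, and the engine discards the
        // input tuples in the input stream queue (not modelled here).
        return Result.HELD;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(store(3, () -> 10, 1, 100)); // REGISTERED
        System.out.println(store(3, () -> 0, 0, 100));  // HELD
    }
}
```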
(2) Preventing the input stream queue from overflowing
This subsection explains how to implement a procedure to prevent the input stream queue from overflowing in a data transmission application.
- Hint
- To enable the process explained here, you must specify the stream.freeInputQueueSizeThreshold parameter in the SDP server definition file (system_config.properties, the query group property file, or the stream property file) in advance.
- In the example described below, stream.freeInputQueueSizeThreshold=20 is specified.
The following figure provides an overview of how to code the data transmission application to prevent the input stream queue from overflowing.
Figure 6-8 Coding the data transmission application to prevent the input stream queue from overflowing
![[Figure]](figure/zc060800.gif)
The details of the processes shown in the figure are explained below. The numbers in the explanation correspond to the numbers in the figure.
1. The data transmission application calls the getMaxQueueSize method to acquire the maximum size of the input stream queue. This value is used in step 5 to calculate the percentage of free space remaining in the input stream queue.
2. The data transmission application calls the put method to send input tuples to the stream data processing engine. The sent tuples are stored in the input stream queue managed by the query group.
3. When the percentage of free space in the input stream queue falls to or below the threshold specified in the definition file (the value of the stream.freeInputQueueSizeThreshold parameter), the stream data processing engine notifies the data transmission application with the exception SDPClientFreeInputQueueSizeThresholdOverException.
4. The data transmission application catches the exception SDPClientFreeInputQueueSizeThresholdOverException thrown by the put method.
   If stream.freeInputQueueSizeThresholdOutputMessage=true is specified in the definition file, the following message is output to the message log:
   KFSP42032-W The available space in the stream queue is now at or below the threshold. Stream name = ..., number of elements = ..., upper limit value = ..., threshold (%) = ...
5. The data transmission application calls the getFreeQueueSize method to acquire the amount of free space in the input stream queue, and uses this value to determine whether it can resume calling the put method.
   - If there is enough free space, calling of the put method is resumed.
   - If there is not enough free space, the process sleeps for a set amount of time and then calls the getFreeQueueSize method again to repeat the check.
   While this check is repeated, no tuples are sent until enough free space is available in the input stream queue.
- Note
- If the input stream queue overflows even though stream.freeInputQueueSizeThreshold is specified, the stream data processing engine throws the exception SDPClientFreeInputQueueSizeLackException to the data transmission application. In this case, the exception SDPClientFreeInputQueueSizeThresholdOverException is not thrown.
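The conditions that decide which exception the put method reports (steps 3 and 4, and the note above) can be expressed as a small classification function. This is a hypothetical sketch, assuming the threshold is evaluated on the free space remaining after the sent tuples are registered; ThresholdCheck and PutOutcome are illustrative names, not SDP classes. For example, with a maximum queue size of 1000 and stream.freeInputQueueSizeThreshold=20, the threshold exception is reported once 200 or fewer tuples' worth of space remains free.

```java
// Hypothetical classification of a put call; not part of the SDP client API.
class ThresholdCheck {
    enum PutOutcome { ACCEPTED, ACCEPTED_THRESHOLD_OVER, REJECTED_LACK }

    // usedBefore: tuples already in the input stream queue
    // maxSize: value returned by getMaxQueueSize in the real API
    // tupleCount: tuples sent in this put call
    // thresholdPercent: value of stream.freeInputQueueSizeThreshold
    static PutOutcome classify(int usedBefore, int maxSize, int tupleCount, int thresholdPercent) {
        int freeBefore = maxSize - usedBefore;
        if (tupleCount > freeBefore) {
            // Overflow: nothing is registered and only the "lack" exception
            // is reported, never the threshold exception.
            return PutOutcome.REJECTED_LACK;
        }
        int freeAfter = freeBefore - tupleCount;
        // free / max * 100 <= threshold, compared in integer arithmetic
        // so that there is no floating-point rounding at the boundary.
        if ((long) freeAfter * 100 <= (long) thresholdPercent * maxSize) {
            return PutOutcome.ACCEPTED_THRESHOLD_OVER; // tuples were still accepted
        }
        return PutOutcome.ACCEPTED;
    }

    public static void main(String[] args) {
        System.out.println(classify(700, 1000, 1, 20));  // ACCEPTED (299 free)
        System.out.println(classify(799, 1000, 1, 20));  // ACCEPTED_THRESHOLD_OVER (200 free = 20%)
        System.out.println(classify(1000, 1000, 1, 20)); // REJECTED_LACK
    }
}
```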
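An example implementation that prevents the input stream queue from overflowing is shown below. It contains two alternative checks for deciding when to resume calling the put method: Process 1 compares the free size with a fixed number of tuples, and Process 2 compares the ratio of the free size to the maximum size. Use either Process 1 or Process 2, according to the evaluation method you use.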
Implementation example
```java
// (preceding code omitted)
try {
    streamInput = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM);
    // Acquire the maximum queue size (used by Process 2).
    int maxQueueSize = streamInput.getMaxQueueSize();
    for (int i = 0; i < 1000; i++) {
        Object[] data = new Object[] {
            Integer.valueOf(i),
            "data:" + i
        };
        StreamTuple tuple = new StreamTuple(data);
        try {
            streamInput.put(tuple);
        } catch (SDPClientFreeInputQueueSizeThresholdOverException sce) {
            // The tuple was accepted, but the free size in the queue has
            // fallen to 20% or less (when stream.freeInputQueueSizeThreshold=20
            // is specified). Wait until enough space is free.
            while (true) {
                int freeQueueSize = streamInput.getFreeQueueSize();

                // Process 1: Check the free size in the queue to see whether
                // calling the put method can be resumed. Resume when the
                // free size in the queue reaches 500 or more.
                if (freeQueueSize >= 500) {
                    break;
                } else {
                    Thread.sleep(100);
                }
                // End of Process 1

                // Process 2 (use instead of Process 1): Check the ratio of
                // the free size in the queue to the maximum size to see
                // whether calling the put method can be resumed. Resume when
                // the free size reaches 50% or more of the maximum size.
                // double freePerMax = ((double) freeQueueSize / (double) maxQueueSize) * 100;
                // if (freePerMax >= 50) {
                //     break;
                // } else {
                //     Thread.sleep(100);
                // }
                // End of Process 2
            }
        } catch (SDPClientFreeInputQueueSizeLackException sce2) {
            // There is no free space in the queue (the tuple sent with the
            // put method was not accepted by the stream data processing
            // engine), so sleep before resending the tuple.
            Thread.sleep(100);
            try {
                // Resend the tuple.
                streamInput.put(tuple);
            } catch (SDPClientFreeInputQueueSizeThresholdOverException sce3) {
                // The tuple was resent successfully, so the threshold
                // exception can be ignored here.
            }
        }
    }
    streamInput.putEnd();
} catch (SDPClientFreeInputQueueSizeLackException sce4) {
    // There is no free space in the queue (the resent tuple was not
    // accepted by the stream data processing engine either).
    // (omitted)
    // Stop the transmission.
    streamInput.putEnd();
    // (omitted)
} catch (...) {
    // Implement the processing for other exceptions that are caught.
    // (omitted)
}
```
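The two resume checks in the example can be isolated as pure functions, which makes them easy to test without a running stream data processing engine. ResumeCriteria is a hypothetical helper class; the values 500 and 50 are the ones used in the example above.

```java
// Hypothetical helper isolating the resume checks of Process 1 and Process 2.
class ResumeCriteria {
    // Process 1: resume when at least minFreeTuples slots are free.
    static boolean canResumeByCount(int freeQueueSize, int minFreeTuples) {
        return freeQueueSize >= minFreeTuples;
    }

    // Process 2: resume when the free size is at least minFreePercent
    // of the maximum queue size.
    static boolean canResumeByRatio(int freeQueueSize, int maxQueueSize, double minFreePercent) {
        double freePerMax = ((double) freeQueueSize / (double) maxQueueSize) * 100;
        return freePerMax >= minFreePercent;
    }

    public static void main(String[] args) {
        System.out.println(canResumeByCount(500, 500));      // true
        System.out.println(canResumeByRatio(400, 1000, 50)); // false (40% free)
        System.out.println(canResumeByRatio(500, 1000, 50)); // true (50% free)
    }
}
```

Process 1 uses an absolute number of tuples, so a value such as 500 has to be tuned to the configured queue size, whereas Process 2 scales with the value returned by getMaxQueueSize. In either case, a resume level well above the threshold (here 50% against 20%) presumably keeps the next put call from immediately reaching the threshold again.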
(3) Preventing the output stream queue from overflowing
To prevent the output stream queue from overflowing, specify the querygroup.sleepOnOverStoreRetryCount and querygroup.sleepOnOverStore parameters in the definition file. When these parameters are specified, execution of the query group is temporarily stopped (put to sleep) if the output stream queue is about to overflow.
In addition, if the data transmission application is implemented as described in (2) Preventing the input stream queue from overflowing, it stops sending input tuples to the input stream queue while execution of the query group is temporarily stopped. This allows the system to return to normal processing without a queue overflow.
The figure below provides an overview of a process that prevents the output stream queue from overflowing. The data transmission application is coded in the same way as in (2) Preventing the input stream queue from overflowing.
- Hint
- To enable the process explained here, you must specify the following parameters in the SDP server definition file:
- stream.freeInputQueueSizeThreshold
- querygroup.sleepOnOverStoreRetryCount
- querygroup.sleepOnOverStore
- In the example, stream.freeInputQueueSizeThreshold=20, querygroup.sleepOnOverStoreRetryCount=1, and querygroup.sleepOnOverStore=100 are specified.
Figure 6-9 Overview of a process that prevents the output stream queue from overflowing
![[Figure]](figure/zc060900.gif)
The details of the processes shown in the figure are explained below. The numbers in the explanation correspond to the numbers in the figure.
1. The data transmission application calls the getMaxQueueSize method to acquire the maximum size of the input stream queue. This value is used to calculate the percentage of free space remaining in the input stream queue.
2. The data transmission application calls the put method to send tuples to the stream data processing engine. The sent tuples are stored in the input stream queue managed by the query group.
3. If the stream data processing engine detects that registering the output tuples produced by query execution would overflow the output stream queue, it temporarily stops (puts to sleep) execution of the query group. Whether the query group is put to sleep, and for how long, is determined by the querygroup.sleepOnOverStoreRetryCount and querygroup.sleepOnOverStore parameters.
   When the query group goes to sleep, the stream data processing engine outputs the following message to the message log:
   KFSP42033-W The system is retrying registration of a tuple in the stream queue. Query group name = ..., stream name = ..., sleep time until retry = ..., retry iterations = ...
   Note that the status of the sleeping query group remains Executing. The data transmission application can still send tuples to the input stream queue, and the data reception application can still acquire results from the output stream queue.
4. If the data transmission application keeps calling the put method while execution of the query group is sleeping, unprocessed tuples accumulate in the input stream queue. When the percentage of free space in the input stream queue falls to or below the threshold specified in the definition file (the value of the stream.freeInputQueueSizeThreshold parameter), the stream data processing engine notifies the data transmission application with the exception SDPClientFreeInputQueueSizeThresholdOverException.
5. The data transmission application catches the exception SDPClientFreeInputQueueSizeThresholdOverException thrown by the put method.
6. The data transmission application calls the getFreeQueueSize method to acquire the amount of free space in the input stream queue, and uses this value to determine whether it can resume calling the put method.
   - If there is enough free space in the input stream queue, calling of the put method is resumed.
   - If there is not enough free space in the input stream queue, the process sleeps for a set amount of time and then calls the getFreeQueueSize method again to repeat the check.

In this way, sending of input tuples with the put method is suppressed until enough free space is available in the input stream queue. Once enough space becomes available in the output stream queue and execution of the query group resumes, the query group processes the tuples accumulated in the input stream queue. Input tuples can then be sent again, and the system returns to its normal state.
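The back-pressure chain described above can be illustrated with a simplified, tick-based Java model. Everything in it is hypothetical: the queue sizes, the rates, the resume level, and the class BackPressureSimulation itself. The query group is modelled as moving one tuple per tick and simply waiting while the output stream queue is full; holding the query group after its retries are exhausted is not modelled.

```java
// Simplified, hypothetical model of the back-pressure chain in (3).
// All sizes, rates, and levels below are illustrative assumptions.
class BackPressureSimulation {
    static final int MAX_IN = 10;            // input stream queue size
    static final int MAX_OUT = 5;            // output stream queue size
    static final int THRESHOLD_PERCENT = 20; // like stream.freeInputQueueSizeThreshold=20
    static final int RESUME_PERCENT = 50;    // resume level used by the sender

    int inputCount, outputCount, sent, received, backoffEvents;
    int peakInput, peakOutput;
    boolean backingOff, overflowed;

    void tick(int t, int totalTuples) {
        // Data transmission application.
        if (backingOff) {
            // Like Process 2: resume once enough of the input queue is free.
            if ((MAX_IN - inputCount) * 100 >= RESUME_PERCENT * MAX_IN) {
                backingOff = false;
            }
        } else if (sent < totalTuples) {
            if (inputCount == MAX_IN) {
                overflowed = true; // the put would be rejected
            } else {
                inputCount++;
                sent++;
                if ((MAX_IN - inputCount) * 100 <= THRESHOLD_PERCENT * MAX_IN) {
                    backingOff = true; // threshold exception received
                    backoffEvents++;
                }
            }
        }
        peakInput = Math.max(peakInput, inputCount);

        // Query group: moves one tuple per tick, waits while output is full.
        if (inputCount > 0 && outputCount < MAX_OUT) {
            inputCount--;
            outputCount++;
        }
        peakOutput = Math.max(peakOutput, outputCount);

        // Data reception application: slow consumer, one tuple every 3 ticks.
        if (t % 3 == 0 && outputCount > 0) {
            outputCount--;
            received++;
        }
    }

    public static void main(String[] args) {
        BackPressureSimulation sim = new BackPressureSimulation();
        int total = 40;
        for (int t = 1; sim.received < total && t < 10_000; t++) {
            sim.tick(t, total);
        }
        System.out.println("received=" + sim.received
                + " back-offs=" + sim.backoffEvents
                + " peakInput=" + sim.peakInput
                + " peakOutput=" + sim.peakOutput
                + " overflowed=" + sim.overflowed);
    }
}
```

With a slow data reception application, the output stream queue fills first, the query group then stops draining the input stream queue, and the sender backs off at the 20% threshold instead of overflowing the input stream queue.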