ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ストリームデータ処理エンジンとカスタムアダプター間でタプルをやり取りする際,通路として入力ストリームのキュー(入力ストリームキュー)と出力ストリームのキュー(出力ストリームキュー)を使用します。これらのキューは,FIFO方式でタプルを制御します。
入力ストリームキューおよび出力ストリームキューで管理できるタプルの最大値は,次の定義ファイルで指定できます。
指定した値を超えた数のタプルが送信されると,キューあふれが発生します。
ここでは,入力ストリームキューまたは出力ストリームキューでキューあふれが発生した場合の動作,およびキューあふれを事前に防止するための実装方法について説明します。
キューあふれが発生した場合の動作は,入力ストリームキューがあふれた場合と出力ストリームキューがあふれた場合で異なります。
入力ストリームキューでキューあふれが発生した場合の動作について,次の表に示します。
表7-1 入力ストリームキューでキューあふれが発生した場合の動作
| 項番 | タイムスタンプモード | タイムスタンプ調整機能 | 動作 |
|---|---|---|---|
| 1 | サーバモード | − |
|
| 2 | データソースモード | 有効 |
|
| 3 | 無効 |
|
出力ストリームキューのキューあふれが発生した場合の動作について,次の表に示します。動作は,定義ファイル(system_config.propertiesまたはクエリグループ用プロパティファイル)のquerygroup.sleepOnOverStoreRetryCountパラメーターの指定によって異なります。
表7-2 出力ストリームキューのキューあふれが発生した場合の動作
| 項番 | 定義ファイルの指定 | 動作 |
|---|---|---|
| 1 | querygroup.sleepOnOverStoreRetryCount≠0を指定した場合 | クエリグループが登録する出力タプル数と出力ストリームキューの空きサイズを比較して,出力ストリームキューがあふれる場合,クエリグループの実行を一時的に停止します(クエリを実行するスレッドをスリープさせます)。 スリープ後,出力ストリームキューに出力タプルを登録した場合に出力ストリームキューがあふれたときは,クエリグループを閉塞します。また,クエリグループの閉塞に伴って,入力ストリームキューに登録されていたすべてのタプルを破棄します。ただし,その閉塞処理の際に出力ストリームキューに登録されたタプルについては,破棄しません。 |
| 2 | querygroup.sleepOnOverStoreRetryCount=0を指定した場合 | クエリグループを閉塞します。また,クエリグループの閉塞に伴って入力ストリームキューに登録されたすべてのタプルを破棄します。ただし,その閉塞処理の際に出力ストリームキューに登録されたタプルについては,破棄しません。 |
ここでは,データ送信APの実装によって,入力ストリームキューのキューあふれを事前に防止する方法について説明します。
キューあふれを事前に防止するデータ送信APの実装方法について,概要を次の図に示します。
図7-8 入力ストリームキューのキューあふれを事前に防止するデータ送信APの実装方法
図で示した処理の内容について説明します。説明の番号は図中の番号と対応しています。
KFSP42032-W ストリームキューの空きサイズが閾値以下になりました。ストリーム名=…,要素数=…,上限値=…,閾値(%)=…
入力ストリームキューのキューあふれを事前に防止する実装例を次に示します。なお,実装例中の処理1と処理2は,判定方法に応じてどちらかの処理を実装してください。
:
try {
streamInput = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM);
//キューの最大サイズを取得する。
int maxQueueSize = streamInput.getMaxQueueSize();
for (int i=0; i<1000; i++){
Object[] data = new Object[] {
new Integer(i),
new String("data:"+i)
};
StreamTuple tuple = new StreamTuple(data);
try {
streamInput.put(tuple);
} catch (SDPClientFreeInputQueueSizeThresholdOverException sce) {
//キューの空きサイズが20%以下になった。
//(stream.freeInputQueueSizeThreshold=20指定時)。
while (true) {
int freeQueueSize = streamInput.getFreeQueueSize();
//処理1:キューの空きサイズを確認してputメソッドを再開する場合。
//キューの空きサイズが500以上になったら,putメソッドを再開する。
if (freeQueueSize >= 500) {
break;
} else {
Thread.sleep(100);
}
//処理1はここまで。
//処理2:キューの空きサイズと最大サイズの割合を確認して
//putメソッドを再開する場合。
double freePerMax = ((double)freeQueueSize / (double)maxQueueSize) * 100;
//キューの空きサイズが50%以上となったら,putメソッドを再開する。
if (freePerMax >= 50) {
break;
} else {
Thread.sleep(100);
}
//処理2はここまで。
}
} catch (SDPClientFreeInputQueueSizeLackException sce2) {
//キュー空きサイズがない。
//(putメソッドで送信したタプルはストリームデータ処理エンジンに
//受け付けられなかった)。
//このため,タプルを再送する前にスリープする。
Thread.sleep(100);
try {
//タプルを再送する。
streamInput.put(tuple);
} catch (SDPClientFreeInputQueueSizeThresholdOverException sce3) {
//タプルの再送が成功したので,ここでのしきい値例外は無視する。
}
}
}
streamInput.putEnd();
} catch (SDPClientFreeInputQueueSizeLackException sce4) {
//キューの空きサイズがない。
//(putメソッドで送信したタプルはストリームデータ処理エンジンに
//受け付けられなかった)。
:(省略)
//送信を停止する。
streamInput.putEnd();
:(省略)
} catch (...) {
//その他の例外をcatchしたときの処理を実装する。
:(省略)
出力ストリームキューのキューあふれは,定義ファイルのquerygroup.sleepOnOverStoreRetryCountパラメーターおよびquerygroup.sleepOnOverStoreパラメーターの指定で防止します。これらのパラメーターの指定によって,出力ストリームキューがあふれそうな場合に,クエリグループの実行が一時停止(スリープ)されます。
また,「(2) 入力ストリームキューのキューあふれの事前防止」で示した実装をしておくことで,クエリグループの実行が一時停止した場合に,入力ストリームキューに対する入力タプルの送信も抑止できるため,キューをあふれさせることなく,システムの処理を正常に戻すことができます。
出力ストリームキューのキューあふれが事前に防止される処理の概要について,次の図に示します。データ送信APの実装は,「(2) 入力ストリームキューのキューあふれの事前防止」と同じです。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.