ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ストリームデータ処理エンジンとカスタムアダプター間でタプルをやり取りする際,通路として入力ストリームのキュー(入力ストリームキュー)と出力ストリームのキュー(出力ストリームキュー)を使用します。これらのキューは,FIFO方式でタプルを制御します。
入力ストリームキューおよび出力ストリームキューで管理できるタプルの最大値は,次の定義ファイルで指定できます。
指定した値を超えた数のタプルが送信されると,キューあふれが発生します。
ここでは,入力ストリームキューまたは出力ストリームキューでキューあふれが発生した場合の動作,およびキューあふれを事前に防止するための実装方法について説明します。
キューあふれが発生した場合の動作は,入力ストリームキューがあふれた場合と出力ストリームキューがあふれた場合で異なります。
入力ストリームキューでキューあふれが発生した場合の動作について,次の表に示します。
表7-1 入力ストリームキューでキューあふれが発生した場合の動作
項番 | タイムスタンプモード | タイムスタンプ調整機能 | 動作 |
---|---|---|---|
1 | サーバモード | − |
|
2 | データソースモード | 有効 |
|
3 | 無効 |
|
出力ストリームキューのキューあふれが発生した場合の動作について,次の表に示します。動作は,定義ファイル(system_config.propertiesまたはクエリグループ用プロパティファイル)のquerygroup.sleepOnOverStoreRetryCountパラメーターの指定によって異なります。
表7-2 出力ストリームキューのキューあふれが発生した場合の動作
項番 | 定義ファイルの指定 | 動作 |
---|---|---|
1 | querygroup.sleepOnOverStoreRetryCount≠0を指定した場合 | クエリグループが登録する出力タプル数と出力ストリームキューの空きサイズを比較して,出力ストリームキューがあふれる場合,クエリグループの実行を一時的に停止します(クエリを実行するスレッドをスリープさせます)。 スリープ後,出力ストリームキューに出力タプルを登録した場合に出力ストリームキューがあふれたときは,クエリグループを閉塞します。また,クエリグループの閉塞に伴って,入力ストリームキューに登録されていたすべてのタプルを破棄します。ただし,その閉塞処理の際に出力ストリームキューに登録されたタプルについては,破棄しません。 |
2 | querygroup.sleepOnOverStoreRetryCount=0を指定した場合 | クエリグループを閉塞します。また,クエリグループの閉塞に伴って入力ストリームキューに登録されたすべてのタプルを破棄します。ただし,その閉塞処理の際に出力ストリームキューに登録されたタプルについては,破棄しません。 |
ここでは,データ送信APの実装によって,入力ストリームキューのキューあふれを事前に防止する方法について説明します。
キューあふれを事前に防止するデータ送信APの実装方法について,概要を次の図に示します。
図7-8 入力ストリームキューのキューあふれを事前に防止するデータ送信APの実装方法
図で示した処理の内容について説明します。説明の番号は図中の番号と対応しています。
KFSP42032-W ストリームキューの空きサイズが閾値以下になりました。ストリーム名=…,要素数=…,上限値=…,閾値(%)=…
入力ストリームキューのキューあふれを事前に防止する実装例を次に示します。なお,実装例中の処理1と処理2は,判定方法に応じてどちらかの処理を実装してください。
: try { streamInput = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM); //キューの最大サイズを取得する。 int maxQueueSize = streamInput.getMaxQueueSize(); for (int i=0; i<1000; i++){ Object[] data = new Object[] { new Integer(i), new String("data:"+i) }; StreamTuple tuple = new StreamTuple(data); try { streamInput.put(tuple); } catch (SDPClientFreeInputQueueSizeThresholdOverException sce) { //キューの空きサイズが20%以下になった。 //(stream.freeInputQueueSizeThreshold=20指定時)。 while (true) { int freeQueueSize = streamInput.getFreeQueueSize(); //処理1:キューの空きサイズを確認してputメソッドを再開する場合。 //キューの空きサイズが500以上になったら,putメソッドを再開する。 if (freeQueueSize >= 500) { break; } else { Thread.sleep(100); } //処理1はここまで。 //処理2:キューの空きサイズと最大サイズの割合を確認して //putメソッドを再開する場合。 double freePerMax = ((double)freeQueueSize / (double)maxQueueSize) * 100; //キューの空きサイズが50%以上となったら,putメソッドを再開する。 if (freePerMax >= 50) { break; } else { Thread.sleep(100); } //処理2はここまで。 } } catch (SDPClientFreeInputQueueSizeLackException sce2) { //キュー空きサイズがない。 //(putメソッドで送信したタプルはストリームデータ処理エンジンに //受け付けられなかった)。 //このため,タプルを再送する前にスリープする。 Thread.sleep(100); try { //タプルを再送する。 streamInput.put(tuple); } catch (SDPClientFreeInputQueueSizeThresholdOverException sce3) { //タプルの再送が成功したので,ここでのしきい値例外は無視する。 } } } streamInput.putEnd(); } catch (SDPClientFreeInputQueueSizeLackException sce4) { //キューの空きサイズがない。 //(putメソッドで送信したタプルはストリームデータ処理エンジンに //受け付けられなかった)。 :(省略) //送信を停止する。 streamInput.putEnd(); :(省略) } catch (...) { //その他の例外をcatchしたときの処理を実装する。 :(省略)
出力ストリームキューのキューあふれは,定義ファイルのquerygroup.sleepOnOverStoreRetryCountパラメーターおよびquerygroup.sleepOnOverStoreパラメーターの指定で防止します。これらのパラメーターの指定によって,出力ストリームキューがあふれそうな場合に,クエリグループの実行が一時停止(スリープ)されます。
また,「(2) 入力ストリームキューのキューあふれの事前防止」で示した実装をしておくことで,クエリグループの実行が一時停止した場合に,入力ストリームキューに対する入力タプルの送信も抑止できるため,キューをあふれさせることなく,システムの処理を正常に戻すことができます。
出力ストリームキューのキューあふれが事前に防止される処理の概要について,次の図に示します。データ送信APの実装は,「(2) 入力ストリームキューのキューあふれの事前防止」と同じです。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.