ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

7.4.3 キューあふれの事前防止

ストリームデータ処理エンジンとカスタムアダプター間でタプルをやり取りする際,通路として入力ストリームのキュー(入力ストリームキュー)と出力ストリームのキュー(出力ストリームキュー)を使用します。これらのキューは,FIFO方式でタプルを制御します。

入力ストリームキューおよび出力ストリームキューで管理できるタプルの最大値は,次の定義ファイルで指定できます。

指定した値を超えた数のタプルが送信されると,キューあふれが発生します。

ここでは,入力ストリームキューまたは出力ストリームキューでキューあふれが発生した場合の動作,およびキューあふれを事前に防止するための実装方法について説明します。

<この項の構成>
(1) キューあふれが発生した場合の動作
(2) 入力ストリームキューのキューあふれの事前防止
(3) 出力ストリームキューのキューあふれの事前防止

(1) キューあふれが発生した場合の動作

キューあふれが発生した場合の動作は,入力ストリームキューがあふれた場合と出力ストリームキューがあふれた場合で異なります。

入力ストリームキューでキューあふれが発生した場合の動作について,次の表に示します。

表7-1 入力ストリームキューでキューあふれが発生した場合の動作

項番 タイムスタンプモード タイムスタンプ調整機能 動作
1 サーバモード
  • データ送信APでは,putメソッドの呼び出しに対して例外SDPClientFreeInputQueueSizeLackExceptionがスローされます。
  • ストリームデータ処理エンジンでは,データ送信APから送信されたタプル数と入力ストリームキューの空きサイズを比較して入力ストリームキューがあふれる場合,送信されたタプルを一切入力ストリームキューに登録しません。ただし,クエリグループは閉塞しません。
2 データソースモード 有効
  • データ送信APでは,putメソッドの呼び出しに対して例外SDPClientExceptionがスローされます。
  • ストリームデータ処理エンジンでは,クエリグループを閉塞します。また,クエリグループの閉塞に伴って,入力ストリームキューに登録されていたすべての入力タプルを破棄します。ただし,その閉塞処理によって出力ストリームキューに登録されたタプルは破棄しません。
3 無効
  • データ送信APでは,putメソッドの呼び出しに対して例外SDPClientFreeInputQueueSizeLackExceptionがスローされます。
  • ストリームデータ処理エンジンでは,データ送信APから送信されたタプル数と入力ストリームキューの空きサイズを比較して入力ストリームキューがあふれる場合,送信されたタプルを一切入力ストリームキューに登録しません。ただし,クエリグループは閉塞しません。
(凡例)
−:該当しません。

出力ストリームキューのキューあふれが発生した場合の動作について,次の表に示します。動作は,定義ファイル(system_config.propertiesまたはクエリグループ用プロパティファイル)のquerygroup.sleepOnOverStoreRetryCountパラメーターの指定によって異なります。

表7-2 出力ストリームキューのキューあふれが発生した場合の動作

項番 定義ファイルの指定 動作
1 querygroup.sleepOnOverStoreRetryCount≠0を指定した場合 クエリグループが登録する出力タプル数と出力ストリームキューの空きサイズを比較して,出力ストリームキューがあふれる場合,クエリグループの実行を一時的に停止します(クエリを実行するスレッドをスリープさせます)。
スリープ後,出力ストリームキューに出力タプルを登録した場合に出力ストリームキューがあふれたときは,クエリグループを閉塞します。また,クエリグループの閉塞に伴って,入力ストリームキューに登録されていたすべてのタプルを破棄します。ただし,その閉塞処理の際に出力ストリームキューに登録されたタプルについては,破棄しません。
2 querygroup.sleepOnOverStoreRetryCount=0を指定した場合 クエリグループを閉塞します。また,クエリグループの閉塞に伴って入力ストリームキューに登録されたすべてのタプルを破棄します。ただし,その閉塞処理の際に出力ストリームキューに登録されたタプルについては,破棄しません。

(2) 入力ストリームキューのキューあふれの事前防止

ここでは,データ送信APの実装によって,入力ストリームキューのキューあふれを事前に防止する方法について説明します。

ポイント
ここで説明する処理を有効にするためには,SDPサーバの定義ファイル(system_config.properties,クエリグループ用プロパティファイル,またはストリーム用プロパティファイル)でstream.freeInputQueueSizeThresholdパラメーターを指定しておく必要があります。
なお,以降の例は,stream.freeInputQueueSizeThreshold=20と指定した場合の例です。

キューあふれを事前に防止するデータ送信APの実装方法について,概要を次の図に示します。

図7-8 入力ストリームキューのキューあふれを事前に防止するデータ送信APの実装方法

[図データ]

図で示した処理の内容について説明します。説明の番号は図中の番号と対応しています。

  1. データ送信APでgetMaxQueueSizeメソッドを呼び出し,入力ストリームのキューの最大サイズを取得します。この値は,手順5.で入力ストリームキューの空きサイズが全体の何割残っているかを算出するときに使用します。
  2. データ送信APでは,putメソッドによって,ストリームデータ処理エンジンに入力タプルを送信します。送信されたタプルは,ストリームデータ処理エンジンのクエリグループで管理されている入力ストリームキューに格納されます。
  3. 入力ストリームキューの空きサイズが,定義ファイルに指定したしきい値(stream.freeInputQueueSizeThresholdの指定値)の割合以下になると,ストリームデータ処理エンジンからデータ送信APに例外SDPClientFreeInputQueueSizeThresholdOverExceptionが通知されます。
  4. データ送信APでは,ストリームデータ処理エンジンからの通知によってputメソッドに対してスローされた例外SDPClientFreeInputQueueSizeThresholdOverExceptionをキャッチします。
    なお,このとき,定義ファイルにstream.freeInputQueueSizeThresholdOutputMessage=trueを指定していた場合は,メッセージログに次のメッセージが出力されます。
     
    KFSP42032-W ストリームキューの空きサイズが閾値以下になりました。ストリーム名=…,要素数=…,上限値=…,閾値(%)=…
     
  5. データ送信APで,getFreeQueueSizeメソッドを呼び出して,入力ストリームキューの空きサイズを取得します。この値を基に,putメソッドの呼び出しを再開できるかどうかを判定します。
    再開できると判定した場合,putメソッドの呼び出しを再開します。
    再開できないと判定した場合,一定時間スリープしたあとで,再度getFreeQueueSizeメソッドを呼び出して,putメソッドの呼び出しを再開できるか判定します。
    この判定処理によって,入力ストリームキューの空きサイズが確保できるまで,putメソッドによるタプルの送信は抑止されます。

 

注意
stream.freeInputQueueSizeThresholdを指定しても入力ストリームキューがあふれてしまった場合,ストリームデータ処理エンジンは,データ送信APに対して,例外SDPClientFreeInputQueueSizeLackExceptionをスローします。この場合,例外SDPClientFreeInputQueueSizeThresholdOverExceptionはスローされません。

入力ストリームキューのキューあふれを事前に防止する実装例を次に示します。なお,実装例中の処理1と処理2は,判定方法に応じてどちらかの処理を実装してください。

実装例

:
try {
  streamInput = connector.openStreamInput(TARGET_QUERYGROUP, TARGET_STREAM);
 
  //キューの最大サイズを取得する。
  int maxQueueSize = streamInput.getMaxQueueSize();
  
  for (int i=0; i<1000; i++){
    Object[] data = new Object[] {
      new Integer(i),
      new String("data:"+i)
      };
    StreamTuple tuple = new StreamTuple(data);
    
    try {
      streamInput.put(tuple);
    } catch (SDPClientFreeInputQueueSizeThresholdOverException sce) {
      //キューの空きサイズが20%以下になった。
      //(stream.freeInputQueueSizeThreshold=20指定時)。
      while (true) {
        int freeQueueSize = streamInput.getFreeQueueSize();
  
  
        //処理1:キューの空きサイズを確認してputメソッドを再開する場合。
        //キューの空きサイズが500以上になったら,putメソッドを再開する。
        if (freeQueueSize >= 500) {
          break;
        } else {
          Thread.sleep(100);
        }
        //処理1はここまで。
  
        //処理2:キューの空きサイズと最大サイズの割合を確認して
        //putメソッドを再開する場合。
        double freePerMax = ((double)freeQueueSize / (double)maxQueueSize) * 100;
        //キューの空きサイズが50%以上となったら,putメソッドを再開する。
        if (freePerMax >= 50) {
          break;
        } else {
          Thread.sleep(100);
        }
        //処理2はここまで。
  
      }
    } catch (SDPClientFreeInputQueueSizeLackException sce2) {
      //キュー空きサイズがない。
      //(putメソッドで送信したタプルはストリームデータ処理エンジンに
      //受け付けられなかった)。
      //このため,タプルを再送する前にスリープする。
      Thread.sleep(100);
      try {
        //タプルを再送する。
        streamInput.put(tuple);
      } catch (SDPClientFreeInputQueueSizeThresholdOverException sce3) {
        //タプルの再送が成功したので,ここでのしきい値例外は無視する。
      }
    }
  }
  streamInput.putEnd();
} catch (SDPClientFreeInputQueueSizeLackException sce4) {
  //キューの空きサイズがない。
  //(putメソッドで送信したタプルはストリームデータ処理エンジンに
  //受け付けられなかった)。
  :(省略)
  //送信を停止する。
  streamInput.putEnd();
  :(省略)
} catch (...) {
  //その他の例外をcatchしたときの処理を実装する。
  :(省略)

(3) 出力ストリームキューのキューあふれの事前防止

出力ストリームキューのキューあふれは,定義ファイルのquerygroup.sleepOnOverStoreRetryCountパラメーターおよびquerygroup.sleepOnOverStoreパラメーターの指定で防止します。これらのパラメーターの指定によって,出力ストリームキューがあふれそうな場合に,クエリグループの実行が一時停止(スリープ)されます。

また,「(2) 入力ストリームキューのキューあふれの事前防止」で示した実装をしておくことで,クエリグループの実行が一時停止した場合に,入力ストリームキューに対する入力タプルの送信も抑止できるため,キューをあふれさせることなく,システムの処理を正常に戻すことができます。

出力ストリームキューのキューあふれが事前に防止される処理の概要について,次の図に示します。データ送信APの実装は,「(2) 入力ストリームキューのキューあふれの事前防止」と同じです。

ポイント
ここで説明する処理を有効にするためには,SDPサーバの定義ファイルで次のパラメーターを指定しておく必要があります。
  • stream.freeInputQueueSizeThreshold
  • querygroup.sleepOnOverStoreRetryCount
  • querygroup.sleepOnOverStore
なお,この例は,それぞれ,stream.freeInputQueueSizeThreshold=20,querygroup.sleepOnOverStoreRetryCount=1,querygroup.sleepOnOverStore=100と指定した場合の例です。

図7-9 出力ストリームキューのキューあふれが事前に防止される処理の概要

[図データ]
図で示した処理の内容について説明します。説明の番号は図中の番号と対応しています。
  1. データ送信APでgetMaxQueueSizeメソッドを呼び出し,入力ストリームのキューの最大サイズを取得します。この値は,入力ストリームキューの空きサイズが全体の何割残っているかを算出するときに使用します。
  2. データ送信APでは,putメソッドによって,ストリームデータ処理エンジンにタプルを送信します。送信されたタプルは,ストリームデータ処理エンジンのクエリグループで管理されている入力ストリームキューに格納されます。
  3. ストリームデータ処理エンジンでは,クエリ実行によって送信された出力タプルによって出力ストリームキューがあふれることを検知すると,クエリグループの実行を一時停止(スリープ)します。スリープするかどうかの設定,およびスリープさせる時間については,querygroup.sleepOnOverStoreRetryCountパラメーターおよびquerygroup.sleepOnOverStoreパラメーターの指定値に従います。
    クエリグループがスリープすると,ストリームデータ処理エンジンは,メッセージログに次のメッセージを出力します。
     
    KFSP42033-W ストリームキューへのタプルの登録をリトライします。クエリグループ名=…,ストリーム名=…,リトライまでのスリープ時間=…,リトライ回数=…
     
    なお,このとき,スリープ中のクエリグループのステータスは「実行中」となっています。データ送信APによる入力ストリームキューに対するタプルの送信,およびデータ受信APによる出力ストリームキューからのタプルの取得は許可されています。
  4. クエリグループの実行がスリープしている間,データ送信APがputメソッドを呼び出し続けると,入力ストリームキューに処理されていないタプルが蓄積されていきます。入力ストリームキューの空きサイズが,定義ファイルに指定したしきい値(stream.freeInputQueueSizeThresholdパラメーターの指定値)の割合以下になると,ストリームデータ処理エンジンからデータ送信APに例外SDPClientFreeInputQueueSizeThresholdOverExceptionが通知されます。
  5. データ送信APでは,ストリームデータ処理エンジンからの通知によってputメソッドに対してスローされた例外SDPClientFreeInputQueueSizeThresholdOverExceptionをキャッチします。
  6. データ送信APで,getFreeQueueSizeメソッドを呼び出して,入力ストリームキューの空きサイズを取得します。この値を基に,putメソッドの呼び出しを再開できるかどうかを判定します。
    再開できると判定した場合,putメソッドの呼び出しを再開します。
    再開できないと判定した場合,一定時間スリープしたあとで,再度getFreeQueueSizeメソッドを呼び出して,putメソッドの呼び出しを再開できるか判定します。
    この処理によって,入力ストリームキューの空きサイズが確保できるまで,putメソッドによる入力タプルの送信は抑止されます。出力ストリームキューの空きサイズが確保され,クエリグループの実行が再開されて入力ストリームキューのタプルが処理されると,再度,入力タプルの送信ができるようになり,システムが正常な状態に戻ります。