ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

7.4.2 データソースモードでの時刻情報の設定

ストリームデータのタプルの時刻情報をデータソースモードで管理する場合,データ送信APで時刻情報を設定する必要があります。また,送信するタプルの時刻情報が昇順になるように,データ送信APで制御する必要があります。

ここでは,タプルに時刻情報を設定するためにデータ送信APで実装する内容について説明します。また,データ送信後のストリームデータ処理エンジンでの処理についても説明します。

なお,ここで説明する処理は,サーバモードの場合は不要な処理です。

<この項の構成>
(1) タプルへの時刻情報の設定
(2) タプル送信時の時刻情報の制御
(3) タプル入力完了後の処理

(1) タプルへの時刻情報の設定

データソースモードでは,送信するタプルのカラムデータに時刻情報を設定する必要があります。

時刻情報を設定するためには,クエリの定義と,データ送信APでの実装が必要です。それぞれの例を次に示します。

クエリの定義
クエリ定義ファイルに記述するストリーム定義のスキーマ指定文字列に,時刻情報をTIMESTAMP型で定義します。定義例を次に示します。
 
REGISTER STREAM s1(t1 TIMESTAMP, id INT, name VARCHAR(10));
 
データ送信APでの実装
データ送信APで,タプルのカラムデータに格納する時刻情報として,java.sql.TimestampクラスのJavaオブジェクトを作成します。実装例を次に示します。
    :
    :
//タプルオブジェクトを生成します
Object[] data = new Object[]{
  new Timestamp(System.currentTimeMillis()),
  new Integer(100),
  new String("data1")
};
 
//オブジェクトをタプルに設定します
StreamTuple tuple1 = new StreamTuple(data);
    :
    :
実際にデータ送信APを作成する場合,new Timestampには,ログファイルなどに含まれるデータソースの時刻情報を抽出して指定してください。

(2) タプル送信時の時刻情報の制御

データソースモードの場合,送信するタプルの時刻情報が昇順になるように,データ送信APで制御する必要があります。

ストリームデータ処理エンジンでは,入力ストリームごとに,時刻情報の昇順で送信されたタプルに対して処理を実行します。時刻が逆転したタプルが送信された場合,そのタプルはSDPサーバで破棄されますので注意してください。

次の図に,複数の入力ストリームに対して複数のデータ送信APからタプルを送信する場合の処理順序の例を示します。この図では,入力ストリーム1と入力ストリーム2は同じクエリグループに含まれています。また,図中のタプル下に示した数値(「09:00:00」など)は,そのタプルに設定された時刻情報です。

図7-7 複数の入力ストリームに対して複数のデータ送信APからタプルを送信する場合の処理順序の例

[図データ]

一つのクエリグループ内に複数の入力ストリームがある場合,ストリームデータ処理エンジンでは,最も過去の時刻情報が設定されたタプルから処理します。図の例の場合は,入力ストリーム1のタプルを処理したあとで,入力ストリーム2のタプルを処理します。なお,クエリの実行には,putEndメソッドで処理の終了を通知済みのストリーム以外,すべてのストリームにタプルがあることが必要です。一つでも空のストリームがある場合,最も過去のタプルがどのストリームから送信されるかが判断できないため,ストリームデータ処理エンジンは処理を実行しません。

また,時刻情報が逆転したタプルが到着した場合,ストリームデータ処理エンジンでは,そのタプルを破棄します。図の場合は,入力ストリーム3に到着したタプルの時刻情報の順序が逆転していたため,あとから到着したタプルは破棄します。

参考
データソースモードでデータを送信する場合,複数のデータ送信APから送信されたデータについて,誤差によってストリームデータ処理エンジンに到着する順序が逆転して,処理がタイムスタンプ順の昇順にならないことがあります。この場合に,順序が逆転したファイルを破棄しないで処理するためには,タイムスタンプ調整機能を使用して時刻調整を実行する必要があります。タイムスタンプ調整機能による時刻調整については,マニュアル「uCosminexus Stream Data Platform - Application Framework システム構築・運用ガイド」の,タプルのタイムスタンプの調整の検討についての説明を参照してください。

(3) タプル入力完了後の処理

データ送信APからのタプルの入力が完了したら,ストリームデータ処理エンジンに対してデータ送信の完了を通知してください。

データソースモードの場合,ストリームデータ処理エンジンでは,タプルの時刻情報を基に,時系列に処理を実行します。データ送信APからのタプルの入力が完了して,入力ストリームにタプルがなくなると,ストリームデータ処理エンジンの時刻は停止します。ウィンドウ演算などのクエリの処理は動作時刻が進んだときに実行されるため,入力ストリームにタプルがない場合,ストリームデータがストリーム処理エンジン内で滞留してしまうことがあります。

このような滞留を防ぐためには,データ送信AP側で,データ送信の完了を明示的に通知する必要があります。データ送信の完了が通知されると,ストリームデータ処理エンジンでは,滞留しているすべてのタプルの処理を実行して,結果を出力ストリームに出力します。

データ送信完了の通知方法については,「7.4.1 データ受信APの終了契機の把握(データ処理終了の通知)」を参照してください。