ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
ストリームデータのタプルの時刻情報をデータソースモードで管理する場合,データ送信APで時刻情報を設定する必要があります。また,送信するタプルの時刻情報が昇順になるように,データ送信APで制御する必要があります。
ここでは,タプルに時刻情報を設定するためにデータ送信APで実装する内容について説明します。また,データ送信後のストリームデータ処理エンジンでの処理についても説明します。
なお,ここで説明する処理は,サーバモードの場合は不要な処理です。
データソースモードでは,送信するタプルのカラムデータに時刻情報を設定する必要があります。
時刻情報を設定するためには,クエリの定義と,データ送信APでの実装が必要です。それぞれの例を次に示します。
REGISTER STREAM s1(t1 TIMESTAMP, id INT, name VARCHAR(10));
:
:
//タプルオブジェクトを生成します
Object[] data = new Object[]{
new Timestamp(System.currentTimeMillis()),
new Integer(100),
new String("data1")
};
//オブジェクトをタプルに設定します
StreamTuple tuple1 = new StreamTuple(data);
:
:データソースモードの場合,送信するタプルの時刻情報が昇順になるように,データ送信APで制御する必要があります。
ストリームデータ処理エンジンでは,入力ストリームごとに,時刻情報の昇順で送信されたタプルに対して処理を実行します。時刻が逆転したタプルが送信された場合,そのタプルはSDPサーバで破棄されますので注意してください。
次の図に,複数の入力ストリームに対して複数のデータ送信APからタプルを送信する場合の処理順序の例を示します。この図では,入力ストリーム1と入力ストリーム2は同じクエリグループに含まれています。また,図中のタプル下に示した数値(「09:00:00」など)は,そのタプルに設定された時刻情報です。
図7-7 複数の入力ストリームに対して複数のデータ送信APからタプルを送信する場合の処理順序の例
一つのクエリグループ内に複数の入力ストリームがある場合,ストリームデータ処理エンジンでは,最も過去の時刻情報が設定されたタプルから処理します。図の例の場合は,入力ストリーム1のタプルを処理したあとで,入力ストリーム2のタプルを処理します。なお,クエリの実行には,putEndメソッドで処理の終了を通知済みのストリーム以外,すべてのストリームにタプルがあることが必要です。一つでも空のストリームがある場合,最も過去のタプルがどのストリームから送信されるかが判断できないため,ストリームデータ処理エンジンは処理を実行しません。
また,時刻情報が逆転したタプルが到着した場合,ストリームデータ処理エンジンでは,そのタプルを破棄します。図の場合は,入力ストリーム3に到着したタプルの時刻情報の順序が逆転していたため,あとから到着したタプルは破棄します。
データ送信APからのタプルの入力が完了したら,ストリームデータ処理エンジンに対してデータ送信の完了を通知してください。
データソースモードの場合,ストリームデータ処理エンジンでは,タプルの時刻情報を基に,時系列に処理を実行します。データ送信APからのタプルの入力が完了して,入力ストリームにタプルがなくなると,ストリームデータ処理エンジンの時刻は停止します。ウィンドウ演算などのクエリの処理は動作時刻が進んだときに実行されるため,入力ストリームにタプルがない場合,ストリームデータがストリーム処理エンジン内で滞留してしまうことがあります。
このような滞留を防ぐためには,データ送信AP側で,データ送信の完了を明示的に通知する必要があります。データ送信の完了が通知されると,ストリームデータ処理エンジンでは,滞留しているすべてのタプルの処理を実行して,結果を出力ストリームに出力します。
データ送信完了の通知方法については,「7.4.1 データ受信APの終了契機の把握(データ処理終了の通知)」を参照してください。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.