ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
外部定義ストリーム間演算関数を使用する場合の,外部定義関数の作成例を次に示します。
package samples; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import jp.co.Hitachi.soft.sdp.plugin.function.stream.SDPExternalStreamFunction; public class ExternalStreamFunction implements SDPExternalStreamFunction { // 合計値格納マップ Map<String, Long> tempMap = new HashMap<String, Long>(); // 乗数 int mul; // 実行回数 int loopCount; // 外部定義関数の実装クラスのコンストラクタ。 // mul:乗数。クエリに記述した外部定義関数の初期化パラメーターに指定した値。 public ExternalStreamFunction(Integer mul) { this.mul = mul; loopCount = 0; } // 外部定義関数の処理を実装するメソッド。 // ID毎にVALの合計値を保持し、乗数で乗算した結果を出力します。 // inputs:タプルの集合。入力ストリーム毎にタプルが格納されています。 // ts :inputsに格納されているタプルの時刻(タイムスタンプ)。 public List<Object[]> executeStreamFunc(Collection<Object[]>[] inputs, Timestamp ts) { // 戻り値格納リスト List<Object[]> res = new ArrayList<Object[]>(); // 入力ストリーム数分ループ for (int i = 0; i < inputs.length; i++) { // 入力ストリームを取得します Collection<Object[]> values = inputs[i]; // タプル数分ループ for (Object[] value : values) { // IDを取得します String id = (String) value[0]; // VALを取得します Long val = (Long) value[1]; // 合計値をID毎に合計値格納マップに保存します if (tempMap.containsKey(id)) { // 既出IDの場合、加算します tempMap.put(id, tempMap.get(id) + val); } else { // 新規IDの場合、新規に登録します tempMap.put(id, val); } } } // IDの数分ループ for (String key : tempMap.keySet()) { // 戻り値格納リストに出力タプルを格納します res.add(new Object[] { loopCount, key, tempMap.get(key) * mul, ts }); } loopCount++; return res; } // 初期化メソッド public void initialize() { tempMap.clear(); loopCount = 0; } // 終了メソッド public void terminate() { // 処理なし } }
<?xml version="1.0" encoding="UTF-8"?> <!-- All Rights Reserved. Copyright (C) 2013, Hitachi, Ltd. --> <root:ExternalFunctionDefinition xmlns:root="http://www.hitachi.co.jp/soft/xml/sdp/function" xmlns:group="http://www.hitachi.co.jp/soft/xml/sdp/function/functiongroup"> <!-- 外部定義関数グループ定義 --> <group:FunctionGroup name="FG1" path="samples/external/src"> <!-- 関数定義 --> <group:StreamFunction name="FUNC1" class="samples.ExternalStreamFunction"> <!-- 戻り値定義 --> <group:ReturnInformation name="R1" type="INT" /> <group:ReturnInformation name="R2" type="VARCHAR(10)" /> <group:ReturnInformation name="R3" type="BIGINT" /> <group:ReturnInformation name="R4" type="TIMESTAMP(9)" /> </group:StreamFunction> </group:FunctionGroup> </root:ExternalFunctionDefinition>
REGISTER STREAM DATA0(ID VARCHAR(10), VAL BIGINT); // 入力ストリームDATA0から、カラムID毎に最新のタプルを保持するクエリ REGISTER QUERY Q1 SELECT ID, VAL FROM DATA0[PARTITION BY ID ROWS 1]; // 最新のタプルを出力するクエリ REGISTER QUERY Q2 ISTREAM (SELECT * FROM Q1); // 直前のタプルを出力するクエリ REGISTER QUERY Q3 DSTREAM (SELECT * FROM Q1); // 入力タプルのカラムID毎にカラムVALの値を合計し、乗数3で乗算した結果を出力するクエリ REGISTER QUERY SUM1 $*FG1.FUNC1[3](Q2,Q3);
上記の作成例での入力ストリームDATA0と出力ストリームSUM1の関係を,一例として次の図に示します。
図10-2 入力ストリームDATA0と出力ストリームSUM1の関係
図中のクエリでは,次の四つの列を持つ出力ストリームSUM1を出力します。
図に示した一例でのexecuteStreamFuncメソッドの引数を次に示します。
外部定義関数の呼び出し回数 | 第1引数 | 第2引数 | |||
---|---|---|---|---|---|
Collection<Object[]>[0] (出力ストリームQ2) |
Collection<Object[]>[1] (出力ストリームQ3) |
Timestamp (タプルのタイムスタンプ) |
|||
Object[0] (ID) |
Object[1] (VAL) |
Object[0] (ID) |
Object[1] (VAL) |
||
1回目 | A | 100 | なし | なし | 10:00:00 |
2回目 | B | 200 | A | 100 | 10:00:01 |
A | 300 | なし | なし |
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.