ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド
外部定義ストリーム間演算関数を使用する場合の,外部定義関数の作成例を次に示します。
package samples;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import jp.co.Hitachi.soft.sdp.plugin.function.stream.SDPExternalStreamFunction;
public class ExternalStreamFunction implements SDPExternalStreamFunction {
// 合計値格納マップ
Map<String, Long> tempMap = new HashMap<String, Long>();
// 乗数
int mul;
// 実行回数
int loopCount;
// 外部定義関数の実装クラスのコンストラクタ。
// mul:乗数。クエリに記述した外部定義関数の初期化パラメーターに指定した値。
public ExternalStreamFunction(Integer mul) {
this.mul = mul;
loopCount = 0;
}
// 外部定義関数の処理を実装するメソッド。
// ID毎にVALの合計値を保持し、乗数で乗算した結果を出力します。
// inputs:タプルの集合。入力ストリーム毎にタプルが格納されています。
// ts :inputsに格納されているタプルの時刻(タイムスタンプ)。
public List<Object[]> executeStreamFunc(Collection<Object[]>[] inputs, Timestamp ts) {
// 戻り値格納リスト
List<Object[]> res = new ArrayList<Object[]>();
// 入力ストリーム数分ループ
for (int i = 0; i < inputs.length; i++) {
// 入力ストリームを取得します
Collection<Object[]> values = inputs[i];
// タプル数分ループ
for (Object[] value : values) {
// IDを取得します
String id = (String) value[0];
// VALを取得します
Long val = (Long) value[1];
// 合計値をID毎に合計値格納マップに保存します
if (tempMap.containsKey(id)) {
// 既出IDの場合、加算します
tempMap.put(id, tempMap.get(id) + val);
} else {
// 新規IDの場合、新規に登録します
tempMap.put(id, val);
}
}
}
// IDの数分ループ
for (String key : tempMap.keySet()) {
// 戻り値格納リストに出力タプルを格納します
res.add(new Object[] { loopCount, key, tempMap.get(key) * mul, ts });
}
loopCount++;
return res;
}
// 初期化メソッド
public void initialize() {
tempMap.clear();
loopCount = 0;
}
// 終了メソッド
public void terminate() {
// 処理なし
}
}<?xml version="1.0" encoding="UTF-8"?>
<!-- All Rights Reserved. Copyright (C) 2013, Hitachi, Ltd. -->
<root:ExternalFunctionDefinition
xmlns:root="http://www.hitachi.co.jp/soft/xml/sdp/function"
xmlns:group="http://www.hitachi.co.jp/soft/xml/sdp/function/functiongroup">
<!-- 外部定義関数グループ定義 -->
<group:FunctionGroup name="FG1" path="samples/external/src">
<!-- 関数定義 -->
<group:StreamFunction name="FUNC1" class="samples.ExternalStreamFunction">
<!-- 戻り値定義 -->
<group:ReturnInformation name="R1" type="INT" />
<group:ReturnInformation name="R2" type="VARCHAR(10)" />
<group:ReturnInformation name="R3" type="BIGINT" />
<group:ReturnInformation name="R4" type="TIMESTAMP(9)" />
</group:StreamFunction>
</group:FunctionGroup>
</root:ExternalFunctionDefinition>REGISTER STREAM DATA0(ID VARCHAR(10), VAL BIGINT); // 入力ストリームDATA0から、カラムID毎に最新のタプルを保持するクエリ REGISTER QUERY Q1 SELECT ID, VAL FROM DATA0[PARTITION BY ID ROWS 1]; // 最新のタプルを出力するクエリ REGISTER QUERY Q2 ISTREAM (SELECT * FROM Q1); // 直前のタプルを出力するクエリ REGISTER QUERY Q3 DSTREAM (SELECT * FROM Q1); // 入力タプルのカラムID毎にカラムVALの値を合計し、乗数3で乗算した結果を出力するクエリ REGISTER QUERY SUM1 $*FG1.FUNC1[3](Q2,Q3);
上記の作成例での入力ストリームDATA0と出力ストリームSUM1の関係を,一例として次の図に示します。
図10-2 入力ストリームDATA0と出力ストリームSUM1の関係
図中のクエリでは,次の四つの列を持つ出力ストリームSUM1を出力します。
図に示した一例でのexecuteStreamFuncメソッドの引数を次に示します。
| 外部定義関数の呼び出し回数 | 第1引数 | 第2引数 | |||
|---|---|---|---|---|---|
| Collection<Object[]>[0] (出力ストリームQ2) |
Collection<Object[]>[1] (出力ストリームQ3) |
Timestamp (タプルのタイムスタンプ) |
|||
| Object[0] (ID) |
Object[1] (VAL) |
Object[0] (ID) |
Object[1] (VAL) |
||
| 1回目 | A | 100 | なし | なし | 10:00:00 |
| 2回目 | B | 200 | A | 100 | 10:00:01 |
| A | 300 | なし | なし | ||
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.