ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

10.3 外部定義関数の作成例

外部定義ストリーム間演算関数を使用する場合の,外部定義関数の作成例を次に示します。

外部定義関数のクラスファイルの作成例
package samples;
 
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
import jp.co.Hitachi.soft.sdp.plugin.function.stream.SDPExternalStreamFunction;
 
public class ExternalStreamFunction implements SDPExternalStreamFunction {
    // 合計値格納マップ
    Map<String, Long> tempMap = new HashMap<String, Long>();
    // 乗数
    int mul;
    // 実行回数
    int loopCount;
    
    // 外部定義関数の実装クラスのコンストラクタ。
    // mul:乗数。クエリに記述した外部定義関数の初期化パラメーターに指定した値。
    public ExternalStreamFunction(Integer mul) {
        this.mul = mul;
        loopCount = 0;
    }
    
    // 外部定義関数の処理を実装するメソッド。
    // ID毎にVALの合計値を保持し、乗数で乗算した結果を出力します。
    // inputs:タプルの集合。入力ストリーム毎にタプルが格納されています。
    // ts    :inputsに格納されているタプルの時刻(タイムスタンプ)。
    public List<Object[]> executeStreamFunc(Collection<Object[]>[] inputs, Timestamp ts) {
        // 戻り値格納リスト
        List<Object[]> res = new ArrayList<Object[]>();
        // 入力ストリーム数分ループ
        for (int i = 0; i < inputs.length; i++) {
            // 入力ストリームを取得します
            Collection<Object[]> values = inputs[i];
            // タプル数分ループ
            for (Object[] value : values) {
                // IDを取得します
                String id = (String) value[0];
                // VALを取得します
                Long val = (Long) value[1];
                // 合計値をID毎に合計値格納マップに保存します
                if (tempMap.containsKey(id)) {
                    // 既出IDの場合、加算します
                    tempMap.put(id, tempMap.get(id) + val);
                } else {
                    // 新規IDの場合、新規に登録します
                    tempMap.put(id, val);
                }
            }
        }
        // IDの数分ループ
        for (String key : tempMap.keySet()) {
            // 戻り値格納リストに出力タプルを格納します
            res.add(new Object[] { loopCount, key, tempMap.get(key) * mul, ts });
        }
        loopCount++;
        return res;
    }
    // 初期化メソッド
    public void initialize() {
        tempMap.clear();
        loopCount = 0;
    }
 
    // 終了メソッド
    public void terminate() {
        // 処理なし
    }
}
外部定義関数定義ファイルの作成例
<?xml version="1.0" encoding="UTF-8"?>
<!-- All Rights Reserved. Copyright (C) 2013, Hitachi, Ltd.  -->
<root:ExternalFunctionDefinition
xmlns:root="http://www.hitachi.co.jp/soft/xml/sdp/function"
xmlns:group="http://www.hitachi.co.jp/soft/xml/sdp/function/functiongroup">
    <!-- 外部定義関数グループ定義 -->
    <group:FunctionGroup name="FG1" path="samples/external/src">
        <!-- 関数定義 -->
        <group:StreamFunction name="FUNC1" class="samples.ExternalStreamFunction">
            <!-- 戻り値定義 -->
            <group:ReturnInformation name="R1" type="INT" />
            <group:ReturnInformation name="R2" type="VARCHAR(10)" />
            <group:ReturnInformation name="R3" type="BIGINT" />
            <group:ReturnInformation name="R4" type="TIMESTAMP(9)" />
        </group:StreamFunction>
    </group:FunctionGroup>
</root:ExternalFunctionDefinition>
クエリ定義ファイルの作成例
REGISTER STREAM DATA0(ID VARCHAR(10), VAL BIGINT);
 
// 入力ストリームDATA0から、カラムID毎に最新のタプルを保持するクエリ
REGISTER QUERY Q1 SELECT ID, VAL FROM DATA0[PARTITION BY ID ROWS 1];
// 最新のタプルを出力するクエリ
REGISTER QUERY Q2 ISTREAM (SELECT * FROM Q1);
// 直前のタプルを出力するクエリ
REGISTER QUERY Q3 DSTREAM (SELECT * FROM Q1);
// 入力タプルのカラムID毎にカラムVALの値を合計し、乗数3で乗算した結果を出力するクエリ
REGISTER QUERY SUM1 $*FG1.FUNC1[3](Q2,Q3);

上記の作成例での入力ストリームDATA0と出力ストリームSUM1の関係を,一例として次の図に示します。

図10-2 入力ストリームDATA0と出力ストリームSUM1の関係

[図データ]

図中のクエリでは,次の四つの列を持つ出力ストリームSUM1を出力します。

図に示した一例でのexecuteStreamFuncメソッドの引数を次に示します。

外部定義関数の呼び出し回数 第1引数 第2引数
Collection<Object[]>[0]
(出力ストリームQ2)
Collection<Object[]>[1]
(出力ストリームQ3)
Timestamp
(タプルのタイムスタンプ)
Object[0]
(ID)
Object[1]
(VAL)
Object[0]
(ID)
Object[1]
(VAL)
1回目 A 100 なし なし 10:00:00
2回目 B 200 A 100 10:00:01
A 300 なし なし