ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド

[目次][索引][前へ][次へ]

2.6.1 基本的なクエリの定義例

ここでは,ウィンドウ演算,関係演算およびストリーム化演算を使用した,基本的なクエリの定義例として,株価情報を対象としたクエリの定義例を示します。

この例で扱う株価情報ストリームデータの例を次の図に示します。

図2-18 株価情報ストリームデータの例

[図データ]

この例で扱うストリームデータは,銘柄コード(stockID),銘柄名(stockName),株価(currentPrice)および出来高(tradingVolume)という株価情報を持つデータに対して,Stream Data Platform - AFでタイムスタンプを設定したストリームデータです。

このストリームデータを使用して,株価の値上がり率を計算します。値上がり率は,現在のデータと1分前のデータを比較して算出します。

値上がり率を算出するためには,次の2種類のクエリを使用します。

1分間データ算出クエリ
 
REGISTER QUERY stockDStream
DSTREAM ( SELECT * FROM stock[RANGE 1 MINUTE] );
 
  • 範囲を1分間と指定したRANGEウィンドウによって,入力リレーションのタプルを1分間保持します。
  • DSTREAMを指定して,1分間の生存期間が終了したタプルを出力します。これによって,現在から1分前のデータが,ストリームデータとして出力されます。
値上がり率計算クエリ
 
REGISTER QUERY risingRate
ISTREAM ( SELECT stock.stockID, stock.stockName,
         stock.currentPrice / stockDStream.currentPrice AS stockRate
         FROM stock[PARTITION BY stock.stockID ROWS 1],
         stockDStream[PARTITION BY stockDStream.stockID ROWS 1]
         WHERE stock.stockID = stockDStream.stockID );
 
  • PARTITION BYウィンドウを使用して,銘柄ごとに最新の1件のタプルを保持します。
  • 現在の株価情報ストリームデータと1分前のストリームデータを統合して,値上がり率を計算します。

これらのクエリを使用した演算処理の流れを次の図に示します。

図2-19 株価値上がり率を算出する演算処理の流れ

[図データ]

処理の流れについて説明します。説明の番号は図中の番号と対応しています。

  1. [クエリstockDStreamのウィンドウ演算]
    ウィンドウ演算を実行します。
    「FROM stock」の指定によって入力したストリームstockを,「[RANGE 1 MINUTE]」の指定によって1分間保持します。
  2. [クエリstockDStreamのストリーム化演算]
    1分経過したデータは,ストリーム化演算「DSTREAM」によってストリームデータ(1分前データ)として出力されます。
  3. [クエリrisingRateのウィンドウ演算]
    ストリームstockと,2.で出力した結果ストリーム(1分前データ)の2種類のデータを「FROM stock…, stockDStream…」の指定によって入力します。
    「[PARTITION BY… ROWS 1]」の指定によって,入力データを銘柄コード(stockID)ごとにグルーピングした上で,銘柄コードごとの最新1件のデータを保持します。これによって,銘柄コードごとに,次のデータが保持されます。
    • 入力ストリームstockによる現在の株価データのうち,最新の1件
    • 入力ストリームstockDStream(クエリstockDStreamの結果ストリーム)による1分前の株価データのうち,最新の1件
  4. [クエリstockDStreamの関係演算]
    銘柄コードごとに,銘柄コード,銘柄名,および値上がり率(現在の株価/1分前の株価)を結合します。
  5. [クエリstockDStreamストリーム化演算]
    同一銘柄の1分前データと現在データが到着したタイミングで,4.の結合データを結果ストリームとして出力します。