ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework システム構築・運用ガイド
ここでは,次のサンプルファイルに記述されているアダプター構成定義ファイルの記述例を示したあとに,記述例の内容を説明します。
<インストールディレクトリ>\samples\file\conf\xml\AdaptorCompositionDefinition.xml
<?xml version="1.0" encoding="UTF-8"?> <!-- All Rights Reserved. Copyright (C) 2010, Hitachi, Ltd. --> <root:AdaptorCompositionDefinition xmlns:root="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition" xmlns:cmn="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/common" xmlns:adp="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/adaptor" xmlns:cb="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback" xmlns:ficon="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/FileInputConnectorDefinition" xmlns:focon="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/FileOutputConnectorDefinition" xmlns:form="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/FormatDefinition" xmlns:map="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/MappingDefinition"> <!-- 共通定義 --> <cmn:CommonDefinition> <!-- アダプタトレース定義 --> <cmn:AdaptorTraceDefinition trace="OFF"/> </cmn:CommonDefinition> <!-- ↓↓↓↓↓↓↓↓ インプロセスグループ定義 ↓↓↓↓↓↓↓↓ --> <!-- インプロセスグループ定義1 --> <adp:InprocessGroupDefinition name="InprocessAPTest"> <!-- ↓↓↓↓↓↓ 入力アダプタ定義 複数定義可能 ↓↓↓↓↓↓ --> <!-- 入力アダプタ定義1 --> <adp:InputAdaptorDefinition name="InputAdaptor1" interval="0" charCode="MS932" lineFeed="CR_LF"> <!-- 入力用CB定義 --> <cb:InputCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.io.FileInputCBImpl" name="inputer"> <!-- 入力コネクタ定義 --> <ficon:FileInputConnectorDefinition> <ficon:input readType="REAL_TIME" interval="1000" retryCount="100" readUnit="1"/> <ficon:file path="C:\temp\input\Inprocess\" name="Inprocess_Data1.csv" messageLog="OFF"/> </ficon:FileInputConnectorDefinition> </cb:InputCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.formattranslate.InputFormatTranslatorCBImpl" name="editor1"> <!-- フォーマット変換定義 --> <form:FormatDefinition ioType="INPUT"> <form:common> <form:unmatchedFormat>ERROR</form:unmatchedFormat> </form:common> <form:records> <form:record name="RECORD0" exp="($_name),($_num)"> <form:field name="name" type="STRING"/> <form:field name="num" type="LONG"/> </form:record> </form:records> </form:FormatDefinition> </cb:DataEditCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.mapping.InputMappingCBImpl" name="editor3"> <!-- マッピング定義 --> <map:MappingDefinition ioType="INPUT"> <map:source/> <map:target> <map:streams> <map:stream name="DATA0" querygroup="Inprocess_QueryGroupTest"> <map:column name="name" type="STRING"/> <map:column name="num" type="LONG"/> </map:stream> </map:streams> </map:target> <map:intermediate> <map:mappings source="RECORD0" querygroup="Inprocess_QueryGroupTest" target="DATA0"> <map:map source="name" target="name"/> <map:map source="num" target="num"/> </map:mappings> </map:intermediate> </map:MappingDefinition> </cb:DataEditCBDefinition> <!-- 送信用CB定義 --> <cb:SendCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.sendreceive.SendCBImpl" name="sender"> <cb:streamInfo name="DATA0" querygroup="Inprocess_QueryGroupTest"/> </cb:SendCBDefinition> </adp:InputAdaptorDefinition> <!-- ↑↑↑↑↑↑ 入力アダプタ定義 複数定義可能 ↑↑↑↑↑↑ --> <!-- ↓↓↓↓↓↓ 出力アダプタ定義 複数定義可能 ↓↓↓↓↓↓ --> <adp:OutputAdaptorDefinition name="OutputAdaptor1" charCode="MS932" lineFeed="CR_LF"> <!-- 受信用CB定義 --> <cb:ReceiveCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.sendreceive.ReceiveCBImpl" name="receiver"> <cb:streamInfo name="FILTER1" querygroup="Inprocess_QueryGroupTest"/> </cb:ReceiveCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.mapping.OutputMappingCBImpl" name="editor1"> <!-- マッピング定義 --> <map:MappingDefinition ioType="OUTPUT"> <map:source> <map:streams> <map:stream name="FILTER1" querygroup="Inprocess_QueryGroupTest"> <map:column name="name" type="STRING"/> <map:column name="num" type="LONG"/> </map:stream> </map:streams> </map:source> <map:target/> <map:intermediate> <map:mappings source="FILTER1" querygroup="Inprocess_QueryGroupTest" target="RECORD1"> <map:map source="name" target="name"/> <map:map source="num" target="num"/> </map:mappings> </map:intermediate> </map:MappingDefinition> </cb:DataEditCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.formattranslate.OutputFormatTranslatorCBImpl" name="editor3"> <!-- フォーマット変換定義 --> <form:FormatDefinition ioType="OUTPUT"> <form:common/> <form:records> <form:record name="RECORD1" exp="($_name),($_num)"> <form:field name="name" type="STRING"/> <form:field name="num" type="LONG"/> </form:record> </form:records> </form:FormatDefinition> </cb:DataEditCBDefinition> <!-- 出力用CB定義 --> <cb:OutputCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.io.FileOutputCBImpl" name="outputer"> <!-- 出力コネクタ定義 --> <focon:FileOutputConnectorDefinition> <focon:output compositionType="WRAP_AROUND" maxNumber="256" maxSize="1000"/> <focon:file path="C:\temp\output\Inprocess\" prefix="out" addDate="OFF" extension="csv"/> </focon:FileOutputConnectorDefinition> </cb:OutputCBDefinition> </adp:OutputAdaptorDefinition> <!-- ↑↑↑↑↑↑ 出力アダプタ定義 複数定義可能 ↑↑↑↑↑↑ --> </adp:InprocessGroupDefinition> <!-- ↑↑↑↑↑↑↑↑ インプロセスグループ定義 ↑↑↑↑↑↑↑↑ --> </root:AdaptorCompositionDefinition>
この記述例では,標準提供アダプターとSDPサーバは,インプロセスで連携します。入力アダプター「InputAdaptor1」でファイルを入力し,ストリームデータの集計・分析結果のデータは,出力アダプター「OutputAdaptor1」でファイルへ出力します。
入力アダプター「InputAdaptor1」では,次の表に示す処理を実施します。なお,括弧内は,アダプター構成定義ファイルでの定義名です。
コールバックの種類 | コールバックでの処理 |
---|---|
入力用コールバック(入力用CB定義) | ファイルの入力(ファイル入力コネクター定義) |
編集用コールバック(編集用CB定義) | フォーマット変換(フォーマット変換定義) |
レコードとストリーム間のマッピング(マッピング定義) | |
送信用コールバック(送信用CB定義) | タプル送信(入力ストリーム定義) |
入力アダプター「InputAdaptor1」の各定義の内容を次に示します。
出力アダプター「OutputAdaptor1」では,次の表に示す処理を実施します。なお,括弧内は,アダプター構成定義ファイルでの定義名です。
コールバックの種類 | コールバックでの処理 |
---|---|
受信用コールバック(受信用CB定義) | タプル受信(出力ストリーム定義) |
編集用コールバック(編集用CB定義) | レコードとストリーム間のマッピング(マッピング定義) |
フォーマット変換(フォーマット変換定義) | |
出力用コールバック(出力用CB定義) | ファイルへの出力(ファイル出力コネクター定義) |
出力アダプター「OutputAdaptor1」の各定義での定義内容を次に示します。
この記述例の場合のクエリ定義ファイルを次に示します。
register stream DATA0(name VARCHAR(10), num BIGINT); register query FILTER1 ISTREAM(SELECT * FROM DATA0[ROWS 1] WHERE DATA0.NUM <= 24);
クエリの定義については,マニュアル「uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド」を参照してください。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.