ストリームデータ処理基盤 uCosminexus Stream Data Platform - Application Framework システム構築・運用ガイド
ここでは,次のサンプルファイルに記述されているアダプター構成定義ファイルの記述例を示したあとに,記述例の内容を説明します。
<インストールディレクトリ>\samples\httppacket\conf\xml\AdaptorCompositionDefinition.xml
<?xml version="1.0" encoding="UTF-8"?> <!-- All Rights Reserved. Copyright (C) 2010, Hitachi, Ltd. --> <root:AdaptorCompositionDefinition xmlns:root="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition" xmlns:cmn="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/common" xmlns:adp="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/adaptor" xmlns:cb="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback" xmlns:hpicon="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/HttpPacketInputConnectorDefinition" xmlns:map="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/MappingDefinition" xmlns:filter="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/FilterDefinition" xmlns:rex="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/RecordExtractionDefinition" xmlns:docon="http://www.hitachi.co.jp/soft/xml/sdp/adaptor/definition/callback/DashboardOutputConnectorDefinition" > <!-- 共通定義 --> <cmn:CommonDefinition> <!-- アダプタトレース定義 --> <cmn:AdaptorTraceDefinition trace="OFF"/> </cmn:CommonDefinition> <!-- ↓↓↓↓↓↓↓↓ インプロセスグループ定義 ↓↓↓↓↓↓↓↓ --> <!-- インプロセスグループ定義1 --> <adp:InprocessGroupDefinition name="InprocessAPTest"> <!-- ↓↓↓↓↓↓ 入力アダプタ定義 複数定義可能 ↓↓↓↓↓↓ --> <!-- 入力アダプタ定義1 --> <adp:InputAdaptorDefinition name="InputAdaptor1" interval="0" charCode="MS932" lineFeed="CR_LF"> <!-- 入力用CB定義 --> <cb:InputCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.io.packetinput.HttpPacketInputCBImpl" name="inputer"> <!-- パケット入力コネクタ定義 --> <hpicon:HttpPacketInputConnectorDefinition> <hpicon:input buffersize="4096" assemblingtime="2000"> <hpicon:packetdata globalheader="24" packetheader="16" packetoffset="8" packetlength="4" timeoffset="0"/> <hpicon:command path="C:\Program Files\WinDump\WinDump.exe" parameter=" -i 1 -s 2048 -w - -n "tcp port 80 or port 8080""/> </hpicon:input > <hpicon:output unit="100"> <hpicon:record name="REQUEST" type="REQUEST" > <hpicon:field name="SEND_IP"/> <hpicon:field name="RECEIVE_IP"/> <hpicon:field name="SEND_PORT"/> <hpicon:field name="RECEIVE_PORT"/> <hpicon:field name="TARGET_URI"/> <hpicon:field name="TIME"/> </hpicon:record > <hpicon:record name="RESPONSE" type="RESPONSE" > <hpicon:field name="SEND_IP"/> <hpicon:field name="RECEIVE_IP"/> <hpicon:field name="SEND_PORT"/> <hpicon:field name="RECEIVE_PORT"/> <hpicon:field name="TIME"/> </hpicon:record > </hpicon:output> </hpicon:HttpPacketInputConnectorDefinition> </cb:InputCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.filter.FilterCBImpl" name="editor1"> <!-- フィルター定義 --> <filter:FilterDefinition> <!-- レコード条件 --> <filter:record source="REQUEST" conditionName="filterName1" condition="AND"> <!-- フィールド条件 --> <filter:field source="TARGET_URI" condition="eq" value="http:\/\/www.*"/> </filter:record> </filter:FilterDefinition> </cb:DataEditCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.recordextract.RecordExtractionCBImpl" name="editor2"> <!-- レコード抽出定義 --> <rex:RecordExtractionDefinition> <!-- 抽出対象レコード群定義 --> <rex:targetrecords> <rex:targetrecord name="element1"> <rex:record source="REQUEST" timeposition="TIME" condition="AND"> <rex:field source="SEND_PORT" condition="ne" value="0"/> </rex:record> </rex:targetrecord> <rex:targetrecord name="element2"> <rex:record source="RESPONSE" timeposition="TIME" condition="AND"> <rex:field source="RECEIVE_PORT" condition="ne" value="0"/> </rex:record> </rex:targetrecord> </rex:targetrecords> <!-- 抽出条件群定義 --> <rex:extractions size="100000" timeout="OFF" samerecord="overwrite"> <rex:extraction name="BIND_PACKET" timelimit="10000"> <rex:targets> <rex:target sourceL="element1" sourceR="element2" condition="AND"> <rex:fieldcondition sourceL="SEND_IP" condition="eq" sourceR="RECEIVE_IP"/> <rex:fieldcondition sourceL="RECEIVE_IP" condition="eq" sourceR="SEND_IP"/> <rex:fieldcondition sourceL="SEND_PORT" condition="eq" sourceR="RECEIVE_PORT"/> <rex:fieldcondition sourceL="RECEIVE_PORT" condition="eq" sourceR="SEND_PORT"/> </rex:target> </rex:targets> <!-- 抽出レコード定義 --> <rex:extractrecord name="BINDRECORD"> <rex:select source="element1"/> <rex:select source="element2"/> </rex:extractrecord> </rex:extraction> </rex:extractions> </rex:RecordExtractionDefinition> </cb:DataEditCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.mapping.InputMappingCBImpl" name="editor3"> <!-- マッピング定義 --> <map:MappingDefinition ioType="INPUT"> <map:source/> <map:target> <map:records> <map:record name="RESULT"> <map:field name="SEND_IP" type="STRING"/> <map:field name="RECEIVE_IP" type="STRING"/> <map:field name="SEND_PORT" type="INT"/> <map:field name="RECEIVE_PORT" type="INT"/> <map:field name="URI" type="STRING"/> <map:field name="SUBTIME" type="LONG"/> <map:field name="TIME" type="TIMESTAMP"/> </map:record> </map:records> </map:target> <map:intermediate> <map:mappings source="BINDRECORD" target="RESULT"> <map:map source="element1_SEND_IP" target="SEND_IP"/> <map:map source="element1_RECEIVE_IP" target="RECEIVE_IP"/> <map:map source="element1_SEND_PORT" target="SEND_PORT"/> <map:map source="element1_RECEIVE_PORT" target="RECEIVE_PORT"/> <map:map source="element1_TARGET_URI" target="URI"/> <map:map function="subTime" argument1="element1_TIME" argument2="element2_TIME" target="SUBTIME"/> <map:map source="element2_TIME" target="TIME"/> </map:mappings> </map:intermediate> </map:MappingDefinition> </cb:DataEditCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.mapping.InputMappingCBImpl" name="editor4"> <!-- マッピング定義 --> <map:MappingDefinition ioType="INPUT"> <map:source/> <map:target> <map:streams> <map:stream name="s1" querygroup="Inprocess_QueryGroupTest"> <map:column name="sendip" type="STRING"/> <map:column name="receiveip" type="STRING"/> <map:column name="sendport" type="INT"/> <map:column name="receiveport" type="INT"/> <map:column name="uri" type="STRING"/> <map:column name="subtime" type="LONG"/> <map:column name="time" type="TIMESTAMP"/> </map:stream> </map:streams> </map:target> <map:intermediate> <map:mappings source="RESULT" querygroup="Inprocess_QueryGroupTest" target="s1"> <map:map source="SEND_IP" target="sendip"/> <map:map source="RECEIVE_IP" target="receiveip"/> <map:map source="SEND_PORT" target="sendport"/> <map:map source="RECEIVE_PORT" target="receiveport"/> <map:map source="URI" target="uri"/> <map:map source="SUBTIME" target="subtime"/> <map:map source="TIME" target="time"/> </map:mappings> </map:intermediate> </map:MappingDefinition> </cb:DataEditCBDefinition> <!-- 送信用CB定義 --> <cb:SendCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.sendreceive.SendCBImpl" name="sender"> <cb:streamInfo name="s1" querygroup="Inprocess_QueryGroupTest"/> </cb:SendCBDefinition> </adp:InputAdaptorDefinition> <!-- ↑↑↑↑↑↑ 入力アダプタ定義 複数定義可能 ↑↑↑↑↑↑ --> <!-- ↓↓↓↓↓↓ 出力アダプタ定義 複数定義可能 ↓↓↓↓↓↓ --> <adp:OutputAdaptorDefinition name="OutputAdaptor1" charCode="MS932" lineFeed="CR_LF"> <!-- 受信用CB定義 --> <cb:ReceiveCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.sendreceive.ReceiveCBImpl" name="receiver"> <cb:streamInfo name="q1" querygroup="Inprocess_QueryGroupTest"/> </cb:ReceiveCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.mapping.OutputMappingCBImpl" name="editor1"> <!-- マッピング定義 --> <map:MappingDefinition ioType="OUTPUT"> <map:source> <map:streams> <map:stream name="q1" querygroup="Inprocess_QueryGroupTest"> <map:column name="sendip" type="STRING"/> <map:column name="receiveip" type="STRING"/> <map:column name="sendport" type="INT"/> <map:column name="receiveport" type="INT"/> <map:column name="uri" type="STRING"/> <map:column name="subtime" type="LONG"/> <map:column name="time" type="TIMESTAMP"/> </map:stream> </map:streams> </map:source> <map:target/> <map:intermediate> <map:mappings source="q1" querygroup="Inprocess_QueryGroupTest" target="RECORD1"> <map:map source="sendip" target="SEND_IP"/> <map:map source="receiveip" target="RECEIVE_IP"/> <map:map source="sendport" target="SEND_PORT"/> <map:map source="receiveport" target="RECEIVE_PORT"/> <map:map source="uri" target="URI"/> <map:map source="subtime" target="SUBTIME"/> <map:map source="time" target="TIME"/> </map:mappings> </map:intermediate> </map:MappingDefinition> </cb:DataEditCBDefinition> <!-- データ編集用CB定義 --> <cb:DataEditCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.dataedit.mapping.InputMappingCBImpl" name="editor2"> <!-- マッピング定義 --> <map:MappingDefinition ioType="OUTPUT"> <map:source/> <map:target> <map:records> <map:record name="RECORD2" > <map:field name="SEND_IP" type="STRING"/> <map:field name="RECEIVE_IP" type="STRING"/> <map:field name="SEND_PORT" type="INT"/> <map:field name="RECEIVE_PORT" type="INT"/> <map:field name="URI" type="STRING"/> <map:field name="SUBTIME" type="LONG"/> <map:field name="TIME" type="TIMESTAMP"/> <map:field name="GET_TUPLE_TIME" type="TIMESTAMP"/> </map:record> </map:records> </map:target> <map:intermediate> <map:mappings source="RECORD1" target="RECORD2"> <map:map source="SEND_IP" target="SEND_IP"/> <map:map source="RECEIVE_IP" target="RECEIVE_IP"/> <map:map source="SEND_PORT" target="SEND_PORT"/> <map:map source="RECEIVE_PORT" target="RECEIVE_PORT"/> <map:map source="URI" target="URI"/> <map:map source="SUBTIME" target="SUBTIME"/> <map:map source="TIME" target="TIME"/> <map:map function="getTupleTime" target="GET_TUPLE_TIME"/> </map:mappings> </map:intermediate> </map:MappingDefinition> </cb:DataEditCBDefinition> <!-- 出力用CB定義 --> <cb:OutputCBDefinition class="jp.co.Hitachi.soft.sdp.adaptor.callback.io.dashboard.DashboardOutputCBImpl" name="outputer"> <!-- ダッシュボード出力コネクタ定義 --> <docon:DashboardOutputConnectorDefinition Record="RECORD2"> <docon:RecordHoldTime DateReference="LAST_UPDATE" RecordTime="300" DateFieldPosition="8" /> </docon:DashboardOutputConnectorDefinition> </cb:OutputCBDefinition> </adp:OutputAdaptorDefinition> <!-- ↑↑↑↑↑↑ 出力アダプタ定義 複数定義可能 ↑↑↑↑↑↑ --> </adp:InprocessGroupDefinition> <!-- ↑↑↑↑↑↑↑↑ インプロセスグループ定義 ↑↑↑↑↑↑↑↑ --> </root:AdaptorCompositionDefinition>
この記述例では,標準提供アダプターとSDPサーバは,インプロセスで連携します。入力アダプター「InputAdaptor1」では,HTTPパケット情報の中から対応するリクエスト情報とレスポンス情報を結合し,それぞれが持っていた時刻情報の差から通信時間を算出します。出力アダプター「OutputAdaptor1」では,HTTP通信情報を取得し,現在時刻から過去5分間の情報をダッシュボード出力コネクターで出力します。
入力アダプター「InputAdaptor1」では,次の表に示す処理を実施します。なお,括弧内は,アダプター構成定義ファイルでの定義名です。
コールバックの種類 | コールバックでの処理 |
---|---|
入力用コールバック(入力用CB定義) | HTTPパケットの入力(HTTPパケット入力コネクター定義) |
編集用コールバック(編集用CB定義) | レコードのフィルタリング(フィルター定義) |
レコードの抽出(レコード抽出定義) | |
レコード間のマッピング(マッピング定義) | |
レコードとストリーム間のマッピング(マッピング定義) | |
送信用コールバック(送信用CB定義) | タプル送信(入力ストリーム定義) |
入力アダプター「InputAdaptor1」の各定義の内容を次に示します。
出力アダプター「OutputAdaptor1」では,次の表に示す処理を実施します。なお,括弧内は,アダプター構成定義ファイルでの定義名です。
コールバックの種類 | コールバックでの処理 |
---|---|
受信用コールバック(受信用CB定義) | タプル受信(出力ストリーム定義) |
編集用コールバック(編集用CB定義) | レコードとストリーム間のマッピング(マッピング定義) |
レコード間のマッピング(マッピング定義) | |
出力用コールバック(出力用CB定義) | ダッシュボードへの出力(ダッシュボード出力コネクター定義) |
出力アダプター「OutputAdaptor1」の各定義の内容を次に示します。
この記述例の場合のクエリ定義ファイルを次に示します。
REGISTER STREAM s1(sendip VARCHAR(15),receiveip VARCHAR(15),sendport INTEGER,receiveport INTEGER,uri VARCHAR(255),subtime BIGINT,times TIMESTAMP(9)); REGISTER QUERY q1 RSTREAM(SELECT * FROM s1[RANGE 5 MINUTE]);
クエリの定義については,マニュアル「uCosminexus Stream Data Platform - Application Framework アプリケーション開発ガイド」を参照してください。
All Rights Reserved. Copyright (C) 2010, 2014, Hitachi, Ltd.