Hitachi

OpenTP1 Version 7 OpenTP1 メッセージキューイング機能 TP1/Message Queue プログラム作成の手引


10.3.2 論理的順序と物理的順序

メッセージは各優先度内で,論理的順序または物理的順序で格納されます。

グループ,メッセージ,およびセグメントの概要については,「3.6 メッセージグループ」を参照してください。物理的順序と論理的順序では次に示す点が異なります。

キューでの論理的順序について,次の図に示します。

図10‒1 キューでの論理的順序

[図データ]

キューでのこれらのメッセージの順序を次に示します。

  1. メッセージA(グループにない)

  2. グループYの論理メッセージ1

  3. グループYの論理メッセージ2

  4. グループYの(最終)論理メッセージ3のセグメント1

  5. グループYの(最終)論理メッセージ3の(最終)セグメント2

  6. グループZの論理メッセージ1

  7. グループZの(最終)論理メッセージ2

  8. メッセージB(グループにない)

物理的順序は論理的順序とは異なります。各グループ内で最初の項目の物理的な位置が,グループ全体の論理的な位置を決定します。ここでは,グループYとZが同時に到着し,グループZのメッセージ2が,同じグループのメッセージ1に先行する場合でのキューでの物理的順序について,次の図に示します。

図10‒2 キューでの物理的順序

[図データ]

キューでのこれらのメッセージの順序を次に示します。

  1. メッセージA(グループにない)

  2. グループYの論理メッセージ1

  3. グループZの論理メッセージ2

  4. グループYの論理メッセージ2

  5. グループYの(最終)論理メッセージ3のセグメント1

  6. グループYの(最終)論理メッセージ3の(最終)セグメント2

  7. グループZの論理メッセージ1

  8. メッセージB(グループにない)

メッセージを取り出すときに,物理的順序ではなく論理的順序でメッセージを取り出すことを指定するには,MQGMO_LOGICAL_ORDERを指定します。

MQGMO_BROWSE_FIRSTおよびMQGMO_LOGICAL_ORDERを指定するMQGET命令を発行する場合は,以降のMQGMO_BROWSE_NEXT指定のMQGET命令でもMQGMO_LOGICAL_ORDERを指定してください。逆に,MQGMO_LOGICAL_ORDERを指定しないMQGMO_BROWSE_FIRSTのMQGET命令を発行する場合は,以降のMQGMO_BROWSE_NEXT指定のMQGET命令でもMQGMO_LOGICAL_ORDERを指定しないでください。

キューのメッセージを検索するMQGET命令でキューマネジャが保存するグループ情報とセグメント情報は,キューからメッセージを削除するMQGET命令でキューマネジャが保存する情報とは異なります。MQGMO_BROWSE_FIRST指定時には,キューマネジャは検索用のグループ情報とセグメント情報を無視し,現在のグループと現在の論理メッセージがないかのようにキューを検索します。

注意

MQGMO_LOGICAL_ORDERを指定しないときにメッセージグループまたはグループ内にない論理メッセージの最後を越えて,MQGET命令を使用して検索する場合には,特に注意が必要です。例えば,グループ内で最後のメッセージがグループ内で最初のメッセージよりキューで先行する場合に,グループの最後を越えるMQGMO_BROWSE_NEXTを指定し,MsgSeqNumberフィールドに1を設定しながらMQMO_MATCH_MSG_SEQ_NUMBERを指定するときは,検索済みのグループ内で最初のメッセージが再度返されます。このことは,間に他グループを挟んで複数のMQGET命令を発行する場合にも発生します。

キューを検索用に2回オープンすることによって,無限ループを回避できます。

通常は,検索時にアプリケーションで論理的順序または物理的順序を選択します。しかし,二つのモードを切り替える場合は,最初に検索をMQGMO_LOGICAL_ORDERで発行したときに論理的順序での位置が設定されることに注意してください。

検索カーソルがグループ内にいったん入ると,最初のメッセージが削除されても同じグループ内にあり続けます。また,最初の項目がないグループにはMQGMO_LOGICAL_ORDERを使用して移動できません。

〈この項の構成〉

(1) 論理メッセージのグループ化

グループ内の論理メッセージを使用するのは,次に示す場合です。

どちらの場合も,グループ全体の取り出しは,同じ取り出しアプリケーションのインスタンスで実行してください。

例えば,グループを四つの論理メッセージで構成するとします。登録アプリケーションの記述例を次に示します。

PMO.Options = MQPMO_LOGICAL_ORDER | MQPMO_SYNCPOINT
 
dc_trn_begin()
 
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_LAST_MSG_IN_GROUP
 
dc_trn_unchained_commit()

取り出しアプリケーションではグループ内の全メッセージが到着するまで処理を開始しないことにします。そのため,グループ内の最初のメッセージにはMQGMO_ALL_MSGS_AVAILABLEを指定します。グループ内の以降のメッセージではこのオプションは無視されます。

グループで最初の論理メッセージを取り出したら,グループの残りの論理メッセージを順に取り出すためにMQGMO_LOGICAL_ORDERを使用します。

例えば,取り出しアプリケーションの記述例を次に示します。

/* グループ内の最初のメッセージ,
   またはグループ外のメッセージ待ち合わせ */
GMO.Options = MQGMO_SYNCPOINT | MQGMO_WAIT
            | MQGMO_ALL_MSGS_AVAILABLE | MQGMO_LOGICAL_ORDER
dc_trn_begin()
MQGET /* 先頭メッセージの処理 */
while ( GMO.GroupStatus == MQGS_MSG_IN_GROUP ) {
  MQGET
  /* グループ内の残りのメッセージの処理 */
  ...
}
if(GMO.GroupStatus != MQGS_LAST_MSG_IN_GROUP) 
  MQGET /* MQGS_LAST_MSG_IN_GROUPメッセージの取り出し */
 
dc_trn_unchained_commit()

(2) 複数トランザクションにわたるグループの登録と取り出し

説明したように,すべてのグループが登録されトランザクションがコミットされるまでは,ノード(あて先がリモートの場合)を離れられないし,メッセージまたはセグメントの取り出しを開始できません。グループ全体を登録するのに時間が掛かる場合や,ノード上でキューのスペースが限られている場合には,不便なときがあります。これを回避するために,グループを複数のトランザクションで登録できます。

グループが複数のトランザクションで登録される場合には,登録アプリケーションの失敗が発生するときでも幾つかのグループでコミットが可能です。そのためにアプリケーションでは,ステータス情報を保存して各トランザクションをコミットすることによって,完了しなかったグループをあとで開始するときにステータス情報を使用できるようにします。最も簡単な方法はステータスキューにステータス情報を保存することです。グループ全体が登録に成功すると,ステータスキューは空になります。

セグメント分割を使用する場合も同様です。ステータス情報にOffsetを含めてください。

複数のトランザクションに分けたグループの登録例を次に示します。

PMO.Options = MQPMO_LOGICAL_ORDER | MQPMO_SYNCPOINT
 
/* 最初のトランザクション */
 
dc_trn_begin()
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
StatusInfo = MQMD構造体のGroupIdとMsgSeqNumber
MQPUT (ステータス情報をステータスキューへ) PMO.Options =
                                              MQPMO_SYNCPOINT
dc_trn_chained_commit()
 
/* 2回目以降のトランザクション */
 
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQGET (ステータスキューから) GMO.Options = MQGMO_SYNCPOINT
StatusInfo = MQMD構造体のGroupIdとMsgSeqNumber
MQPUT (ステータス情報をステータスキューへ) PMO.Options =
                                              MQPMO_SYNCPOINT
dc_trn_chained_commit()
 
/* 最後のトランザクション */
 
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_LAST_MSG_IN_GROUP
MQGET (ステータスキューから) GMO.Options = MQGMO_SYNCPOINT
dc_trn_unchained_commit()

すべてのトランザクションがコミットされると,グループ全体が成功し,ステータスキューは空になります。そうでない場合は,ステータス情報によって示される時点の状態で再度開始してください。MQPMO_LOGICAL_ORDERは最初の登録時には使用できませんが,以降の登録で使用できます。

再度開始するときの記述例を次に示します。

dc_trn_begin()
MQGET (ステータスキューから) GMO.Options = MQGMO_SYNCPOINT
if (Reason == MQRC_NO_MSG_AVAILABLE)
  /* 通常の処理 */
  ...
else
  /* グループが早期に中断 */
  MQMD構造体のGroupIdとMsgSeqNumberにステータス情報の値を設定
  PMO.Options = MQPMO_SYNCPOINT
  MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
 
  /* 通常処理を再度開始
     最終メッセージではない */
  PMO.Options = MQPMO_LOGICAL_ORDER | MQPMO_SYNCPOINT
  MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
  MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
  StatusInfo = MQMD構造体のGroupIdとMsgSeqNumber
  MQPUT (ステータス情報をステータスキューへ) PMO.Options =
                                              MQPMO_SYNCPOINT
  dc_trn_unchained_commit()

取り出しアプリケーションでは,グループ全体が到着する前にメッセージの処理を開始できます。これによって,グループ内のメッセージについて応答時間を改善できます。また,グループ全体を格納する領域も不要になります。

回復のために,各メッセージはトランザクション内で取り出してください。また,上記の利点を使用できるように,メッセージの各グループに複数のトランザクションを使用してください。

対応する登録アプリケーションと同様に取り出しアプリケーションでも,各トランザクションがコミットされるときに自動的に保存されるステータス情報が必要です。最も簡単な方法はステータスキューに保存することです。グループ全体の処理に成功すると,ステータスキューは空になります。

注意

中間のトランザクションでは,各トランザクションで完全な新規メッセージを登録する代わりに,ステータスキューへの各MQPUT命令にメッセージのセグメントであることを指定(MQMF_SEGMENTを設定)することによって,ステータスキューからのMQGET命令を回避できます。最後のトランザクションでは,最終セグメントをステータスキューにMQMF_LAST_SEGMENT指定で登録し,その後のMQGMO_COMPLETE_MSG指定のMQGET命令でステータス情報を消去します。

再度開始するときの処理では,使用可能なステータスメッセージを取り出すためMQGET命令を1回発行する代わりに,最終セグメントに届くまでステータスキューをMQGMO_LOGICAL_ORDERで検索してください。再度開始したあとの最初のトランザクションでは,ステータスセグメントを登録するときにオフセットを明示的に指定してください。

次に示す例では,1グループのメッセージだけを考慮します。アプリケーションのバッファは,メッセージのセグメント分割の有無に関係なく,メッセージ全体を格納するのに十分な大きさがあるとします。そのため,各MQGET命令にはMQGMO_COMPLETE_MSGを指定します。セグメント分割する場合にも同じ方法を適用できます。その場合は,ステータス情報にOffsetを含めます。

単純化のため,一つのトランザクション内で最大四つのメッセージが取り出されるとします。

msgs = 0 /* トランザクション内での取り出しメッセージをカウント */
/* この時点でステータスメッセージなし */
dc_trn_begin()
MQGET
/* グループ内の残りのメッセージを取り出し */
while ( GroupStatus == MQGS_MSG_IN_GROUP ) {
 
  /* グループ内で最大4メッセージを処理 */
  GMO.Options = MQGMO_SYNCPOINT | MQGMO_WAIT
              | MQGMO_LOGICAL_ORDER
  while ( (GroupStatus == MQGS_MSG_IN_GROUP) && (msgs < 4) ) {
    MQGET
    msgs = msgs + 1
    /* 該当メッセージの処理 */
    ...
  /* whileの終わり */
  }
 
  /* 最終メッセージまたは4メッセージの取り出し完了 */
  /* グループの最終でない場合にステータスメッセージを更新 */
  MQGET (ステータスキューから) GMO.Options = MQGMO_SYNCPOINT
  if ( GroupStatus == MQGS_MSG_IN_GROUP )
    StatusInfo = MQMD構造体のGroupIdとMsgSeqNumber
    MQPUT (ステータス情報をステータスキューへ) PMO.Options =
                                              MQPMO_SYNCPOINT
  dc_trn_chained_commit()
  msgs = 0
/* whileの終わり */
}
 
If ( msgs > 0 )
  /* グループ内で1メッセージの場合はここ */
  dc_trn_chained_commit()

すべてのトランザクションがコミットされると,グループ全体が成功し,ステータスキューは空になります。コミットされない場合は,ステータス情報によって示される時点の状態で再度開始してください。MQGMO_LOGICAL_ORDERは最初の取り出し時には使用できませんが,以降の取り出しで使用できます。

再度開始するときの記述例を次に示します。

dc_trn_begin()
MQGET (ステータスキューから) GMO.Options = MQGMO_SYNCPOINT
if (Reason == MQRC_NO_MSG_AVAILABLE)
  /* 通常の処理 */
  ...
else
  /* グループが早期に中断 */
  /* グループの次メッセージは,ステータス情報からの
     シーケンス番号とグループIDに合わせて取り出して
     ください。 */
  GMO.Options = MQGMO_COMPLETE_MSG | MQGMO_SYNCPOINT | MQGMO_WAIT
  MQGET GMO.MatchOptions = MQMO_MATCH_GROUP_ID |
        MQMO_MATCH_MSG_SEQ_NUMBER,
        MQMD.GroupId = ステータス情報からの値
        MQMD.MsgSeqNumber = ステータス情報からの値 + 1
  msgs = 1
  /* 該当メッセージの処理 */
  ...
 
  /* 通常処理を再度開始 */
  /* グループ内の残りのメッセージを取り出し */
  while ( GroupStatus == MQGS_MSG_IN_GROUP ) {
 
    /* グループ内で最大4メッセージを処理 */
    GMO.Options = MQGMO_COMPLETE_MSG | MQGMO_SYNCPOINT |
                  MQGMO_WAIT | MQGMO_LOGICAL_ORDER
    while ( (GroupStatus == MQGS_MSG_IN_GROUP) && (msgs < 4) ) {
      MQGET
      msgs = msgs + 1
      /* 該当メッセージの処理 */
      ...
 
    /* 最終メッセージまたは4メッセージの取り出し完了 */
    /* グループの最終でない場合にステータスメッセージを更新 */
    MQGET (ステータスキューから) GMO.Options = MQGMO_SYNCPOINT
    if ( GroupStatus == MQGS_MSG_IN_GROUP )
      StatusInfo = MQMD構造体のGroupIdとMsgSeqNumber
      MQPUT (ステータス情報をステータスキューへ) PMO.Options =
                                              MQPMO_SYNCPOINT
    dc_trn_unchained_commit()
    msgs = 0