Putting and getting a group that spans units of work
In the previous case, messages or segments cannot start to leave the node (if its destination is remote) or start to be retrieved until the whole group has been put and the unit of work is committed. This might not be what you want if it takes a long time to put the whole group, or if queue space is limited on the node. To overcome this, put the group in several units of work.
If the group is put within multiple units of work, it is possible for some of the group to commit even when the putting application fails. The application must therefore save status information, committed with each unit of work, which it can use after a restart to resume an incomplete group. The simplest place to record this information is in a STATUS queue. If a complete group has been successfully put, the STATUS queue is empty.
If segmentation is involved, the logic is similar. In this case, the StatusInfo must include the Offset
.
PMO.Options = MQPMO_LOGICAL_ORDER | MQPMO_SYNCPOINT
/* First UOW */
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
StatusInfo = GroupId,MsgSeqNumber from MQMD
MQPUT (StatusInfo to STATUS queue) PMO.Options = MQPMO_SYNCPOINT
MQCMIT
/* Next and subsequent UOWs */
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQGET (from STATUS queue) GMO.Options = MQGMO_SYNCPOINT
StatusInfo = GroupId,MsgSeqNumber from MQMD
MQPUT (StatusInfo to STATUS queue) PMO.Options = MQPMO_SYNCPOINT
MQCMIT
/* Last UOW */
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_LAST_MSG_IN_GROUP
MQGET (from STATUS queue) GMO.Options = MQGMO_SYNCPOINT
MQCMIT
If all the units of work have been committed, the entire group has been put successfully, and the STATUS queue is empty. If not, the group must be resumed at the point indicated by the status information. MQPMO_LOGICAL_ORDER cannot be used for the first put, but can thereafter.
MQGET (StatusInfo from STATUS queue) GMO.Options = MQGMO_SYNCPOINT
if (Reason == MQRC_NO_MSG_AVAILABLE)
/* Proceed to normal processing */
...
else
/* Group was terminated prematurely */
Set GroupId, MsgSeqNumber in MQMD to values from Status message
PMO.Options = MQPMO_SYNCPOINT
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
/* Now normal processing is resumed.
Assume this is not the last message */
PMO.Options = MQPMO_LOGICAL_ORDER | MQPMO_SYNCPOINT
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
MQPUT MD.MsgFlags = MQMF_MSG_IN_GROUP
StatusInfo = GroupId,MsgSeqNumber from MQMD
MQPUT (StatusInfo to STATUS queue) PMO.Options = MQPMO_SYNCPOINT
MQCMIT
From the getting application, you might want to start processing the messages in a group before the whole group has arrived. This improves response times on the messages within the group, and also means that storage is not required for the entire group. In order to realize the benefits, use several units of work for each group of messages. For recovery reasons, you must retrieve each message within a unit of work.
As with the corresponding putting application, this requires status information to be recorded somewhere automatically as each unit of work is committed. Again, the simplest place to record this information is on a STATUS queue. If a complete group has been successfully processed, the STATUS queue is empty.
During restart processing, instead of using a single MQGET to get a possible status message, browse the status queue with MQGMO_LOGICAL_ORDER until you reach the last segment (that is, until no further segments are returned). In the first unit of work after restart, also specify the offset explicitly when putting the status segment.
In the following example, we consider only messages within a group, assuming that the application's buffer is always large enough to hold the entire message, whether or not the message has been segmented. MQGMO_COMPLETE_MSG is therefore specified on each MQGET. The same principles apply if segmentation is involved (in this case, the StatusInfo must include the Offset
).
msgs = 0 /* Counts messages retrieved within UOW */
/* Should be no status message at this point */
/* Retrieve remaining messages in the group */
do while ( GroupStatus == MQGS_MSG_IN_GROUP )
/* Process up to 4 messages in the group */
GMO.Options = MQGMO_SYNCPOINT | MQGMO_WAIT
| MQGMO_LOGICAL_ORDER
do while ( (GroupStatus == MQGS_MSG_IN_GROUP) && (msgs < 4) )
MQGET
msgs = msgs + 1
/* Process this message */
...
/* end while
/* Have retrieved last message or 4 messages */
/* Update status message if not last in group */
MQGET (from STATUS queue) GMO.Options = MQGMO_SYNCPOINT
if ( GroupStatus == MQGS_MSG_IN_GROUP )
StatusInfo = GroupId,MsgSeqNumber from MQMD
MQPUT (StatusInfo to STATUS queue) PMO.Options = MQPMO_SYNCPOINT
MQCMIT
msgs = 0
/* end while
if ( msgs > 0 )
/* Come here if there was only 1 message in the group */
MQCMIT
If all the units of work have been committed, the entire
group has been retrieved successfully, and the STATUS queue is empty.
If not, the group must be resumed at the point indicated by the status
information. MQGMO_LOGICAL_ORDER cannot be used for the first retrieve,
but can thereafter.
MQGET (from STATUS queue) GMO.Options = MQGMO_SYNCPOINT
if (Reason == MQRC_NO_MSG_AVAILABLE)
/* Proceed to normal processing */
...
else
/* Group was terminated prematurely */
/* The next message on the group must be retrieved by matching
the sequence number and group id with those retrieved from the
status information. */
GMO.Options = MQGMO_COMPLETE_MSG | MQGMO_SYNCPOINT | MQGMO_WAIT
MQGET GMO.MatchOptions = MQMO_MATCH_GROUP_ID | MQMO_MATCH_MSG_SEQ_NUMBER,
MQMD.GroupId = value from Status message,
MQMD.MsgSeqNumber = value from Status message plus 1
msgs = 1
/* Process this message */
...
/* Now normal processing is resumed */
/* Retrieve remaining messages in the group */
do while ( GroupStatus == MQGS_MSG_IN_GROUP )
/* Process up to 4 messages in the group */
GMO.Options = MQGMO_COMPLETE_MSG | MQGMO_SYNCPOINT | MQGMO_WAIT
| MQGMO_LOGICAL_ORDER
do while ( (GroupStatus == MQGS_MSG_IN_GROUP) && (msgs < 4) )
MQGET
msgs = msgs + 1
/* Process this message */
...
/* Have retrieved last message or 4 messages */
/* Update status message if not last in group */
MQGET (from STATUS queue) GMO.Options = MQGMO_SYNCPOINT
if ( GroupStatus == MQGS_MSG_IN_GROUP )
StatusInfo = GroupId,MsgSeqNumber from MQMD
MQPUT (StatusInfo to STATUS queue) PMO.Options = MQPMO_SYNCPOINT
MQCMIT
msgs = 0