About the Aggregation sample

The Aggregation sample demonstrates a simple four-way aggregation operation that uses the AggregateControl, AggregateRequest, and AggregateReply nodes. It contains three message flows to implement a four-way aggregation: FanOut, RequestReplyApp, and FanIn.

FanOut message flow

This flow takes the incoming request message, generates four different request messages, sends them out on request/reply, and starts the tracking of the aggregation operation.

Aggregation FanOut flow
The Control terminal of the AggregateControl node is not wired, and the Transaction Mode of the MQInput node is set to Yes. You can find out more about the reasons for this design in Extending the Aggregation sample.

The FanOut flow contains the AggregateControl and AggregateRequest nodes, which are used to start the aggregation processing. The AggregateControl node propagates the request message down each of the four branches that are connected to its Out terminal (in no defined order). Each branch has a BuildRequest Compute node to generate the individual request. The following ESQL code is used in the BuildRequest1 Compute node:

CREATE COMPUTE MODULE FanOut_CreateRequest1
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		SET OutputLocalEnvironment = InputLocalEnvironment;
		CALL CopyQuarter(InputRoot, OutputRoot, 0);
		RETURN TRUE;
	END;
END MODULE;
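
The other three BuildRequest Compute nodes presumably differ only in the quarter index that is passed to CopyQuarter (1, 2, or 3 instead of 0). The following sketch shows what the second module might look like; the module name FanOut_CreateRequest2 is assumed here for illustration and is not taken from the sample:

CREATE COMPUTE MODULE FanOut_CreateRequest2
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		SET OutputLocalEnvironment = InputLocalEnvironment;
		-- Only the quarter index differs: 1 selects the second quarter
		-- of the <SaleList> elements
		CALL CopyQuarter(InputRoot, OutputRoot, 1);
		RETURN TRUE;
	END;
END MODULE;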

The CopyQuarter procedure copies the message headers from the input message, and then extracts one quarter of the <SaleList> elements. In the supplied example, eight <SaleList> elements are provided, so each request message contains two <SaleList> elements. The following example shows the ESQL code for this procedure:

CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
							 IN output REFERENCE,
							 IN jumps INTEGER)
BEGIN
	CALL CopyMessageHeaders(input, output);
	CREATE LASTCHILD OF output DOMAIN 'XMLNSC';
	CREATE LASTCHILD OF output.XMLNSC NAME 'SaleEnvelope';
	DECLARE xmlIn REFERENCE TO input.XMLNSC.SaleEnvelope;
	DECLARE xmlOut REFERENCE TO output.XMLNSC.SaleEnvelope;
	IF LASTMOVE(xmlOut) <> TRUE THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('could not create output message');
	END IF;

	DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER);
	DECLARE quarter INTEGER invoices/4;
	IF invoices <> (quarter*4) THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('not divisible by 4', invoices);
	END IF;	
	
	IF jumps > 3 THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('too many jumps', jumps);
	END IF;		
	
	DECLARE count INTEGER 1;
	DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
	WHILE count <= quarter DO
		SET xmlOut.SaleList[count] = copyRef;
		MOVE copyRef NEXTSIBLING;
		SET count = count + 1;
	END WHILE;
END;

Some initial validation of the inputs takes place (the number of <SaleList> elements must be divisible by four, and the required quarter must be selected by a jumps value of 0, 1, 2, or 3) before the appropriate number of <SaleList> elements is copied from the input message into the output message.
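
For example, with the eight <SaleList> elements in the supplied test message, quarter evaluates to 2, so the branch that passes a jumps value of 1 starts copying at SaleList[(1*2)+1], that is SaleList[3], and copies SaleList[3] and SaleList[4]: the second quarter of the input.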

The CopyMessageHeaders procedure, as called in the CopyQuarter procedure, is based on the standard CopyMessageHeaders procedure that is generated in the ESQL for a new Compute node. To maximize reuse, this CopyMessageHeaders procedure is moved up to the scope of the ESQL file, so that all of the Compute nodes can call the same procedure.
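
For comparison, the procedure that is generated inside a new Compute node operates directly on InputRoot and OutputRoot and takes no parameters. The following is a sketch of that standard template (the exact generated code can vary between product versions):

CREATE PROCEDURE CopyMessageHeaders() BEGIN
	DECLARE I INTEGER 1;
	DECLARE J INTEGER;
	SET J = CARDINALITY(InputRoot.*[]);
	WHILE I < J DO
		-- Copy every header child of InputRoot; the last child (the message body) is skipped
		SET OutputRoot.*[I] = InputRoot.*[I];
		SET I = I + 1;
	END WHILE;
END;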

This rescoping has an important implication that necessitates a change to the procedure. Within a Compute node, the OutputRoot reference has special properties that automatically preserve domain information when message tree elements are copied from InputRoot to OutputRoot. However, in this case OutputRoot is passed as a reference to an external procedure, so the domain information must be preserved explicitly. This preservation is accomplished by adding the CREATE LASTCHILD ... DOMAIN statement:

CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
									IN output REFERENCE)
BEGIN
	DECLARE I INTEGER 2;
	DECLARE J INTEGER CARDINALITY(input.*[]);
	WHILE I < J DO
		CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
		SET output.*[I] = input.*[I];
		SET I = I + 1;
	END WHILE;
END;

After a BuildRequest Compute node has generated the request message (the Compute mode of the node is set to LocalEnvironment and Message), the message is put by an MQOutput node to the AGGR_SAMPLE_REQUEST queue. (For simplicity in this sample, all four requests are put to the same queue; a real application would probably send them to different destinations.) Each AggregateRequest node has a folder name specified as a configuration parameter, which is used by the AggregateReply node when appending the various replies into the aggregated reply message. The AggregateRequest1 node uses the first quarter of the input message, the AggregateRequest2 node uses the second quarter, and so on.

The MQOutput nodes are configured to specify AGGR_SAMPLE_REPLY as the reply-to queue on the request messages; this queue is used by the RequestReplyApp message flow.
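
The reply-to queue is carried in the MQMD of each request message. The sample sets it as an MQOutput node property, but the equivalent effect could be achieved in ESQL by populating the MQMD fields directly, as in the following illustrative fragment (not part of the sample):

-- Illustrative only: the sample sets the reply-to queue on the MQOutput node
SET OutputRoot.MQMD.ReplyToQ = 'AGGR_SAMPLE_REPLY';
SET OutputRoot.MQMD.ReplyToQMgr = ''; -- left blank so that the local queue manager name is used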

After all four request messages are output, the AggregateControl node stores the state of the aggregation operation internally in the integration node.

To find out about other ways of accomplishing this scenario, see Extending the Aggregation sample.

The whole flow must run under a single transaction, with Transaction Mode set to Yes on the MQInput node, because it is most efficient if the last operation (storing the state of the aggregation operation) completes before any replies are received.

RequestReplyApp message flow

Use this flow to simulate the back-end service applications that typically process the request messages from the aggregation operation. In a real system, these back-end services might be other message flows or existing applications, but that level of complexity is not required for the Aggregation sample; the flow contains the minimum required for correct request/reply processing. It reads from the same queue that the MQOutput nodes in the FanOut flow write to, and it writes to the queue that the input node in the FanIn flow reads from, providing a messaging bridge between the two flows. The messages are put to their reply-to queue (as set by the MQOutput nodes in the FanOut flow).

Reply flow
The RequestReplyApp flow is specified with three additional instances in the broker archive (BAR) file, resulting in four threads in total, which ensures that all four requests are processed as quickly as possible.
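
In a back-end flow of this kind, each reply is normally correlated with its request by copying the request MsgId into the reply CorrelId, which is typically how the AggregateReply node matches a reply to the request that the AggregateRequest node recorded. The following sketch shows one way such a reply-building Compute node could be written; it is illustrative only, and the module name RequestReplyApp_BuildReply is assumed rather than taken from the sample:

CREATE COMPUTE MODULE RequestReplyApp_BuildReply
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- Echo the request back unchanged as the reply body
		SET OutputRoot = InputRoot;
		-- Standard MQ request/reply correlation: carry the request MsgId
		-- in the reply CorrelId so that the reply can be matched
		SET OutputRoot.MQMD.CorrelId = InputRoot.MQMD.MsgId;
		SET OutputRoot.MQMD.MsgType = MQMT_REPLY;
		RETURN TRUE;
	END;
END MODULE;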

FanIn message flow

This flow receives all of the replies from the RequestReplyApp flow, and aggregates them into a single output message. The output message from the AggregateReply node cannot be output directly by an MQOutput node, so a Compute node is added to transform the data into a format that can be written to a queue.
Aggregation fan-in flow

The FanIn message flow also has three additional instances, for the same reasons as the RequestReplyApp flow. The first three incoming replies are stored internally by the integration node, and the stored aggregation state is updated. When the fourth reply is processed, the three stored replies are retrieved and all four reply messages are built into a single output message. This message is not in a state where it can be written to a queue, so the BuildReply Compute node runs the following ESQL:

CREATE COMPUTE MODULE FanIn_BuildReply
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		SET OutputRoot.Properties = InputRoot.Properties;
		CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
		SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
		CREATE LASTCHILD OF OutputRoot DOMAIN 'XMLNSC';
		CREATE LASTCHILD OF OutputRoot.XMLNSC NAME 'ComIbmAggregateReplyBody';
		DECLARE next INTEGER 1;
		DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
		DECLARE repliesOut REFERENCE TO OutputRoot.XMLNSC.ComIbmAggregateReplyBody;
		WHILE next <= 4 DO -- 4-way aggregation
			CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
			SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
			SET repliesOut.*[next].SaleEnvelope = repliesIn.XMLNSC.SaleEnvelope;
			MOVE repliesIn NEXTSIBLING;
			SET next = next + 1;
		END WHILE;
		RETURN TRUE;
	END;
END MODULE;

The ESQL adds a rudimentary MQMD before copying the data from ComIbmAggregateReplyBody in the input message into an XMLNSC tree in the output message, while maintaining the reply identifiers and the folder names set by the AggregateRequest nodes. The order of the replies is not specified.

Test message

The test message that is used to drive the aggregation message flow is an XML message that contains invoice details for a customer. It contains approximately 8 KB of data, in eight separate <SaleList> elements.

<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>
