Extending the Aggregation sample

Find out how to modify the supplied aggregation flow to fit different requirements. You might also find this sample useful if you are porting existing aggregation flows from Version 5 of the product and earlier.

Aggregation state control on fan-out

The Control terminal of the AggregateControl node is used to propagate the control message containing the status and tracking information for a particular aggregation operation. You have the following options for wiring the Control terminal:

  1. Leave it unwired.
  2. Wire it directly to the Control input terminal of the AggregateReply node. Direct wiring is possible only if both nodes are in the same message flow.
  3. Wire it through a Compute node, which adds an MQMD, to an MQOutput node, which writes the control message to a user-defined queue. The corresponding AggregateReply node, which is probably in a separate flow, receives these control messages on its Control terminal from an MQInput node that reads this queue. Existing aggregation flows from Version 5 of the product and earlier might work in this way.
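For option 3, the following ESQL is a minimal sketch of a Compute node placed between the Control terminal of the AggregateControl node and the MQOutput node. The module name ControlToQueue_Compute is hypothetical, and marking the message persistent with an MQFMT_STRING format is an assumption rather than a product requirement. The target queue is not set here; it is assumed to be configured in the Queue Name property of the MQOutput node.

```esql
-- Hypothetical module name; attach to the Compute node that sits between
-- the AggregateControl Control terminal and the MQOutput node.
CREATE COMPUTE MODULE ControlToQueue_Compute
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    -- Copy the control message (Properties, any headers, and body) unchanged
    SET OutputRoot = InputRoot;

    -- If the control message carries no MQMD, create one directly after
    -- Properties so that the MQOutput node can write the message to a queue
    IF NOT EXISTS(OutputRoot.MQMD[]) THEN
      CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN('MQMD') NAME 'MQMD';
    END IF;

    -- Assumption: the body is character data, and control state should
    -- survive a queue manager restart
    SET OutputRoot.MQMD.Format      = MQFMT_STRING;
    SET OutputRoot.MQMD.Persistence = MQPER_PERSISTENT;

    RETURN TRUE;
  END;
END MODULE;
```

The IF NOT EXISTS guard leaves an existing MQMD in place, so the same module can be reused whether or not the fan-out flow's input message already had one.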

Factors you must consider when choosing one of the options are:

Managing transactions on fan-out

You must set the Transaction Mode of the MQInput node on the fan-out flow to Yes to avoid the possibility of reply messages being received by the fan-in flow before the control information has been stored by the AggregateControl node.

When the fan-out flow is not run under transaction control, each aggregate fan-out request message that is written by the MQOutput nodes is eligible to be processed immediately by the application that is being invoked. Depending on how quickly this application responds, the reply might be received by the AggregateReply node before the control information has been stored.

The same problem can occur if the Control terminal of the AggregateControl node is wired to a Compute node and an MQOutput node, that is, if fan-out and fan-in are in different message flows. Even if the fan-out runs under a transaction, that transaction covers only the writing of the control message by the MQOutput node, not its processing by the fan-in flow. Replies can therefore still arrive and be processed as unknown while the Control branch of the fan-in flow is processing the control message. This scenario is option 3 in the preceding section.

In both cases the aggregation still completes correctly if the Unknown Message Timeout on the AggregateReply node is set to a non-zero value. Unknown replies are stored and then reprocessed after the specified number of seconds, at which point they are consolidated with the control state information. The aggregation operation therefore completes correctly, but only after the delay caused by the Unknown Message Timeout expiring. If you set the Unknown Message Timeout to zero, replies that arrive ahead of the control message are propagated directly to the Unknown terminal of the AggregateReply node and are not consolidated with the rest of the aggregation data.

To summarize both this section and the previous one, the most efficient design for an Aggregation fan-out flow ensures that:

Adopting the most efficient design options ensures that problems of the type described in the preceding text do not occur. However, if you cannot use these options, you can still set the Unknown Message Timeout parameter on the AggregateReply node to a non-zero value to ensure that aggregation operations complete successfully.

Transactional context on fan-in

The AggregateReply node has three non-error output terminals: Out, Unknown, and Timeout. It is important to understand when and why messages are sent from these different terminals, paying particular attention to transactional context. The transactional context can either be owned by the MQInput node, which occurs when an incoming reply message triggers output from the AggregateReply node, or it can be owned by the AggregateReply node itself.

When the AggregateReply node owns the transactional context, the usual transactional semantics apply, but transactional control is centred on the AggregateReply node rather than the MQInput node. As a result, an error produced later in the message flow causes the AggregateReply node, rather than the MQInput node, to roll back and propagate a message to its Catch terminal. Messages that the AggregateReply node propagates within its own transactions are not supplied directly by the MQInput node; for these flow invocations, the AggregateReply node acts as the input node.

The transactional behavior of the AggregateReply node is controlled by its Transaction Mode configuration parameter. Set it to the same value as the Transaction Mode of the MQInput node so that all output from the AggregateReply node is under the same level of transactional control.

The following situations are under the transactional control of the MQInput node:

The following situations are under the transactional control of the AggregateReply node:

Avoiding thread starvation on fan-in

The AggregateReply node has two input terminals: In and Control. If you use both terminals (use of the Control terminal is optional), the most efficient way to supply data to the AggregateReply node is to use a single MQInput node for the fan-in flow, followed by a Filter node that routes each incoming message to the In or Control terminal of the AggregateReply node as appropriate. Use this approach rather than two MQInput nodes, one for the In terminal and one for the Control terminal. The fan-in flow should look like the following diagram:
Truncated fan-in flow with filtering
The Filter node requires an ESQL module similar to the following example to ensure that the messages are routed to the appropriate terminal of the AggregateReply node:

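CREATE FILTER MODULE FanIn_Filter
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    -- Control messages have a ComIbmAggregateControlNode root element;
    -- reply messages do not
    IF Root.XMLNSC.ComIbmAggregateControlNode IS NULL THEN
      RETURN TRUE;  -- True terminal: wired to In
    ELSE
      RETURN FALSE; -- False terminal: wired to Control
    END IF;
  END;
END MODULE;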

Use a single MQInput node because, with two MQInput nodes, you cannot specify how any additional threads (made available by configuring additional instances) are distributed between them. Traffic on the In terminal of the AggregateReply node is likely to be higher, so it would be useful to give its input node more threads, but you cannot configure this. As a result, the MQInput node that feeds the In terminal can be starved of threads, causing reply messages to back up and the aggregation mechanism to stall.

This situation applies only if the Control terminal of the AggregateControl node is wired to output to a queue. Leaving the Control terminal unwired avoids these issues.

If the preceding solution cannot be implemented, you can force the MQInput node that is reading control messages to run single-threaded:

  1. In the Advanced panel of the MQInput node configuration, set Order Mode to By Queue Order.
  2. Select Logical Order, which frees up all of the configured additional instances so that they can be used by the other MQInput node.

However, this configuration severely limits the performance of the MQInput node that reads control messages.
