IBM Integration Bus, Version 9.0.0.8 Operating Systems: AIX, HP-Itanium, Linux, Solaris, Windows, z/OS

See information about the latest product version

Creating the aggregation fan-in flow

The aggregation fan-in flow receives the responses to the request messages that are sent out by the fan-out flow, and constructs a combined response message containing all the responses received.

Before you start:

You can include the fan-out and fan-in flow within the same message flow. However, you might prefer to create two separate flows. For more information about the benefits of configuring separate message flows, see Associating fan-out and fan-in aggregation flows. Do not deploy multiple copies of the same fan-in flow either to the same or to different integration servers.

If you do not configure the fan-out flow to be transactional, the timeout values that you have specified might result in the combined response message being generated before the fan-in flow has received all the replies. For more information, see Creating the aggregation fan-out flow.

To review an example of a fan-in flow see the following sample:

You can view information about samples only when you use the product documentation that is integrated with the IBM® Integration Toolkit or the online product documentation. You can run samples only when you use the product documentation that is integrated with the IBM Integration Toolkit.

To create the fan-in flow:

  1. Create a message flow to provide the fan-in processing.
  2. Add the following nodes in the editor view and configure and connect them as described:
    Input node
    The input node receives the responses to the multiple request messages that are generated from the fan-out flow.

    This node must be an input node that supports the request/reply model, such as an MQInput node, or a mixture of these nodes (depending on the requirements of the applications that send these responses). The response that is received by each input node must be sent across the same protocol as the request to which it corresponds. For example, if you include an MQOutput node in the fan-out flow, the response to that request must be received by an MQInput node in this fan-in flow.

    1. Select the input node to open the Properties view. The node properties are displayed.
    2. Specify the source of input messages for this node; for example, specify the name of a WebSphere® MQ queue in the Basic property Queue name from which the MQInput node retrieves messages.
    3. Optional: specify values for any other properties that you want to configure for this node.
    4. Connect the Out terminal of the input node to the In terminal of an AggregateReply node.

      Connect the terminals in this way to create the simplest configuration; if appropriate, you can include other nodes between the input node and the AggregateReply node; for example, you might want to store the replies for audit purposes (in a Database node).

    Include just one input node that receives all the aggregation response messages at the beginnings of the fan-in flow as described previously. If you include multiple input nodes, threads that are started by a specific reply input node might complete the aggregation and execution of the message flow while other threads are sending their response messages to the AggregateReply node and becoming eligible to timeout. Use a single input node to enable sequential processing of replies for each aggregation. Specify additional instances to provide greater processing throughput in this single node, see Configurable message flow properties.

    AggregateReply node
    The AggregateReply node receives the inbound responses from the input node through its In terminal. The AggregateReply node stores each reply message for subsequent processing.

    When all the replies for a particular group of aggregation requests have been collected, the AggregateReply node creates an aggregated reply message, and propagates it through the Out terminal.

    1. Select the AggregateReply node to open the Properties view. The node properties are displayed.
    2. Set the Aggregate name property of the AggregateReply node to identify this aggregation. Set this value to be the same value that you set for the Aggregate name property in the corresponding AggregateControl node in the fan-out flow.
    3. Optional: to retain an unrecognized message before propagating it to the Unknown terminal, set a value for the Unknown message timeout property. If you are using separate fan-out and fan-in flows, set this value to a non-zero number if the control message might be delayed.
    4. Optional: to explicitly handle unrecognized messages, connect the Unknown terminal to another node, or sequence of nodes. If you do not connect this terminal to another node in the message flow, messages propagated through this terminal are discarded.
    5. Optional: if you have specified a timeout value for this aggregation in the AggregateControl node, and you want to explicitly handle timeouts that expire before all replies are received, connect the Timeout terminal to another node, or sequence of nodes. Partially complete aggregated replies are sent to the Timeout terminal if the timer expires. If you do not connect this terminal to another node in the message flow, messages propagated through this terminal are discarded.
    6. Optional: specify values for any other properties that you want to configure for this node.
    7. Connect the Out terminal of the AggregateReply node to the In terminal of a Compute node.
    Attention: The Control terminal of the AggregateReply node was deprecated at Version 6.0, and by default any connections to this terminal (either direct or indirect) are ignored. This change maximizes the efficiency of aggregation flows and does not damage the reliability of aggregations. This configuration provides the optimum content.

    However, if you want the AggregateReply node to receive, on its Control terminal, the control message that was sent by the corresponding AggregateControl node on the fan-out flow, you must make the necessary connections as described in Creating the aggregation fan-out flow. Keep the path from the AggregateReply node to the output node as direct as possible to maximize the performance of aggregations. Do not modify the content of this control message.

    In addition, for the Control terminal and connections to it to be recognized, you must enable the environment variable MQSI_AGGR_COMPAT_MODE. If you choose this option, the performance and behavior of message aggregations might be affected; for a full description of these implications and the environment variable, see Using control messages in aggregation flows.

    Aggregated messages which are sent from the AggregateReply node output terminals (Out and Timeout) are not validated. Validation of data must be done before messages are sent to the AggregateReply node, because it ignores validation options when reconstructing the stored messages.

    Compute node
    The Compute node receives the message that contains the combined responses. Typically, the format of this combined message is not valid for output, because the aggregated reply message has an unusual structure and cannot be parsed into the bit stream required by some nodes, for example the MQOutput node. The Out and Timeout terminals always propagate an aggregated reply message, which always requires further processing before it can be propagated to an MQOutput node. Therefore you must include a Compute node and configure this node to create a valid output message.
    1. Select the Compute node to open the Properties view. The node properties are displayed.
    2. Specify in the Basic property ESQL module the name of the ESQL module that customizes the function of this node.
    3. Right-click the node and click Open ESQL to open the ESQL file that contains the module for this node. The module is highlighted in the ESQL editor view.
    4. Code the ESQL to create a single output message from the aggregated replies in the input message.

      The aggregated reply message is propagated through the Out terminal. Information about how you can access its contents is provided in Accessing the combined message contents.

    5. Optional: specify values for any other properties that you want to configure for this node.
    6. Connect the Out terminal of the Compute node to the In terminal of the output node that represents the destination of the single response message.
    Output node
    Include an output node for your fan-in flow. This node can be any of the built-in nodes, or a user-defined output node.
    1. Select the output node to open the Properties view. The node properties are displayed.
    2. Specify the destination for the output message for this node; for example, specify in the Basic property Queue name the name of a WebSphere MQ queue to which the MQOutput node sends messages.
    3. Optional: specify values for any other properties that you want to configure for this node.
  3. To save the message flow and validate its configuration, press Ctrl-S or click File > Save.

Accessing the combined message contents

The AggregateReply node creates a folder in the combined message tree below Root, called ComIbmAggregateReplyBody. Below this folder, the node creates a number of subfolders using the names that you set in the AggregateRequest nodes. These subfolders are populated with the associated reply messages.

For example, the request messages might have folder names:

  • TAXI
  • HOTEL

The resulting aggregated reply message created by the AggregateReply node might have a structure like the following example:

Diagram shows the tree for the aggregated message content created under element ComIbmAggregateReplyBody under Root. Its contents are described in the surrounding text.

Use ESQL within a Compute node to access the reply from the taxi company using the following correlation name:

InputRoot.ComIbmAggregateReplyBody.TAXI.xyz

The folder name does not have to be unique. If you have multiple requests with the folder name TAXI, you can access the separate replies using the array subscript notation, for example:

InputRoot.ComIbmAggregateReplyBody.TAXI[1].xyz
InputRoot.ComIbmAggregateReplyBody.TAXI[2].xyz

ac12300_.htm | Last updated Friday, 21 July 2017