IBM InfoSphere Streams Version 4.1.1

User-defined parallelism

User-defined parallelism provides parallel processing of streaming data in specific regions of your InfoSphere® Streams applications.
By using user-defined parallelism, you can:
  • Replicate particular subgraphs of an application a specified number of times.
  • Automatically replicate and connect all the incoming and outgoing connections for a subgraph.
  • Route the tuples that come into the region of replicated operators so that different replicated operators process different tuples.
  • Route tuples based on the values of their attributes.
User-defined parallelism does not:
  • Preserve the tuple or punctuation order.
  • Maintain state consistency across the replicated operators.
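The lack of state consistency can be seen in a small simulation. The following Python sketch is a toy model, not Streams code: CountOperator is a hypothetical stateful operator that counts the tuples it processes, and the region is assumed to route seven tuples round-robin across three replicas. Each replica keeps only its own count, so no replica ever reports the total of seven.

```python
class CountOperator:
    """Toy stateful operator: counts the tuples that this replica processes."""

    def __init__(self) -> None:
        self.count = 0  # state that belongs to this replica only

    def process(self, tup: dict) -> dict:
        self.count += 1
        return {**tup, "seen_so_far": self.count}


width = 3
replicas = [CountOperator() for _ in range(width)]
outputs = []
for i in range(7):
    # Round-robin routing is assumed for this unpartitioned example.
    outputs.append(replicas[i % width].process({"id": i}))

print([r.count for r in replicas])  # [3, 2, 2]
print(outputs[-1])                  # {'id': 6, 'seen_so_far': 3}
```

If an application needs a global count, that state has to be combined outside the parallel region.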

To use data parallelism, add the @parallel annotation to the invocation of either a primitive operator or a composite operator. To apply data parallelism to a specific subgraph, wrap the subgraph in a composite operator, and then add the @parallel annotation to the invocation of that composite operator. The subgraph to which data parallelism is applied is called a parallel region. A parallel region is composed of parallel channels, each of which is an independent replica of the subgraph. The number of channels is set by the width parameter of the annotation.

Tuples on streams that flow into a parallel region must be distributed across the channels of that region. This distribution is done by a splitter, which is the runtime component that splits a stream of tuples and distributes them to the channels of a parallel region. Splitters reside on the output ports that are immediately upstream from the parallel region. If you want to ensure that tuples with the same values for specific attributes are always routed to the same channel, you can partition the parallel region. The partitioning is defined by a set of user-defined partition keys, which are tuple attributes.
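Partition-key routing can be sketched as a function of the key attribute values. In this Python sketch, partition_channel is a hypothetical helper, tuples are modeled as dictionaries, and the sample trades are invented. CRC-32 from zlib is assumed only to get a deterministic hash; the hash function that the Streams runtime actually uses is not described here. The property that matters is that two tuples with equal values for the key attributes always map to the same channel.

```python
import zlib


def partition_channel(tup: dict, keys: list[str], width: int) -> int:
    """Choose a channel from the values of the partition-key attributes."""
    # Join the key values, in key order and with a separator, into one byte string.
    key_repr = "\x1f".join(repr(tup[k]) for k in keys).encode("utf-8")
    # Hash the key values and map the hash onto one of `width` channels.
    return zlib.crc32(key_repr) % width


trades = [
    {"symbol": "IBM", "exchange": "NYSE", "price": 150.1},
    {"symbol": "AAPL", "exchange": "NASDAQ", "price": 190.4},
    {"symbol": "IBM", "exchange": "NYSE", "price": 150.3},
]
for t in trades:
    ch = partition_channel(t, ["symbol", "exchange"], width=4)
    print(t["symbol"], t["price"], "-> channel", ch)
```

Attributes that are not part of the key, such as price here, do not influence the channel choice.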

A splitter routes the tuples and punctuation on the connections that it controls as follows:
  1. Regardless of whether the parallel region is partitioned, each tuple is routed to exactly one channel:
    • When the parallel region is not partitioned, the splitter routes the tuple so as to maintain an even distribution of tuples across the channels.
    • When the parallel region is partitioned, the splitter uses the values of the partition-key attributes of each tuple to determine which channel receives it. It computes a hash of those attribute values and uses the hash to select the channel, so tuples with the same key values always go to the same channel.
  2. Each window punctuation and final punctuation is routed to all the channels. Because punctuation marks are logical markers in a stream, every channel requires all of them.
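These routing rules can be modeled as a small splitter class. This is a simplified Python sketch, not the Streams runtime implementation: round-robin is assumed as the way to keep the unpartitioned distribution even, CRC-32 stands in for the runtime's hash, and Splitter and Punctuation are hypothetical names. Tuples go to exactly one channel, while punctuation is copied to every channel.

```python
import zlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class Punctuation:
    kind: str  # "window" or "final"


class Splitter:
    """Toy model of the splitter on an output port that feeds a parallel region."""

    def __init__(self, width: int, partition_keys: Optional[list[str]] = None):
        self.width = width
        self.partition_keys = partition_keys
        self.next_channel = 0  # round-robin cursor, used only when unpartitioned
        self.channels: list[list] = [[] for _ in range(width)]

    def _choose_channel(self, tup: dict) -> int:
        if self.partition_keys is None:
            # Not partitioned: keep the distribution even (round-robin here).
            ch = self.next_channel
            self.next_channel = (self.next_channel + 1) % self.width
            return ch
        # Partitioned: hash the key values so equal keys share a channel.
        key = "\x1f".join(repr(tup[k]) for k in self.partition_keys)
        return zlib.crc32(key.encode("utf-8")) % self.width

    def submit(self, item) -> None:
        if isinstance(item, Punctuation):
            # Window and final punctuation are copied to every channel.
            for channel in self.channels:
                channel.append(item)
        else:
            # Each tuple is routed to exactly one channel.
            self.channels[self._choose_channel(item)].append(item)


s = Splitter(width=3)
for i in range(6):
    s.submit({"id": i})
s.submit(Punctuation("final"))
for idx, channel in enumerate(s.channels):
    print(idx, channel)
```

Passing partition_keys, for example Splitter(width=4, partition_keys=["symbol"]), switches the same class to hash-based routing without changing how punctuation is handled.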

Parallel regions can contain source operators, sink operators, import operators, and export operators. Source and sink operators in a parallel region are replicated in the same way as the other operators in the region. However, because source operators have no incoming data streams, no splitter is involved. To use data parallelism with source operators, the parallel region must invoke them in a way that partitions the data before it enters the Streams application, so that each replicated source operator reads a distinct portion of the data.
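One way to meet this requirement is for each replica of a source operator to claim a disjoint share of the input. The Python sketch below assumes file-based input and a hypothetical files_for_channel helper that takes every width-th file, starting at the replica's channel index. In SPL, the channel index and the region width are available through functions such as getChannel() and getMaxChannels(); here they are plain parameters.

```python
def files_for_channel(files: list[str], channel: int, width: int) -> list[str]:
    """Return the subset of input files that one source replica should read."""
    if not 0 <= channel < width:
        raise ValueError("channel must be in the range [0, width)")
    # Sort so that every replica sees the same order, then take every
    # width-th file starting at this replica's channel index.
    return sorted(files)[channel::width]


inputs = ["day1.csv", "day2.csv", "day3.csv", "day4.csv", "day5.csv"]
for ch in range(2):
    print(ch, files_for_channel(inputs, ch, width=2))
# 0 ['day1.csv', 'day3.csv', 'day5.csv']
# 1 ['day2.csv', 'day4.csv']
```

Because the shares are disjoint and together cover every file, no input is read twice and none is skipped.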

Data parallelism is achieved through one of two mechanisms: the operators in a parallel region are replicated and then either placed in replicated processing elements (PEs), or given threaded ports and placed in the same PE. In general, operators are placed in a replicated PE unless they are fused with an operator that is outside the parallel region. In that case, all the replicated operators are fused into the PE that contains the operator outside the parallel region. The replication of PEs and operators happens during the parallel transformation, which is performed at job submission time.
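The placement rule above can be expressed as a simplified decision function. In this Python sketch, plan_pes is hypothetical, a PE is modeled as a dictionary of operator instance names, and names such as Parse[0] are illustrative only. The sketch also assumes that, in the default case, all of a channel's operators are fused into a single PE.

```python
from typing import Optional


def plan_pes(region_ops: list[str], width: int,
             fused_outside_op: Optional[str] = None) -> list[dict]:
    """Return a list of PEs; each PE lists its operator instances."""
    # One replica of every region operator per channel.
    replicas = [[f"{op}[{ch}]" for op in region_ops] for ch in range(width)]
    if fused_outside_op is None:
        # Default case: each channel's operators go into their own replicated PE.
        return [{"operators": ops, "threaded_ports": False} for ops in replicas]
    # Fused with an operator outside the region: all replicas share that
    # operator's PE and are given threaded ports.
    all_ops = [fused_outside_op] + [op for ops in replicas for op in ops]
    return [{"operators": all_ops, "threaded_ports": True}]


for pe in plan_pes(["Parse", "Score"], width=2):
    print(pe)
for pe in plan_pes(["Parse", "Score"], width=2, fused_outside_op="Source"):
    print(pe)
```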

The parallel transformation connects the outgoing physical streams at the end of each channel to the same input port that the original logical stream connected to. As a result, tuple order is not preserved. For example, if tuples A, B, C, and D arrive at a parallel region in that order, the operators that are downstream from the parallel region can receive them in any order, such as D, A, C, B. Even in cases where InfoSphere Streams normally preserves tuple order, user-defined parallelism breaks that order.
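The reordering can be reproduced with a small discrete-event simulation. In this Python sketch, which is a toy model rather than a description of Streams internals, merge_channel_outputs is a hypothetical helper; tuples are assumed to be routed round-robin to two channels with different per-tuple processing latencies, and each channel is assumed to process one tuple at a time. The downstream port receives tuples in order of completion, so the slower channel falls behind.

```python
import heapq


def merge_channel_outputs(tuples: list[str], width: int,
                          latency: list[float]) -> list[str]:
    """Route round-robin, then return tuples in the order they leave the region."""
    events = []
    busy_until = [0.0] * width  # time at which each channel becomes free
    for arrival, name in enumerate(tuples):
        ch = arrival % width                      # round-robin routing
        start = max(float(arrival), busy_until[ch])
        finish = start + latency[ch]
        busy_until[ch] = finish
        heapq.heappush(events, (finish, arrival, name))
    # The downstream input port sees tuples in order of completion time.
    return [heapq.heappop(events)[2] for _ in range(len(events))]


print(merge_channel_outputs(["A", "B", "C", "D"], width=2, latency=[5.0, 1.0]))
# ['B', 'D', 'A', 'C']
```

No tuple is lost or duplicated; only the order in which the downstream operator receives them changes.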