IBM Streams 4.2

User-defined parallelism

User-defined parallelism, available through the @parallel annotation, allows you to easily take advantage of data-parallelism in your IBM® Streams applications.
In a streaming context, data-parallelism means replicating specific operators, splitting the streams going into those operators, and processing different tuples in parallel in those replicated operators. The goal of data-parallelism is to improve overall application throughput by parallelizing specific regions of the application. Through user-defined parallelism, you can:
  • Define parallel regions in your application, which replicate all operators in that region and automatically create and connect new streams to the replicated operators.
  • Specify the amount of parallelism (how many replicas to generate, also called the parallel width) at compile or submission time.
  • Optionally partition the parallel region based on tuple attribute values. The SPL runtime ensures that tuples with the same values for the partitioning attributes are routed to the same channel.
  • Optionally broadcast tuples from selected streams to all operators in the parallel region.
User-defined parallelism does not:
  • Preserve the tuple or punctuation order in a parallel region.
  • Maintain state consistency across the replicated operators.

To use user-defined parallelism, add the @parallel annotation to the invocation of either a primitive or a composite operator. When applied to just a primitive operator, the parallel region consists of just that primitive operator. When applied to a composite operator, the parallel region consists of all operators inside of that composite operator. Parallelized composite operators can themselves contain parallel regions, resulting in nested parallel regions. Parallel regions are composed of parallel channels, where each channel is an independent set of replicated operators. Parallel channels can contain multiple operators, with multiple input and output streams.

The number of channels in a parallel region is determined by the width parameter to the @parallel annotation. The process of actually replicating the operators and all of the necessary streams inside of a parallel region is the parallel transformation. Because the parallel transformation is performed at submission time, you can specify the width as a submission time value. Delaying the decision of how much parallelism to use until submission time allows you to compile an application once, and change its level of parallelism on each resubmission without recompiling.
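The following Python sketch is a conceptual model of the parallel transformation, not IBM Streams code. It shows how one logical region can be expanded into a different number of channels on each submission without changing the logical definition. The function names, the "width" parameter key, the default width, and the Name[channel] naming scheme are all assumptions made for illustration.

```python
def parallel_transform(region_ops, width):
    """Replicate every operator of a logical region once per channel.

    Returns one list of replica names per channel. The "Name[channel]"
    naming scheme is hypothetical and used only for this illustration.
    """
    if width < 1:
        raise ValueError("width must be at least 1")
    return [[f"{op}[{ch}]" for op in region_ops] for ch in range(width)]


def resolve_width(submission_params, default=2):
    """Read the width at "submission time" (hypothetical parameter name)."""
    return int(submission_params.get("width", default))


# The logical region is defined once ...
logical_region = ["Parse", "Enrich"]

# ... and expanded with a different width on each submission.
first_run = parallel_transform(logical_region, resolve_width({"width": "3"}))
second_run = parallel_transform(logical_region, resolve_width({}))
```

In this model, first_run holds three channels and second_run holds two, although logical_region is identical in both cases.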

Operators that produce streams that flow into a parallel region must distribute those tuples across the channels of the parallel region. This distribution is done by a splitter, which is the runtime component that splits the stream of tuples and distributes them to the different channels of a parallel region. The splitters exist on the output ports that are immediately upstream from the parallel region. The splitters can also be nested and contain other splitters; a splitter submits the tuples and punctuation to all its nested splitters. If you want to ensure that tuples with specific attribute values are always routed to the same channel, you can partition the parallel regions. The partitions are defined by a set of user-defined partition keys that are composed of tuple attributes.

A splitter routes the tuples and punctuations on the streams that it controls in one of the following ways:
  1. Regardless of whether the parallel region is partitioned, if the incoming stream is not set to broadcast, each tuple is routed to a single channel only:
    • When the parallel region is not partitioned, the splitter routes the tuple to maintain an even distribution of tuples to the channels.
    • When the parallel region is partitioned, the splitter uses the partition keys, which are the attribute values of each tuple, to determine where it routes the tuple. The splitter routes each tuple by creating a hash from the set of tuple attributes.
  2. For streams that are set to broadcast, all tuples are routed to all channels.
  3. Each window punctuation and final punctuation is routed to all channels. Because punctuations are logical markers in a stream, all the channels require all the punctuation.
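The routing rules above can be modeled with a short Python sketch. This is an illustration under stated assumptions, not the IBM Streams splitter: the Splitter class is hypothetical, round-robin stands in for "even distribution", and CRC32 stands in for whatever hash function the runtime actually uses.

```python
import zlib


class Splitter:
    """Conceptual model of a splitter for a parallel region (hypothetical)."""

    def __init__(self, width, partition_keys=None, broadcast=False):
        self.width = width
        self.partition_keys = partition_keys  # attribute names, or None
        self.broadcast = broadcast
        self._next = 0  # round-robin cursor for unpartitioned routing

    def route_tuple(self, tup):
        """Return the list of channel indexes that receive this tuple."""
        if self.broadcast:
            # Rule 2: broadcast streams reach every channel.
            return list(range(self.width))
        if self.partition_keys:
            # Rule 1, partitioned: hash the partition key values so that
            # equal keys always map to the same channel.
            key = repr(tuple(tup[k] for k in self.partition_keys)).encode()
            return [zlib.crc32(key) % self.width]
        # Rule 1, unpartitioned: round-robin (assumed) for an even spread.
        channel = self._next
        self._next = (self._next + 1) % self.width
        return [channel]

    def route_punctuation(self):
        """Rule 3: window and final punctuation reach every channel."""
        return list(range(self.width))


partitioned = Splitter(width=4, partition_keys=["user"])
same_channel = (
    partitioned.route_tuple({"user": "alice", "amount": 10})
    == partitioned.route_tuple({"user": "alice", "amount": 25})
)
```

Here same_channel is True because both tuples carry the same value for the partitioning attribute, whatever their other attributes contain.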

Parallel regions can contain source operators, sink operators, and Import and Export operators. Like other operators in the parallel region, source and sink operators are replicated. However, because source operators have no incoming data streams, no splitter is involved. To use data parallelism, these parallel regions must invoke the source operators in a way that partitions the data before it enters the stream processing application.
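One way to picture this is that each source replica selects its own share of the external input based on its channel index. The Python sketch below models that idea only; the function name and the modulo assignment are assumptions, and a real application would express the equivalent selection in the source operator's own invocation.

```python
def inputs_for_channel(all_inputs, channel, width):
    """Select the inputs that one source replica should read.

    Input i is assigned to channel i % width, so every input is read by
    exactly one replica (hypothetical assignment scheme).
    """
    if not 0 <= channel < width:
        raise ValueError("channel must be in the range [0, width)")
    return [item for i, item in enumerate(all_inputs) if i % width == channel]


files = ["part-0.csv", "part-1.csv", "part-2.csv", "part-3.csv", "part-4.csv"]
width = 2
shares = [inputs_for_channel(files, ch, width) for ch in range(width)]
```

With five files and a width of 2, channel 0 reads part-0, part-2, and part-4, and channel 1 reads part-1 and part-3, so no file is processed twice.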

The actual mechanisms used at runtime to achieve parallel execution will depend on both how the application was fused, and the threading model for the PEs in the application. When parallel operators are in separate PEs, they will naturally execute in parallel. How parallel operators are fused will depend on the fusion scheme and their partitionColocation, partitionExlocation and partitionIsolation placement configuration options. When the PE is under the manual threading model, and the splitter is in the same PE as operators in the parallel region that it sends tuples to, then the runtime injects threaded ports to ensure parallel execution. When the PE is under the dynamic threading model, the runtime does not inject threaded ports, but instead allows the parallelized operators to be executed by the dynamic thread pool. For more details about fusion schemes, see Specifying how operators are fused when you submit a job. For more on threading models, see the SPL threading annotation.

The parallel transformation connects all the outgoing physical streams from the end of each channel to the same input port that the logical stream originally connected to, outside of the parallel region. Because there is no merge operation performed for the tuples flowing out of a parallel region, tuple order is not preserved. For example, if tuples A, B, C, and D arrive at a parallel region in that order, the operators that are downstream from the parallel region can receive these tuples in any order, such as D, A, C, B. For the cases where IBM Streams typically preserves the tuple order, the use of user-defined parallelism breaks that tuple order. If your application requires preserving tuple order, you can implement your own merging operators outside of a parallel region.
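As an illustration of what such a merging step might do, the Python sketch below restores order with a reorder buffer. It is not an IBM Streams operator. It assumes that an operator upstream of the parallel region stamps each tuple with a consecutive, unique sequence number, and that the region emits exactly one output per input. If the region drops a tuple, this merger waits indefinitely for the missing number. The OrderedMerger class is hypothetical.

```python
import heapq


class OrderedMerger:
    """Reorder buffer that releases payloads in sequence-number order."""

    def __init__(self, first_seq=0):
        self._next = first_seq  # next sequence number to release
        self._pending = []      # min-heap of (seq, payload)

    def push(self, seq, payload):
        """Accept one out-of-order tuple and return any that are now ready."""
        heapq.heappush(self._pending, (seq, payload))
        released = []
        # Release the longest run of consecutive sequence numbers available.
        while self._pending and self._pending[0][0] == self._next:
            released.append(heapq.heappop(self._pending)[1])
            self._next += 1
        return released


merger = OrderedMerger()
# Tuples A, B, C, D (sequence 0..3) arrive downstream as D, A, C, B.
arrivals = [(3, "D"), (0, "A"), (2, "C"), (1, "B")]
output = []
for seq, payload in arrivals:
    output.extend(merger.push(seq, payload))
```

After the loop, output contains the payloads in their original order. The cost is extra latency and memory, because tuples that arrive early are held until every earlier tuple has arrived.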