IBM InfoSphere Streams Version 4.1.1
Parallel annotation
To use user-defined parallelism, apply the @parallel annotation to invocations of primitive or composite operators. To apply user-defined parallelism to an arbitrary group of operators, refactor those operators into a single composite operator, and then apply the @parallel annotation to the invocation of that composite operator.
Syntax:
@parallel(width=number optional_elements)
where the optional elements are:
  , partitionBy=[{port=port_name, attributes=[attribute_name, ...]}, ...]
  , replicateHostTags=[host_tag, ...]
In these lists, "..." means that the preceding item can be repeated, separated by commas.
Required elements
- width
- Specifies the number of channels in a parallel region. For each channel in a parallel region, InfoSphere® Streams replicates the invoked operator, so the region contains one copy of the operator per channel. The width must be an int32 literal, an expression<int32> composite parameter, or a submission-time value that is cast to int32.
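As a rough model of how a width value turns into a set of channels, the Python sketch below resolves a width and lists one operator replica per channel. It is illustrative only: `resolve_width` and `replicate` are hypothetical helpers, not part of Streams; a Python string stands in for a submission-time value (which SPL delivers as an rstring that the application casts to int32); and rejecting a width below 1 is an assumption of this sketch.

```python
from typing import List, Tuple, Union

# Bounds of a 32-bit signed integer, the type that the width must have.
INT32_MIN = -(2**31)
INT32_MAX = 2**31 - 1


def resolve_width(value: Union[int, str]) -> int:
    """Turn a width value into a channel count (hypothetical model).

    A str stands in for a submission-time value that must be cast to int32.
    """
    if isinstance(value, bool):
        # bool is a subclass of int in Python, so reject it explicitly.
        raise TypeError("width must be an integer, not a bool")
    if isinstance(value, str):
        value = int(value.strip())  # models the cast of the submission-time value
    if not isinstance(value, int):
        raise TypeError("width must be an integer")
    if not INT32_MIN <= value <= INT32_MAX:
        raise ValueError("width does not fit in int32")
    if value < 1:
        # Assumption of this sketch: a parallel region needs at least one channel.
        raise ValueError("width must be at least 1")
    return value


def replicate(operator: str, width: Union[int, str]) -> List[Tuple[str, int]]:
    """Return one (operator, channel index) pair per parallel channel."""
    return [(operator, channel) for channel in range(resolve_width(width))]


if __name__ == "__main__":
    print(replicate("Op", 5))    # width given as a literal
    print(replicate("Op", "3"))  # width given as a submission-time value
```

With `width=5`, the model yields five replicas of `Op` with channel indexes 0 through 4.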
Optional elements
- partitionBy
- Specifies how to partition tuples across the channels for specific ports that enter a parallel region. Each port is partitioned independently, and each port can use a different partitioning. There is one splitter per port. Partitioning a parallel region ensures that tuples with the same values for the partitioning keys are always routed to the same channel. The partitioning keys are specified as tuple attributes. The partitioning specification for a parallel region is supplied as a list of port and attributes pairs, which are specified as follows:
- attributes
- Specifies the partitioning keys as a list of attributes. The splitter routes tuples that have the same values for these attributes to the same parallel channel. The attributes must exist in the tuple type that is specified for the input port.
- port
- Specifies the name of an input port of the parallel region. The name must identify an input port of the operator invocation that the @parallel annotation applies to.
- If a port is not partitioned, the splitter can route any tuple to any channel.
- replicateHostTags
- Specifies host tags that identify the host pools to replicate in a parallel region. When a host pool is replicated, it is used only for a single channel in a parallel region. By replicating host pools, you can assign sibling operators in a parallel region to different sets of hosts.
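The difference between a shared host pool and a replicated one can be modeled in a few lines. This Python sketch is a conceptual model, not Streams code: `hosts_per_channel` is a hypothetical helper, hosts are represented as a mapping from host name to host tags, and the convention that channel c of a replicated pool draws from hosts tagged `<tag>_<c>` is an assumption made only for illustration.

```python
from typing import Dict, List, Set


def hosts_per_channel(hosts: Dict[str, Set[str]], tag: str, width: int,
                      replicated: bool) -> List[List[str]]:
    """Return the candidate hosts for each channel of a parallel region.

    hosts maps a host name to its host tags; tag selects the host pool.
    """
    if replicated:
        # Replicated pool: each channel gets its own pool. Hypothetical
        # convention: channel c draws from hosts tagged f"{tag}_{c}".
        return [sorted(h for h, tags in hosts.items() if f"{tag}_{c}" in tags)
                for c in range(width)]
    # Shared pool: every channel draws from the same set of hosts.
    shared = sorted(h for h, tags in hosts.items() if tag in tags)
    return [list(shared) for _ in range(width)]


if __name__ == "__main__":
    cluster = {
        "hostA": {"ingest", "ingest_0"},
        "hostB": {"ingest", "ingest_1"},
    }
    print(hosts_per_channel(cluster, "ingest", 2, replicated=False))
    print(hosts_per_channel(cluster, "ingest", 2, replicated=True))
```

With the shared pool, the replicas in both channels can be placed on either host; with the replicated pool, each channel's replica is limited to its own set of hosts, which is how sibling operators end up on different hosts.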
Examples
- Example 1: Parallel regions that are not partitioned
- Example 1 shows the simplest form of a parallel annotation, for a parallel region that is not partitioned. The @parallel annotation is applied to the invocation of the Op operator and is placed immediately before the first output stream that is associated with that invocation. The newline between the annotation and the stream is not required. The width element specifies five channels, so the Op operator is replicated five times. Therefore, a splitter is placed on the output port that produces the Input stream, which feeds the Op operator. Because the partitionBy element is not specified, the splitter can route each tuple to any channel.
@parallel(width=5)
stream<Type> Output = Op(Input) {
    // ...
}
- Example 2: Partitions in a parallel region
- To partition a parallel region, you specify the partitioning keys for each input port that you want to partition. The partitioning keys are the tuple attributes that the splitter uses to create the partitions, and they must come from the stream type of the input port. In Example 2, the splitter for the Input1 port partitions the tuples based on the attr1 and attr2 attributes. The partitioning for the Input1 port is implemented with a hash, and the splitter routes each tuple according to its hash value. No partitionBy element is specified for the Input2 port, so its splitter does not partition the tuples, and they are evenly distributed among the parallel channels.
@parallel(width=5, partitionBy=[{port=Input1, attributes=[attr1, attr2]}])
stream<Type> Output = Op(Input1; Input2) {
    // ...
}
- Example 3: Partitioning for two streams that are coming into one port
- Partitioning applies to all the streams that feed a port. In Example 3, the Input1 and Input2 streams both feed the single input port of the Op operator, so both streams are partitioned in the same way.
@parallel(width=5, partitionBy=[{port=Input1, attributes=[attr1, attr2]}])
stream<Type> Output = Op(Input1, Input2) {
    // ...
}
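The routing behavior in Examples 1 through 3 can be approximated with a small model of one splitter per port. This Python sketch is not the Streams splitter: the `Splitter` class is hypothetical, CRC-32 over the key's representation stands in for the internal hash function that Streams uses (which this page does not specify), and round-robin stands in for the unpartitioned case, where the real splitter may choose any channel.

```python
import zlib
from typing import Any, Dict, Optional, Sequence


class Splitter:
    """Model of the splitter in front of one input port of a parallel region."""

    def __init__(self, width: int, attributes: Optional[Sequence[str]] = None):
        if width < 1:
            raise ValueError("width must be at least 1")
        self.width = width
        # Partitioning keys; None models a port without a partitionBy entry.
        self.attributes = tuple(attributes) if attributes else None
        self._next = 0  # round-robin cursor, used only for unpartitioned ports

    def route(self, tup: Dict[str, Any]) -> int:
        """Return the channel index (0 .. width-1) for one tuple."""
        if self.attributes is None:
            # Unpartitioned: any channel is allowed; round-robin spreads tuples evenly.
            channel = self._next
            self._next = (self._next + 1) % self.width
            return channel
        # Partitioned: equal key values give equal hashes, hence the same channel.
        key = tuple(tup[name] for name in self.attributes)
        return zlib.crc32(repr(key).encode("utf-8")) % self.width


if __name__ == "__main__":
    # Example 2: one splitter per port.
    input1 = Splitter(5, ["attr1", "attr2"])  # partitionBy on Input1
    input2 = Splitter(5)                      # no partitionBy on Input2
    first = input1.route({"attr1": 7, "attr2": "x", "payload": 1})
    second = input1.route({"attr1": 7, "attr2": "x", "payload": 2})
    print(first == second)  # True: same key values, same channel
    print([input2.route({"payload": i}) for i in range(7)])  # [0, 1, 2, 3, 4, 0, 1]

    # Example 3: Input1 and Input2 feed one port, so they share one splitter.
    shared = Splitter(5, ["attr1", "attr2"])
    from_input1 = shared.route({"attr1": 3, "attr2": "k"})
    from_input2 = shared.route({"attr1": 3, "attr2": "k"})
    print(from_input1 == from_input2)  # True
```

Because the channel depends only on the key attributes, a tuple's source stream does not matter once the streams share a port, which is why Example 3 partitions Input1 and Input2 identically.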