IBM InfoSphere Streams Version 4.1.1

Parallel annotation

To use user-defined parallelism, apply the @parallel annotation to invocations of primitive or composite operators. To apply user-defined parallelism to an arbitrary group of operators, refactor those operators into a single composite operator, and then apply the @parallel annotation to the invocation of that composite operator.
Read syntax diagramSkip visual syntax diagram
>>-@parallel(width=number-| Optional elements |-)--------------><

Optional elements

|--+----------------------------------------------------------------------------+-->
   |                  .-,---------------------------------------------------.   |   
   |                  |                               .-,--------------.    |   |   
   |                  V                               V                |    |   |   
   '-,--partitionBy=[---{port=port_name,-attributes=[---attribute_name-+-]}-+-]-'   

>--+---------------------------------------+--------------------|
   |                        .-,--------.   |   
   |                        V          |   |   
   '-,--replicateHostTags=[---host_tag-+-]-'   

Required elements

width
Specifies the number of channels in a parallel region. For each channel in a parallel region, InfoSphere® Streams replicates the invoked operator. The width must be either an int32 literal, an expression<int32> composite parameter, or a submission-time value that is cast as an int32 data type.

Optional elements

partitionBy
Specifies how to partition tuples across the channels for specific ports that enter a parallel region. Ports are individually partitioned and each port can be different. There is one splitter per port. Partitioning a parallel region ensures that tuples with specific values are always routed to the same channel. These specific values are called partitioning keys, and are specified as tuple attributes. The partitioning specification for a parallel region is supplied as a list of port and attribute pairs. The pairs are specified as follows:
attributes
Specifies the partitioning keys as a list of attributes. The splitter routes tuples that have the same values for these attributes to the same parallel channel. The attributes must exist in the tuple type that is specified for the input port.
port
Specifies the name of the input port for a parallel region. The name must identify an input port of the invocation of the operator that the parallel annotation applies to.
If a port is not partitioned, the splitter can route any tuple to any channel.
replicateHostTags
Specifies host tags that identify the host pools to replicate in a parallel region. When a host pool is replicated, it is used only for a single channel in a parallel region. By replicating host pools, you can assign sibling operators in a parallel region to different sets of hosts.

Examples

Example 1: Parallel regions that are not partitioned
Example 1 shows the simplest form of a parallel annotation for parallel regions that are not partitioned. This example applies the @parallel annotation to the invocation of the Op operator. The annotation is placed immediately before the first output stream that is associated with the operator invocation. The newline is not required. The width parameter specifies that the operator is replicated five times and that there are five channels. Therefore, the output port of the input that feeds the Op operator uses a splitter. The partitionBy element is not specified, therefore the splitter routes the tuples to any channel.
@parallel(width=5)
stream<Type> Output = Op(Input) {
        // ...
}
Example 2: Partitions in a parallel region
To partition a parallel region, you specify the partition keys for each input port. The partition keys are the tuple attributes that the splitter uses to create the partition. The valid attributes for the partitioning keys are from the stream type of the input port. In Example 2, the splitter to the Input1 port partitions the tuples based on the attr1 and attr2 attributes. The partitioning for the Input1 port is implemented with a hash, and the splitter routes tuples that are based on the hash value. The Input2 port does not specify a partitionBy element, therefore the splitter does not partition the parallel region and the tuples are evenly distributed among the parallel channels.
@parallel(width=5, partitionBy=[{port=Input1, attributes=[attr1, attr2]}])
stream<Type> Output = Op(Input1; Input2) {
         // ...
}
Example 3: Partitioning for two streams that are coming into one port
Partitioning applies to all the streams that are incoming to the port. Example 3 shows two streams that are coming into one port. Notice that, the Input1 and Input2 streams are partitioned in the same way.
@parallel(width=5, partitionBy=[{port=Input1, attributes=[attr1, attr2]}])
stream<Type> Output = Op(Input1, Input2) {    
	// ...
}