IBM InfoSphere Streams Version 4.1.1

Source, sink, import, and export operators in applications with user-defined parallelism

You can use the getChannel() and getMaxChannels() SPL built-in functions to vary the behavior of source, sink, import, and export operators from one channel to the next inside a parallel region.

The getChannel() function is one of the SPL built-in functions that provide information about parallel regions at run time. It reports which parallel channel an operator is on. Other SPL built-in functions that provide information about parallel regions include getMaxChannels(), getThisOperatorName(), and getThisOperatorLogicalName(). You can use the getChannel() and getMaxChannels() functions in the streamId and properties parameters of the Export operator, and in the streamId, subscription, and filter parameters of the Import operator.
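The following is a minimal sketch of how the two functions behave inside a parallel region. The composite name, the stream and attribute names, and the width of 3 are assumptions made for illustration. Each replica of the Custom operator prints its own channel index together with the total number of channels.

```spl
composite ChannelInfo {
    graph
        // Hypothetical source: emits ten tuples numbered 0 through 9.
        stream<uint32 seqno> Numbers = Beacon() {
            param iterations: 10u;
            output Numbers: seqno = (uint32)IterationCount();
        }

        // Three replicas of this operator run, one per parallel channel.
        @parallel(width = 3)
        () as Reporter = Custom(Numbers) {
            logic onTuple Numbers: {
                // getChannel() is 0, 1, or 2 here; getMaxChannels() is 3.
                printStringLn("seqno " + (rstring)seqno
                    + " on channel " + (rstring)getChannel()
                    + " of " + (rstring)getMaxChannels());
            }
        }
}
```

Both functions return int32 values, which is why the sketch casts them to rstring before concatenation.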

Example 1: Invocation of a FileSource operator
Example 1 runs a FileSource operator inside a parallel region. The value of the file parameter is not fully resolved until run time, so each replica of the operator reads a different file, according to its parallel channel number.
stream<Type> Output = FileSource() {
      param file: "input" + (rstring)getChannel() + ".csv";
}
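The same technique can place a source and a sink in the same parallel region. The sketch below is one way to do that, assuming a hypothetical composite named CopyFile, a width of 3, and input files named input0.csv through input2.csv. Wrapping both operators in a composite and annotating the composite invocation keeps each FileSource paired with the FileSink on the same channel.

```spl
// Hypothetical composite: reads one file and writes one file per channel.
composite CopyFile {
    graph
        stream<rstring line> Lines = FileSource() {
            param file: "input" + (rstring)getChannel() + ".csv";
                  format: line;
        }

        () as Writer = FileSink(Lines) {
            param file: "output" + (rstring)getChannel() + ".csv";
                  format: line;
        }
}

composite Main {
    graph
        // Three replicas: input0.csv -> output0.csv, input1.csv -> output1.csv,
        // and input2.csv -> output2.csv.
        @parallel(width = 3)
        () as Copies = CopyFile() {}
}
```

The line format is used here only so that the sketch does not depend on the columns in the files.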
Example 2: Invocation of an Export operator
Other source and sink edge adapters can work inside a parallel region in a similar way. Example 2 shows that the same technique also works for an Export operator inside a parallel region. Each Import operator that consumes one of the replicated Export operators must pass the matching name, such as myName0, myName1, or myName2, to its streamId parameter. These Import operators can also be inside a parallel region, in which case they can also use the getChannel() function in the streamId parameter.
() as Published = Export(In) {
      param streamId: "myName" + (rstring)getChannel();
}
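On the consuming side, a matching set of Import operators might look like the following sketch. The composite name Importer, the application name sample::Exporter, the stream schema, and the width of 3 are assumptions. The width and the schema must match those of the exporting application.

```spl
composite Importer {
    graph
        // Channel 0 imports myName0, channel 1 imports myName1,
        // and channel 2 imports myName2.
        @parallel(width = 3)
        stream<uint32 seqno> Subscribed = Import() {
            param applicationName: "sample::Exporter"; // hypothetical exporting application
                  streamId: "myName" + (rstring)getChannel();
        }

        // Downstream of the parallel region, the three channels are merged.
        () as Printer = Custom(Subscribed) {
            logic onTuple Subscribed: println(Subscribed);
        }
}
```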
Example 3: Invocation of an Import operator inside a parallel region
Import operators can be used productively inside a parallel region even when the Export operator is not inside a parallel region. For example, if an Export operator in one application exports tuples that have a seqno attribute of type uint32, an Import operator in a different application can be written as shown in Example 3. The filter expression of each replicated Import operator is pushed to the Export operator in the other application, so each replica receives only the tuples whose seqno value maps to its channel. The Export operator effectively becomes a splitter for all the Import operators.
stream<Type> Data = Import() {
      param applicationName: "sample::Main";
            streamId: "allData";
            filter: (seqno % getMaxChannels()) == getChannel();
}
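For context, the exporting application that Example 3 connects to could look like the sketch below. The Beacon source and its period are assumptions. The namespace, composite name, and stream ID are chosen to match the applicationName and streamId values in Example 3.

```spl
namespace sample;

composite Main {
    graph
        // Hypothetical source: emits tuples numbered 0, 1, 2, and so on.
        stream<uint32 seqno> Numbers = Beacon() {
            param period: 0.5;
            output Numbers: seqno = (uint32)IterationCount();
        }

        // A single, non-parallel Export operator. Filters from the
        // Import operators are applied because allowFilter is true.
        () as Exported = Export(Numbers) {
            param streamId: "allData";
                  allowFilter: true;
        }
}
```

If the Import operator in Example 3 runs in a parallel region with a width of 3, tuples with seqno values 0, 3, and 6 satisfy the filter on channel 0, values 1, 4, and 7 on channel 1, and values 2, 5, and 8 on channel 2.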