IBM Streams 4.2

Dynamic stream importing and exporting

The Export operator exports a stream using a set of export properties, which can be modified at run time. Similarly, the Import operator is used to import a stream using an import subscription that is defined over the export properties. Import subscriptions can also be changed at run time.

The export properties of a stream can be updated at run time only by the operator that produces the stream. Such an operator must first get a handle on the output port that produces the stream, which can be achieved by the getOutputPortAt member function of the Operator class. Given an output port object of type OperatorOutputPort, operations such as adding and removing stream properties can be done with the provided APIs. These APIs take objects of type StreamPropertyCollection and StreamProperty to update the properties of the exported stream. The following is an example that adds an export property at run time. The property values must be defined with SPL C++ types (rstring in the example).

OperatorOutputPort & oport = getOutputPortAt(0);
assert(oport.isExported() &&
       oport.getExportType()==OperatorOutputPort::ByProperties);
std::string propName = "color";
rstring propValue = "orange";
StreamProperty colorProp(propName, propValue);
oport.addStreamProperty(colorProp);

The import subscription for an imported stream can be updated at run time only by an operator that subscribes to the stream. Unlike an exported stream, which is necessarily produced by a specific output port, an imported stream can be consumed by multiple operators. Any changes that are performed on the subscription at run time apply only to the operator that performs the update. Such an operator needs to first get a handle on the input port that subscribes to the stream, which can be achieved by the getInputPortAt function of the Operator class. Given an input port object of type OperatorInputPort, operations such as updating or modifying the subscription expressions can be done with the provided APIs. These APIs take objects of type SubscriptionExpression, which holds a modifiable expression. The following is an example that creates the subscription expression ids[0]<5 && color=="orange" at run time.

OperatorInputPort & iport = getInputPortAt(0);
SubscriptionExpressionPtr idPred = SubscriptionExpression::createPredicate("ids",
   SubscriptionExpression::LessThan, SubscriptionExpression::Literal(5), 0);
SubscriptionExpressionPtr colorPred = SubscriptionExpression::createPredicate("color",
   SubscriptionExpression::Equal, SubscriptionExpression::Literal("orange"));
SubscriptionExpressionPtr subscription =
   SubscriptionExpression::createAndClause(idPred, colorPred);
iport.setSubscriptionExpression(*subscription);

In the example, SubscriptionExpressionPtr is a shared pointer, and thus the memory does not need to be managed explicitly. SubscriptionExpressionPtr objects can also be created from string representations of the expression. The following is an example:

SubscriptionExpressionPtr
   exp = SubscriptionExpression::fromString("ids[0]<5 && color==\"orange\"");