IBM Streams 4.2

Java operator lifecycle

Java™ operators have a runtime lifecycle that starts when an instance of the operator is initialized and ends when the processing element (PE) that hosts the operator is shut down.

Operators that are implemented with the Java Operator API are instances of com.ibm.streams.operator.Operator and must have a no-argument constructor. An operator instance starts its runtime lifecycle with a call to its initialize(OperatorContext) method. The operator cannot receive or send tuples and punctuation until its ports are ready. When the ports are ready, the operator starts receiving calls to its tuple and punctuation processing methods and can submit tuples accordingly. The allPortsReady method is called to notify the operator that its ports are ready to receive and submit tuples. However, the tuple and punctuation processing methods can be invoked before the allPortsReady notification is received. If a tuple is to be submitted outside the context of a tuple or punctuation processing method, the operator must first receive the allPortsReady notification. When the PE that hosts the operator is being shut down, the operator receives a call to its shutdown method; this call can occur while tuple and punctuation processing methods are still active. The shutdown method should complete any asynchronous activity and release any resources.
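
The ordering rules above can be illustrated with a small, self-contained Java sketch. It deliberately does not use the com.ibm.streams.operator API: LifecycleSketch and its methods are hypothetical stand-ins that only mirror the lifecycle calls named in the text, and the "output port" is just a list. A background thread waits on a CountDownLatch until allPortsReady is called before it submits anything, process may run earlier, and shutdown waits for the background work to finish.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical model of the operator lifecycle (NOT the com.ibm.streams.operator
 * API). Method names mirror the lifecycle calls: initialize, allPortsReady,
 * process, shutdown. A list stands in for the output port.
 */
class LifecycleSketch {
    // Released once allPortsReady() is called; gates background submission.
    private final CountDownLatch portsReady = new CountDownLatch(1);
    // Tuples "submitted" by this operator.
    private final List<String> submitted = Collections.synchronizedList(new ArrayList<>());
    private Thread producer;

    // Called first. Ports are not ready yet, so nothing is submitted here;
    // the background producer only starts waiting.
    void initialize() {
        producer = new Thread(() -> {
            try {
                // Submission outside process() must wait for allPortsReady.
                portsReady.await();
                for (int n = 0; n < 3; n++) {
                    submitted.add("generated-" + n);
                }
            } catch (InterruptedException e) {
                // Shut down before the ports ever became ready.
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
    }

    // Notification that the ports can now receive and submit tuples.
    void allPortsReady() {
        portsReady.countDown();
    }

    // Tuple processing may legitimately happen before allPortsReady().
    void process(String tuple) {
        submitted.add("processed-" + tuple);
    }

    // Completes asynchronous activity: abandons the wait if the ports never
    // became ready, otherwise lets the producer finish, then joins it.
    void shutdown() {
        if (portsReady.getCount() > 0) {
            producer.interrupt();
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Snapshot of everything submitted so far.
    List<String> submitted() {
        synchronized (submitted) {
            return new ArrayList<>(submitted);
        }
    }
}
```

In a real operator the equivalent logic would sit in the operator's initialize, allPortsReady, process, and shutdown methods; this sketch only models the ordering constraints.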

All incoming tuples are delivered to the operator through a single process method, which receives a StreamingInput that describes the port that the tuple arrived on, and the tuple itself as an instance of Tuple. Tuple provides an interface similar to a JDBC ResultSet for obtaining attribute values, with methods such as getInt, getFloat, getString, and getObject. Each method takes either a zero-based attribute index or an attribute name. Tuple objects are immutable, so a tuple can be passed through many layers of software without its value changing, and the operator can safely retain it (for example, for batching) after the process method returns.
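
To make index-or-name access and the immutability guarantee concrete, here is a minimal sketch of an immutable tuple. SimpleTuple is a hypothetical class, not the com.ibm.streams.operator.Tuple interface. It copies its inputs defensively so that later changes by the caller cannot alter it, and it is only as immutable as the values it holds (here Integer and String).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified immutable tuple (not com.ibm.streams.operator.Tuple).
 * Illustrates attribute access by zero-based index or by name, and why
 * immutability makes retaining a tuple reference safe.
 */
final class SimpleTuple {
    private final List<String> names;
    private final List<Object> values;
    // Filled once in the constructor and never modified afterwards.
    private final Map<String, Integer> indexByName = new HashMap<>();

    SimpleTuple(List<String> names, List<Object> values) {
        if (names.size() != values.size()) {
            throw new IllegalArgumentException("names and values differ in length");
        }
        // Defensive copies: later changes to the caller's lists cannot leak in.
        this.names = Collections.unmodifiableList(new ArrayList<>(names));
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
        for (int i = 0; i < this.names.size(); i++) {
            indexByName.put(this.names.get(i), i);
        }
    }

    // Zero-based index access (note: JDBC ResultSet columns are 1-based).
    int getInt(int index) { return (Integer) values.get(index); }
    String getString(int index) { return (String) values.get(index); }

    // Name-based access resolves the name to its index first.
    int getInt(String name) { return getInt(indexOf(name)); }
    String getString(String name) { return getString(indexOf(name)); }

    private int indexOf(String name) {
        Integer i = indexByName.get(name);
        if (i == null) {
            throw new IllegalArgumentException("no attribute " + name);
        }
        return i;
    }
}
```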

Outgoing tuples are represented by the OutputTuple interface, which extends Tuple with methods to set attribute values. OutputTuple instances are created through StreamingOutput<OutputTuple>.newTuple() and submitted through the submit method of the same interface. As with the getters, the setter methods follow JDBC conventions, for example setInt, setFloat, setString, and setObject. The assign method on OutputTuple bulk-copies attribute values from another Tuple for all attributes whose names and types match.
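
The assign semantics described above, copying only attributes whose names and types both match, can be sketched as follows. SketchOutputTuple is hypothetical: it models a schema as an ordered map from attribute name to Java type and is not the real OutputTuple interface.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical mutable output tuple (not com.ibm.streams.operator.OutputTuple).
 * Its schema maps attribute names to Java types; assign() copies every source
 * attribute whose name AND type match this schema.
 */
final class SketchOutputTuple {
    private final Map<String, Class<?>> schema;
    private final Map<String, Object> values = new LinkedHashMap<>();

    SketchOutputTuple(Map<String, Class<?>> schema) {
        this.schema = new LinkedHashMap<>(schema);
    }

    // Generic setter; rejects unknown attributes and wrongly typed values.
    void set(String name, Object value) {
        Class<?> type = schema.get(name);
        if (type == null || !type.isInstance(value)) {
            throw new IllegalArgumentException("bad attribute: " + name);
        }
        values.put(name, value);
    }

    // Typed setter in the JDBC style; the double is boxed to Double.
    void setDouble(String name, double value) {
        set(name, value);
    }

    Object get(String name) {
        return values.get(name);
    }

    // Bulk copy from a source tuple described by its own schema and values.
    void assign(Map<String, Class<?>> sourceSchema, Map<String, Object> sourceValues) {
        for (Map.Entry<String, Class<?>> attr : sourceSchema.entrySet()) {
            String name = attr.getKey();
            // Same name but different type (or absent here): skipped.
            if (attr.getValue().equals(schema.get(name))) {
                values.put(name, sourceValues.get(name));
            }
        }
    }
}
```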

Immutable Tuple instances can also be submitted to an output port by using StreamingOutput.submit(). When an input port's schema matches that of an output port, the Tuple reference that is passed into the process() method can be submitted directly to that output port. Otherwise, the StreamSchema interface provides several utility methods that generate Tuple instances specific to its schema.
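
A sketch of the choice described above: forward the input tuple reference unchanged when the schemas match, and otherwise build a new tuple for the output schema. PassThroughSketch, Tup, and tupleFor are hypothetical stand-ins for Tuple and the StreamSchema factory methods. The schema comparison (ordered attribute names and types) and the null default for missing attributes are assumptions of this sketch, not documented behavior of the real API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-ins: Tup plays the role of an immutable Tuple and
 * tupleFor() the role of a StreamSchema factory method. Not the real API.
 */
final class PassThroughSketch {
    static final class Tup {
        final Map<String, Class<?>> schema;
        final Map<String, Object> values;

        Tup(Map<String, Class<?>> schema, Map<String, Object> values) {
            this.schema = Collections.unmodifiableMap(new LinkedHashMap<>(schema));
            this.values = Collections.unmodifiableMap(new LinkedHashMap<>(values));
        }
    }

    // Assumption: schemas match only with the same names and types in order.
    static boolean sameSchema(Map<String, Class<?>> a, Map<String, Class<?>> b) {
        return new ArrayList<>(a.entrySet()).equals(new ArrayList<>(b.entrySet()));
    }

    // Build a tuple for the schema from a name->value map. Attributes that are
    // not supplied are left null, and value types are not checked here.
    static Tup tupleFor(Map<String, Class<?>> schema, Map<String, Object> attrs) {
        Map<String, Object> v = new LinkedHashMap<>();
        for (String name : schema.keySet()) {
            v.put(name, attrs.get(name));
        }
        return new Tup(schema, v);
    }

    // Submit to 'port' (a list standing in for an output port): reuse the
    // input reference when schemas match, otherwise convert it.
    static Tup forward(Tup input, Map<String, Class<?>> outSchema, List<Tup> port) {
        Tup out = sameSchema(input.schema, outSchema) ? input : tupleFor(outSchema, input.values);
        port.add(out);
        return out;
    }
}
```

Reusing the reference is only safe because tuples are immutable; a mutable tuple would have to be copied before it could be shared with downstream consumers.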

Here is an example of a process method in an operator that shows the use of methods on Tuple and OutputTuple.

@Override
public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
    OutputTuple outputTuple = getOutput(0).newTuple();

    // Copy across all attributes whose names and types match.
    outputTuple.assign(tuple);

    long timestamp = tuple.getLong("timestamp");
    float temperature = tuple.getFloat("temperature");

    // Calculate the threshold from timestamp and temperature.
    double threshold = …;

    outputTuple.setDouble("threshold", threshold);

    getOutput(0).submit(outputTuple);
}

A Java operator can be written to support arbitrary schemas on its input and output ports, so the same operator can be reused in different stream processing applications. Such an operator can adapt itself to a specific invocation by:
  • Using parameters for configuration. For example, an operator that performs a generic calculation on integer or double attributes can use a parameter to indicate which attributes in the input stream to perform the calculation on.
  • Using the stream definitions available from the OperatorContext to determine its action. For example, an operator might perform calculations on any attribute whose name ends in _Sample.
  • Imposing requirements on the schema of the stream. For example, an operator might require price and volume attributes for its calculation and copy all other attributes unchanged to the output stream.
  • Coping with arbitrary stream schemas. For example, an operator might produce a generic representation of each tuple as name-value pairs, one per attribute, such as an XML document or an HTTP query string.
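
Two of the adaptation strategies in the list, choosing attributes from the stream definition by a naming convention and rendering any tuple as name-value pairs, can be sketched in plain Java. GenericSchemaSketch is hypothetical and models attributes as an ordered name-to-value map instead of the real Tuple and StreamSchema types; the URL-encoded query string is one assumed choice of generic representation.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helpers for a schema-agnostic operator. Attributes are modeled
 * as an ordered name->value map rather than the real Tuple/StreamSchema types.
 */
final class GenericSchemaSketch {
    // Pick attributes from the stream definition by naming convention, e.g.
    // every name ending in "_Sample" (the suffix could come from a parameter).
    static List<String> selectBySuffix(Iterable<String> attributeNames, String suffix) {
        List<String> selected = new ArrayList<>();
        for (String name : attributeNames) {
            if (name.endsWith(suffix)) {
                selected.add(name);
            }
        }
        return selected;
    }

    // Render any tuple, whatever its schema, as URL-encoded name=value pairs.
    static String toQuery(Map<String, ?> attributes) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, ?> e : attributes.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(String.valueOf(e.getValue())));
        }
        return query.toString();
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

For a tuple with attributes id=42, temp_Sample=21.5, and "site name"="A&B", selectBySuffix picks only temp_Sample, and toQuery yields id=42&temp_Sample=21.5&site+name=A%26B.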

More detailed descriptions of the APIs are provided in the com.ibm.streams.operator Javadoc.