IBM Streams 4.2

Implementing an operator using the Java Operator API

An operator processes incoming tuples and submits outgoing tuples. The relationship between an input tuple and the submitted tuples varies according to the function of the operator.

For example, a filtering operator can submit a subset of the incoming tuples without modifying individual tuples. Another operator might submit a tuple that is derived from an input tuple rather than a direct copy of it, for example by enriching it with additional data or by keeping only a subset of the input tuple's attributes.

Incoming tuples are passed to a Java™ operator as invocations of the process() method. Each invocation passes a StreamingInput reference, which describes the port that the tuple arrived on, and a reference to the tuple itself as an immutable Tuple. Any Operator implementation must be thread-safe, because the process() method can be called concurrently by different threads for tuples that arrive on the same or different ports.
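
Because process() can run on several threads at once, any state that an operator keeps across tuples must be safe for concurrent access. The following minimal sketch does not use the Streams runtime: the class CountingProcessor and its process(int port, double reading) signature are hypothetical stand-ins for an operator's process() method. It counts readings above 0.8 in an AtomicLong rather than a plain long field, and uses a thread pool to mimic tuples arriving concurrently on two ports.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an operator; it does not use the Java Operator API.
class CountingProcessor {
    // Shared by every thread that calls process(); AtomicLong makes each
    // increment atomic without explicit locking.
    private final AtomicLong highReadings = new AtomicLong();

    // Plays the role of process(): it may run concurrently for tuples that
    // arrive on the same or different ports.
    void process(int port, double reading) {
        if (reading > 0.8) {
            highReadings.incrementAndGet();
        }
    }

    long highReadings() {
        return highReadings.get();
    }

    // Delivers `tuples` readings from a pool of `threads` threads and returns
    // how many of them passed the filter.
    static long simulate(int threads, int tuples) {
        CountingProcessor op = new CountingProcessor();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tuples; i++) {
            final int port = i % 2;                          // alternate two input ports
            final double reading = (port == 0) ? 0.9 : 0.5;  // only port 0 readings pass
            pool.execute(() -> op.process(port, reading));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op.highReadings();
    }
}
```

For example, CountingProcessor.simulate(8, 10_000) counts the 5,000 readings that arrive on port 0. With a plain long field incremented by count++, the result could vary from run to run, because concurrent read-modify-write updates can be lost.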

Tuples are submitted to an output port by calling the submit() method of the StreamingOutput reference for that port. A StreamingOutput reference is obtained through the getStreamingOutputs() method of OperatorContext. Alternatively, the AbstractOperator class provides direct access by output port number through its getOutput() method.

StreamingOutput.submit() is overloaded to accept either a Tuple or an OutputTuple reference, so an incoming tuple can be submitted directly to an output port when the input and output ports have the same schema.

The following example of a process() implementation filters incoming tuples and submits those that pass the filter to a single output port:

// Inside a class that extends AbstractOperator, which provides getOutput()
@Override
public void process(StreamingInput<Tuple> port, Tuple tuple) throws Exception {

    // The single output port has index 0
    final StreamingOutput<OutputTuple> output = getOutput(0);

    // Submit any tuple with a reading greater than 0.8
    if (tuple.getFloat("reading") > 0.8) {
        output.submit(tuple);
    }
}

If the input and output ports have different schemas, your code must create a tuple instance that matches the output port's schema. Typically, it creates a mutable OutputTuple instance and then sets its attributes as required.

The OutputTuple.assign() method sets attributes in the tuple from the passed-in tuple for those attributes that match, that is, that have the same name and SPL type. Here the assign() method copies the matching attributes from the input tuple to the output tuple. Any attributes that are not explicitly set are set to their default values.

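// Inside process(); output is obtained with getOutput(0) as before
// Submit any tuple with a reading greater than 0.8
if (tuple.getFloat("reading") > 0.8) {
    // Create a new mutable tuple for the output port's schema
    OutputTuple outputTuple = output.newTuple();
    // Copy the attributes whose name and SPL type match the input tuple
    outputTuple.assign(tuple);
    output.submit(outputTuple);
}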

This example therefore behaves like the previous one, but allows the output port's schema to differ from the input port's schema.
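
To make the matching rule of assign() concrete, the following self-contained sketch models a tuple as a map of attribute values and a schema as a map from attribute names to types. The class AssignSketch is a hypothetical illustration of the rule described above, not the OutputTuple implementation, and Java classes stand in for SPL types. An output attribute is copied only when the input has an attribute with the same name and the same type; every other output attribute gets a default value (zero for numbers, the empty string for strings).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the assign() matching rule. Java classes stand in for
// SPL types (Double for float64, Integer for int32, String for rstring).
final class AssignSketch {

    // Default value per stand-in type; unmatched attributes receive these.
    static Object defaultFor(Class<?> type) {
        if (type == Double.class) return 0.0;
        if (type == Integer.class) return 0;
        if (type == String.class) return "";
        throw new IllegalArgumentException("unsupported type: " + type);
    }

    // Builds a tuple for outSchema from an input tuple described by inSchema:
    // an attribute is copied only if the input has the same name with the same
    // type; every other output attribute gets its default value.
    static Map<String, Object> assign(Map<String, Class<?>> outSchema,
                                      Map<String, Class<?>> inSchema,
                                      Map<String, Object> input) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Class<?>> attr : outSchema.entrySet()) {
            Class<?> type = attr.getValue();
            boolean matches = type.equals(inSchema.get(attr.getKey()));
            out.put(attr.getKey(), matches ? input.get(attr.getKey()) : defaultFor(type));
        }
        return out;
    }
}
```

In this model, an input attribute id of type Integer is not copied into an output attribute id of type String, because the names match but the types do not; an output attribute such as city that is absent from the input is left at its default, ready to be set explicitly.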

OutputTuple has a number of setter methods for setting individual attributes, so the outgoing tuple can be enriched with other data or with data derived from the input tuple. Here the example is expanded to add to the outgoing tuple a time stamp and a city name that is based on the incoming tuple's postal code. The output port's schema is therefore expected to share a number of attributes with the input port's schema and to add two attributes, ts and city.

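// Submit any tuple with a reading greater than 0.8
if (tuple.getFloat("reading") > 0.8) {
    OutputTuple outputTuple = output.newTuple();
    outputTuple.assign(tuple);

    // Enrich the outgoing tuple with a time stamp and a city name.
    // Location.getCity() stands for an application-specific lookup of the
    // city for a postal code; it is not part of the Java Operator API.
    outputTuple.setTimestamp("ts", Timestamp.currentTime());
    outputTuple.setString("city", Location.getCity(tuple.getString("zipcode")));
    output.submit(outputTuple);
}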

The StreamSchema interface has a number of methods that create immutable Tuple objects from other objects, including Maps, Lists, and other Tuple objects. These methods can also be used to create tuples to be submitted to output ports.
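
The difference between building a tuple from a List and from a Map is whether values are matched to attributes by position or by name. The following self-contained sketch illustrates only that idea; the class SimpleSchema and its tupleFromList() and tupleFromMap() methods are hypothetical and are not the StreamSchema API. Each returned tuple is an unmodifiable map, mirroring the immutability of Tuple.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a stream schema: an ordered list of attribute
// names. It is not the com.ibm.streams.operator.StreamSchema interface.
final class SimpleSchema {
    private final List<String> names;

    SimpleSchema(List<String> names) {
        this.names = List.copyOf(names);
    }

    // From a List: values are matched to attributes by position.
    Map<String, Object> tupleFromList(List<?> values) {
        if (values.size() != names.size()) {
            throw new IllegalArgumentException("expected " + names.size() + " values");
        }
        Map<String, Object> t = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            t.put(names.get(i), values.get(i));
        }
        return Collections.unmodifiableMap(t);    // immutable, like Tuple
    }

    // From a Map: values are matched to attributes by name. This sketch
    // rejects maps that lack an attribute of the schema.
    Map<String, Object> tupleFromMap(Map<String, ?> values) {
        Map<String, Object> t = new LinkedHashMap<>();
        for (String name : names) {
            if (!values.containsKey(name)) {
                throw new IllegalArgumentException("missing attribute: " + name);
            }
            t.put(name, values.get(name));
        }
        return Collections.unmodifiableMap(t);
    }
}
```

With a schema of (id, reading), the list [7, 0.93] and the map {reading=0.93, id=7} produce the same tuple: the list relies on attribute order, whereas the map does not.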

A number of sample classes are provided to demonstrate operators that are implemented in Java:
  • com.ibm.streams.operator.samples.operators.DecimalScaleSetter: Operator that submits a tuple for every input tuple, modifying a single decimal attribute by setting its scale (see java.math.BigDecimal.scale()).
  • com.ibm.streams.operator.samples.patterns.TupleInTupleOut: An abstract pattern class that exposes filtering and transformation to a subclass but hides the mechanics of tuple reception and submission. For each incoming tuple, the subclass's filter() method is called; if it returns false, no further processing is done for that tuple. If it returns true, transform() is called, which lets the subclass set the attributes of the outgoing tuple and return true to deliver the tuple to the output port.
  • com.ibm.streams.operator.samples.operators.Regex: Operator that extends the TupleInTupleOut class to filter with regular expressions from the standard java.util.regex package.
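
The division of work in the TupleInTupleOut pattern is an instance of the template method pattern: a final method fixes the control flow and calls abstract hooks. The following self-contained sketch shows the filter-then-transform flow described above, together with a regex-based subclass in the spirit of the Regex sample. The classes FilterTransformSketch and RegexSketch, the map-based tuples, the text attribute, and the Consumer that stands in for the output port are all hypothetical; they are not the sample classes' actual signatures.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical sketch of the filter/transform pattern; names and signatures
// are illustrative and are not those of the TupleInTupleOut sample class.
abstract class FilterTransformSketch {
    private final Consumer<Map<String, Object>> output; // stands in for the output port

    FilterTransformSketch(Consumer<Map<String, Object>> output) {
        this.output = output;
    }

    // Hides reception and submission; subclasses only implement the hooks.
    final void process(Map<String, Object> in) {
        if (!filter(in)) {
            return;                      // rejected: no further processing
        }
        Map<String, Object> out = new LinkedHashMap<>();
        if (transform(in, out)) {
            output.accept(out);          // deliver to the output port
        }
    }

    protected abstract boolean filter(Map<String, Object> in);

    protected abstract boolean transform(Map<String, Object> in, Map<String, Object> out);
}

// Regex-style subclass: passes tuples whose hypothetical "text" attribute
// matches the whole regular expression, and copies them unchanged.
class RegexSketch extends FilterTransformSketch {
    private final Pattern pattern;

    RegexSketch(String regex, Consumer<Map<String, Object>> output) {
        super(output);
        this.pattern = Pattern.compile(regex);
    }

    @Override
    protected boolean filter(Map<String, Object> in) {
        return pattern.matcher((String) in.get("text")).matches();
    }

    @Override
    protected boolean transform(Map<String, Object> in, Map<String, Object> out) {
        out.putAll(in);                  // copy every attribute as-is
        return true;
    }
}
```

Because process() is final, a subclass cannot bypass the filter step or submit tuples on its own; it only decides which tuples pass and what the outgoing tuple contains.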