IBM Streams 4.2

Implementing a tumbling window operator with the Java Operator API

When an operator implemented with the Java™ Operator API is invoked, any window configurations for its input ports are made available through the instance of the StreamWindow interface that is accessed by StreamingInput.getStreamWindow(). Every input port has its own instance of StreamWindow, even when the input port is not windowed.

For more information about windowing, see Window handling. For more information about the Java Operator API, see Java Operator API overview.

For a tumbling window, StreamWindow.getType() returns the enumeration StreamWindow.Type.TUMBLING, and StreamWindow.getEvictionPolicy() returns the tumbling window eviction policy.

User code acts on window events by implementing a StreamWindowListener. A listener implementation can typically be written to be independent of the eviction policy. The StreamWindowListener implementation must be registered with StreamWindow.registerListener during Operator.initialize() so that the listener sees every tuple. For complete flexibility, tuples are delivered to the listener as window events and are also passed to the Operator through its process() method. Typically, though, a custom Java operator needs to handle tuples only through the StreamWindowListener, as in an aggregator for a single input port. In that case, the Operator can extend AbstractWindowOperator, which enforces that all tuples and punctuation are handled through the registered StreamWindowListener. The following examples all assume AbstractWindowOperator as the superclass.

All window handling in the Java Operator API is through events on the window, where each event is a StreamWindowEvent with a specific type, enumerated by StreamWindowEvent.Type. For tumbling windows, three events can be passed to the StreamWindowListener:
  • INSERTION: One or more tuples are being inserted into the window.
  • EVICTION: All tuples in the window are being evicted. Typically, the listener submits any tuples that are derived from the window state and then resets its state to an empty window.
  • FINAL: Final punctuation was received on the input port, so no more tuples will arrive. Even though the final punctuation was received, the window might not have tumbled according to its eviction policy. It is up to the specific operator or listener implementation to decide how to handle this situation.
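
The order in which these events can reach a listener is easier to see in a small model. The following is a plain-Java sketch that does not use the Streams API: the EventType enum and the eventsFor method are hypothetical stand-ins, and the sketch assumes a count-based eviction policy that tumbles as soon as the window holds windowSize tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the event sequence; these are NOT Streams API types.
final class TumblingEventSketch {

    enum EventType { INSERTION, EVICTION, FINAL }

    // Returns the events a listener would see for tupleCount tuples followed
    // by final punctuation, assuming the window tumbles once it is full.
    static List<EventType> eventsFor(int tupleCount, int windowSize) {
        List<EventType> events = new ArrayList<>();
        int inWindow = 0;
        for (int i = 0; i < tupleCount; i++) {
            events.add(EventType.INSERTION);
            inWindow++;
            if (inWindow == windowSize) {
                events.add(EventType.EVICTION);
                inWindow = 0;
            }
        }
        // Final punctuation arrives even if the last window is only partly full.
        events.add(EventType.FINAL);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eventsFor(5, 2));
    }
}
```

With five tuples and a window size of two, the last tuple is inserted but never evicted before FINAL arrives, which is the situation the FINAL item describes.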

Here is an example that walks through building an aggregator operator in Java that determines the maximum value of an integer attribute in a window, and submits a single tuple on eviction that contains a single attribute with the maximum value. The general framework for the listener is as follows:

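import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamingData.Punctuation;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.window.StreamWindowEvent;
import com.ibm.streams.operator.window.StreamWindowListener;

public class TumbleMaxAggregator implements StreamWindowListener<Tuple> {

  // State for the current window
  private int tupleCount;
  private float maxReading;

  // Single output port, assigned in the constructor shown later
  private final StreamingOutput<OutputTuple> output;

  @Override
  public synchronized void handleEvent(StreamWindowEvent<Tuple> event) throws Exception {

    switch (event.getType()) {
      case INSERTION:
           // handle insertion of tuples into the window
           break;
      case EVICTION:
           // handle the tumble
           break;
      case FINAL:
           // handle final mark
           break;
    }
  }
}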

Since this example is a listener that supports tumbling windows only, it needs to handle three types of events.

INSERTION Event

For the INSERTION event, the code is simple: iterate through all the tuples that are being inserted and, if a reading is higher than the current maximum, update the maximum. The first tuple in a window always sets the maximum.
case INSERTION:
  // handle insertion of tuples into the window
  for (Tuple tuple : event.getTuples()) {
    float reading = tuple.getFloat("reading");
    if (tupleCount == 0)
        maxReading = reading;
    else if (reading > maxReading)
        maxReading = reading;
    tupleCount++;
  }
  break;
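
The tupleCount check matters when readings can be negative. The following is a plain-Java sketch, independent of the Streams API, that contrasts the rule used above with a hypothetical variant that omits the check. The class and method names are illustrative only.

```java
// Hypothetical helper class; not part of the Streams API.
final class RunningMaxSketch {

    // Same rule as the INSERTION case: the first tuple seeds the maximum.
    static float seededMax(float... readings) {
        int tupleCount = 0;
        float maxReading = 0;
        for (float reading : readings) {
            if (tupleCount == 0 || reading > maxReading) {
                maxReading = reading;
            }
            tupleCount++;
        }
        return maxReading;
    }

    // Variant without the tupleCount check: the initial 0 acts as a reading.
    static float unseededMax(float... readings) {
        float maxReading = 0;
        for (float reading : readings) {
            if (reading > maxReading) {
                maxReading = reading;
            }
        }
        return maxReading;
    }

    public static void main(String[] args) {
        System.out.println(seededMax(-5f, -2f, -9f));   // prints -2.0
        System.out.println(unseededMax(-5f, -2f, -9f)); // prints 0.0
    }
}
```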

EVICTION Event

For the EVICTION event, an output tuple is created, populated with the maximum reading, and then submitted, but only if at least one tuple arrived during this tumble. An empty window can occur when there is a time-based eviction policy and no tuples arrived during the time period. By convention, a window punctuation mark is submitted after the window tumble. Finally, the aggregator's state is reset for the next window.
case EVICTION:
  // handle the tumble
  if (tupleCount != 0) {
      OutputTuple tuple = output.newTuple();
      tuple.setFloat("maxReading", maxReading);
      output.submit(tuple);
      output.punctuate(Punctuation.WINDOW_MARKER);
      tupleCount = 0;
      maxReading = 0;
  }
  break;
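
The submit-then-reset behavior, including the empty-window case, can be exercised without a running Streams instance. The sketch below is plain Java under stated assumptions: a List of strings named submitted stands in for the output port, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listener state; not a Streams API type.
final class EvictResetSketch {
    private int tupleCount;
    private float maxReading;

    // Stands in for the output port: records what would be submitted.
    final List<String> submitted = new ArrayList<>();

    void insert(float reading) {
        if (tupleCount == 0 || reading > maxReading) {
            maxReading = reading;
        }
        tupleCount++;
    }

    void evict() {
        // An empty window (for example, a quiet time-based period) submits nothing.
        if (tupleCount != 0) {
            submitted.add("maxReading=" + maxReading);
            submitted.add("WINDOW_MARKER");
            tupleCount = 0;
            maxReading = 0;
        }
    }

    public static void main(String[] args) {
        EvictResetSketch s = new EvictResetSketch();
        s.evict();               // empty window
        s.insert(3.5f);
        s.insert(1.0f);
        s.evict();               // submits the maximum and a window marker
        System.out.println(s.submitted);
    }
}
```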

FINAL Event

For the FINAL event, there are two implementation choices. The first is to do nothing, which is handled by:
case FINAL:
  // handle final mark, do nothing
  break;

Alternatively, treat the final mark as an eviction event and submit the latest (and last) maximum value, which is handled by having the FINAL and EVICTION types share code:

case EVICTION:
case FINAL:
  // handle the tumble
  // treat end of the data as a tumble
  ...
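
The practical difference between the two choices is whether the last, partially filled window produces output. The following plain-Java sketch compares them, assuming a count-based tumbling window. The aggregate method and its flushOnFinal flag are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical comparison of the two FINAL policies; not Streams API code.
final class FinalPolicySketch {

    // Returns the maximum submitted for each tumble. When flushOnFinal is
    // true, a partly filled last window is treated as one more tumble.
    static List<Float> aggregate(float[] readings, int windowSize, boolean flushOnFinal) {
        List<Float> out = new ArrayList<>();
        int count = 0;
        float max = 0;
        for (float r : readings) {
            if (count == 0 || r > max) {
                max = r;
            }
            count++;
            if (count == windowSize) {   // window is full: tumble
                out.add(max);
                count = 0;
            }
        }
        if (flushOnFinal && count != 0) { // final mark with leftover tuples
            out.add(max);
        }
        return out;
    }

    public static void main(String[] args) {
        float[] readings = {1f, 5f, 3f, 7f, 2f};
        System.out.println(aggregate(readings, 2, false)); // prints [5.0, 7.0]
        System.out.println(aggregate(readings, 2, true));  // prints [5.0, 7.0, 2.0]
    }
}
```

Doing nothing on FINAL silently drops the trailing reading, while sharing the eviction code submits a result computed from fewer tuples than a full window. Which is appropriate depends on the application.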

The result is a simple working maximum-value aggregator, but for a stream processing application to run it, it must be used in the context of a Java operator. First, TumbleMaxAggregator needs to know which output port to submit tuples to, so the port is passed to its constructor.

public TumbleMaxAggregator(StreamingOutput<OutputTuple> output) {
  this.output = output;
}

Then, an operator is needed to register this listener:

public class MaxAggregator extends AbstractWindowOperator {
    
  @Override
  public void initialize(OperatorContext context) throws Exception {
    super.initialize(context);

    getInput(0).getStreamWindow().registerListener(
        new TumbleMaxAggregator(getOutput(0)), false);
  }
}