IBM Streams 4.2

Implementing a sliding window operator with the Java Operator API

A sliding window differs from a tumbling window by having a trigger policy in addition to an eviction policy.

Window events are handled through the same StreamWindowListener mechanism as for tumbling windows, but two additional event types, INITIAL_FULL and TRIGGER, bring the total to five:
  • INSERTION: One or more tuples are being inserted into the window.
  • INITIAL_FULL: The window is full for the first time. This event allows listeners to hold off submitting tuples based on the window state for trigger events that occur before the window is first considered full. For example, with an eviction policy of count(10) and a trigger policy of count(2), four trigger events (after the 2nd, 4th, 6th, and 8th tuples) occur before the window is full, so an aggregation over the partially filled window might not be meaningful.
  • TRIGGER: The window is being triggered according to its trigger policy. Listeners typically submit tuples that are based on an aggregated state of the tuples that are currently in the window.
  • EVICTION: Tuples are being evicted from the window, so the listener typically needs to update any window state that depends on the removed tuples.
  • FINAL: Final punctuation was received on the input port, so no more tuples will arrive. Even when final punctuation is received, the window might never have been full or reached a trigger according to its policies. How to handle this situation is up to the specific operator or listener implementation.
The Javadoc for StreamWindowListener.Type contains details on the ordering of events for the possible combinations of policy types.
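The trigger arithmetic in the INITIAL_FULL example can be checked with a small model. The following is a hypothetical simulation (SlidingCountModel and its methods are illustrative and not part of the Java Operator API). It records only insertions, the first-full point, and count-based triggers. It assumes INITIAL_FULL is reported before the trigger that falls on the same tuple and does not model eviction; the Javadoc remains the authority on real event ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not the Streams runtime: records INSERTION,
// INITIAL_FULL, and TRIGGER for count(evictionCount) eviction and
// count(triggerCount) trigger policies. Eviction is not modeled.
class SlidingCountModel {
    public static void main(String[] args) {
        List<String> events = events(10, 2, 12);
        System.out.println(triggersBeforeInitialFull(events)); // 4
    }

    static List<String> events(int evictionCount, int triggerCount, int tuples) {
        List<String> events = new ArrayList<>();
        boolean full = false;
        for (int n = 1; n <= tuples; n++) {
            events.add("INSERTION");
            // The window first holds evictionCount tuples after the
            // evictionCount-th insertion (assumed reported before TRIGGER).
            if (!full && n == evictionCount) {
                full = true;
                events.add("INITIAL_FULL");
            }
            // A count(triggerCount) policy fires every triggerCount tuples.
            if (n % triggerCount == 0) {
                events.add("TRIGGER");
            }
        }
        events.add("FINAL");
        return events;
    }

    // Counts TRIGGER events seen before INITIAL_FULL (all of them if the
    // window never became full).
    static int triggersBeforeInitialFull(List<String> events) {
        int count = 0;
        for (String e : events) {
            if (e.equals("INITIAL_FULL")) {
                return count;
            }
            if (e.equals("TRIGGER")) {
                count++;
            }
        }
        return count;
    }
}
```

With only 8 tuples the model never emits INITIAL_FULL, which is the FINAL-without-full situation that the FINAL bullet leaves to each implementation.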

Implementations of StreamWindowListener can be written to be independent of the types and sizes of the eviction and trigger policies: they simply handle the events that indicate that tuples were evicted or that a trigger occurred. You can implement StreamWindowListener directly, as in the topic Implementing a tumbling window operator with the Java Operator API, or subclass StatefulWindowListener, which provides typed state management through Java generics.

A sample implementation of StatefulWindowListener is provided that aggregates the sum of a decimal attribute and the count of tuples in a window, supporting both tumbling and sliding windows. The class is com.ibm.streams.operator.samples.windows.DecimalSumAggregatorListener. DecimalSumAggregatorListener maintains the sum and count in a static inner class, DecimalSumAggregatorListener.AggregateInfo, so the class is declared as:

public class DecimalSumAggregatorListener extends
        StatefulWindowListener<DecimalSumAggregatorListener.AggregateInfo, Tuple> {

This declaration indicates that the state is of type DecimalSumAggregatorListener.AggregateInfo and that the listener aggregates Tuple objects. As a result, the method getPartitionState() is typed to return an AggregateInfo.
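To illustrate what typed state management buys, here is a minimal sketch of per-partition state keyed by a generic type parameter. PartitionStates and Counter are hypothetical stand-ins (S plays the role of AggregateInfo); this is not how StatefulWindowListener is implemented, only the general idea that the state type is fixed at compile time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class TypedStateDemo {
    public static void main(String[] args) {
        PartitionStates<Counter> states = new PartitionStates<>(Counter::new);
        // getPartitionState is typed to return a Counter, so no casts are needed.
        states.getPartitionState("a").count += 5;
        states.getPartitionState("a").count += 2;
        states.getPartitionState("b").count += 1;
        System.out.println(states.getPartitionState("a").count); // 7
        System.out.println(states.getPartitionState("b").count); // 1
    }
}

// Hypothetical per-partition state holder; S stands in for AggregateInfo.
class PartitionStates<S> {
    private final Map<Object, S> states = new HashMap<>();
    private final Supplier<S> initializer;

    PartitionStates(Supplier<S> initializer) {
        this.initializer = initializer;
    }

    // Returns the partition's state, creating a fresh one on first access.
    S getPartitionState(Object partition) {
        return states.computeIfAbsent(partition, p -> initializer.get());
    }
}

// Hypothetical state type.
class Counter {
    long count;
}
```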

INSERTION Event

Because DecimalSumAggregatorListener supports sliding windows, the sum and count are maintained incrementally in the AggregateInfo state as tuples are inserted into the window:
case INSERTION:
  /*
   * On insertion update the sum and count with each tuple being
   * inserted by adding its attribute's value to the sum.
   */
  for (Tuple tuple : event.getTuples()) {
    state.sum = state.sum.add(tuple.getBigDecimal(attributeName), mc);
    state.count++;
  }
  break;

This INSERTION logic works for sliding and tumbling windows.
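The INSERTION step can be tried outside Streams with plain BigDecimal values. This sketch assumes MathContext.DECIMAL128 as the `mc` value (the sample's actual MathContext is not shown here), and AggregateState is a hypothetical mirror of the two AggregateInfo fields.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.List;

class InsertionDemo {
    public static void main(String[] args) {
        // Assumption: DECIMAL128; the sample's mc may differ.
        MathContext mc = MathContext.DECIMAL128;
        AggregateState state = new AggregateState();
        for (BigDecimal v : List.of(new BigDecimal("1.10"), new BigDecimal("2.25"), new BigDecimal("3"))) {
            state.insert(v, mc);
        }
        System.out.println(state.sum + " " + state.count); // 6.35 3
    }
}

// Hypothetical mirror of AggregateInfo's sum and count fields.
class AggregateState {
    BigDecimal sum = BigDecimal.ZERO;
    int count;

    // INSERTION: fold each inserted value into the running sum and count.
    void insert(BigDecimal value, MathContext mc) {
        sum = sum.add(value, mc);
        count++;
    }
}
```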

EVICTION Event

For eviction, the logic differs between tumbling and sliding windows. With a tumbling window, the state is used to submit a tuple with the sum and count to the output port. With a sliding window, the state must be updated incrementally by removing the contribution of each evicted tuple:
case EVICTION:
  if (getWindow().getType() == StreamWindow.Type.TUMBLING) {
    // No need to maintain the state incrementally,
    // each tumble will allocate a new state via
    // getInitializedState().
    // Just submit the aggregation.
    submitAggregateOutput(state);
    break;
  }
            
  /*
   * Sliding window only.
   * On eviction update the sum and count with each tuple being
   * removed by subtracting its attribute's value from the sum.
   */
  for (Tuple tuple : event.getTuples()) {
    // Use the same MathContext as on insertion so that the sum and
    // count always stay in step.
    state.sum = state.sum.subtract(tuple.getBigDecimal(attributeName), mc);
    state.count--;
  }
  break;
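The insert-then-evict bookkeeping of a sliding window can be sketched with a deque. SlidingSum is hypothetical and models only a count-based eviction policy (it assumes the oldest tuple is evicted when a new one arrives at a full window); it reuses the listener's pattern of adding on INSERTION and subtracting on EVICTION.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.ArrayDeque;
import java.util.Deque;

class SlidingSumDemo {
    public static void main(String[] args) {
        SlidingSum window = new SlidingSum(3, MathContext.DECIMAL128);
        for (String v : new String[] {"1", "2", "3", "4", "5"}) {
            window.insert(new BigDecimal(v));
        }
        // The window now holds 3, 4, 5.
        System.out.println(window.sum() + " " + window.count()); // 12 3
    }
}

// Hypothetical count-based sliding window that maintains its sum and
// count incrementally instead of recomputing them on every change.
class SlidingSum {
    private final Deque<BigDecimal> tuples = new ArrayDeque<>();
    private final int capacity;
    private final MathContext mc;
    private BigDecimal sum = BigDecimal.ZERO;
    private int count;

    SlidingSum(int capacity, MathContext mc) {
        this.capacity = capacity;
        this.mc = mc;
    }

    // INSERTION: evict the oldest tuple first if the window is full.
    void insert(BigDecimal value) {
        if (tuples.size() == capacity) {
            evict(tuples.removeFirst());
        }
        tuples.addLast(value);
        sum = sum.add(value, mc);
        count++;
    }

    // EVICTION: undo the evicted tuple's contribution.
    private void evict(BigDecimal value) {
        sum = sum.subtract(value, mc);
        count--;
    }

    BigDecimal sum() { return sum; }
    int count() { return count; }
}
```

As long as the values fit within the MathContext precision, BigDecimal addition and subtraction are exact, so subtracting an evicted value exactly undoes its earlier addition; a double-based running sum could drift over a long-lived sliding window.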

TRIGGER Event

When a trigger event occurs (sliding windows only), a tuple that contains the sum and count is submitted to the output port, but only if the window has been full at least once:
case TRIGGER:
  // Only trigger once a full window has been seen.
  if (!seenInitialFull(event.getPartition()))
    break;

  // Submit the aggregation to the output port.
  submitAggregateOutput(state);
  break;

Here the class uses the seenInitialFull() method of its superclass, StatefulWindowListener, which tracks whether the INITIAL_FULL event has been seen for each partition. The submitAggregateOutput() method populates a tuple from the values in the state, submits it, and then submits a window marker punctuation:

protected void submitAggregateOutput(AggregateInfo state) throws Exception {
        
  // Only submit output if the window contains tuples.
  if (state.count == 0)
    return;
        
  OutputTuple tuple = outputPort.newTuple();
  setupOutputTuple(tuple, state);
        
  outputPort.submit(tuple);
  outputPort.punctuate(StreamingData.Punctuation.WINDOW_MARKER);
}

protected void setupOutputTuple(OutputTuple tuple, AggregateInfo state) {
  tuple.setBigDecimal("sum", state.sum);
  tuple.setInt("count", state.count);        
}
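The whole TRIGGER path (per-partition INITIAL_FULL gating, the empty-window guard, and the tuple-then-marker submission) can be sketched without the Streams runtime. TriggerPath and Aggregate are hypothetical; the `log` list stands in for the output port, and the string "WINDOW_MARKER" stands in for StreamingData.Punctuation.WINDOW_MARKER.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TriggerPathDemo {
    public static void main(String[] args) {
        TriggerPath path = new TriggerPath();
        Aggregate state = new Aggregate(new BigDecimal("7.5"), 3);
        path.onTrigger("p1", state);   // suppressed: p1 has not been full yet
        path.onInitialFull("p1");
        path.onTrigger("p1", state);   // submitted, then a window marker
        path.onTrigger("p1", new Aggregate(BigDecimal.ZERO, 0)); // empty: skipped
        System.out.println(path.log);  // [sum=7.5,count=3, WINDOW_MARKER]
    }
}

// Hypothetical aggregate state (stands in for AggregateInfo).
class Aggregate {
    final BigDecimal sum;
    final int count;

    Aggregate(BigDecimal sum, int count) {
        this.sum = sum;
        this.count = count;
    }
}

// Hypothetical TRIGGER handling: a per-partition INITIAL_FULL record
// (the role played by seenInitialFull()) plus the logic of
// submitAggregateOutput(). The log list records what would be sent.
class TriggerPath {
    final List<String> log = new ArrayList<>();
    private final Set<Object> seenFull = new HashSet<>();

    void onInitialFull(Object partition) {
        seenFull.add(partition);
    }

    boolean seenInitialFull(Object partition) {
        return seenFull.contains(partition);
    }

    void onTrigger(Object partition, Aggregate state) {
        // Only trigger once a full window has been seen for this partition.
        if (!seenInitialFull(partition)) {
            return;
        }
        submitAggregateOutput(state);
    }

    void submitAggregateOutput(Aggregate state) {
        // Only submit output if the window contains tuples.
        if (state.count == 0) {
            return;
        }
        log.add("sum=" + state.sum + ",count=" + state.count);
        log.add("WINDOW_MARKER");
    }
}
```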

To use an implementation of StatefulWindowListener, construct an instance in the operator's initialize() method, passing in a reference to the StreamWindow that it applies to. StatefulWindowListener itself registers the listener with the window. Most implementations pass more information to the constructor than just the window reference. For example, DecimalSumAggregatorListener has this constructor:

public DecimalSumAggregatorListener(StreamWindow<Tuple> window,
            String attributeName,
            StreamingOutput<OutputTuple> outputPort) {
  super(window);
  …
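The self-registration pattern that `super(window)` relies on can be illustrated with hypothetical types. WindowListener, SimpleWindow, SelfRegisteringListener, and CountingListener below are illustrative only, not the Streams API; they show a base-class constructor registering `this`, which is why the caller does not need to register the listener itself.

```java
import java.util.ArrayList;
import java.util.List;

class SelfRegistrationDemo {
    public static void main(String[] args) {
        SimpleWindow<String> window = new SimpleWindow<>();
        // The instance is not assigned: its constructor registered it.
        new CountingListener(window);
        window.insert("a");
        window.insert("b");
        System.out.println(window.listenerCount()); // 1
    }
}

// Hypothetical listener interface; not the Streams API.
interface WindowListener<T> {
    void onInsert(T tuple);
}

// Hypothetical window that forwards insertions to registered listeners.
class SimpleWindow<T> {
    private final List<WindowListener<T>> listeners = new ArrayList<>();

    void register(WindowListener<T> listener) { listeners.add(listener); }

    void insert(T tuple) {
        for (WindowListener<T> l : listeners) {
            l.onInsert(tuple);
        }
    }

    int listenerCount() { return listeners.size(); }
}

// The base-class constructor registers `this` with the window.
abstract class SelfRegisteringListener<T> implements WindowListener<T> {
    protected SelfRegisteringListener(SimpleWindow<T> window) {
        window.register(this);
    }
}

// Hypothetical concrete listener that counts insertions.
class CountingListener extends SelfRegisteringListener<String> {
    int seen;

    CountingListener(SimpleWindow<String> window) {
        super(window);
    }

    @Override
    public void onInsert(String tuple) {
        seen++;
    }
}
```

Registering `this` from a constructor publishes a partially constructed object; the pattern is safe here only because the window does not call back into the listener until events arrive, after construction has finished.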

The following example shows how an operator uses this listener:

@Override
public void initialize(OperatorContext context) throws Exception {
  super.initialize(context);
  
  String sumAttribute = context.getParameterValues("sumAttribute").get(0);
  
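  // StatefulWindowListener registers the listener with the window,
  // so the operator does not need to keep a reference to it.
  new DecimalSumAggregatorListener(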
          getInput(0).getStreamWindow(),
          sumAttribute, getOutput(0));