IBM InfoSphere Streams Version 4.1.0

Primitive C++ operators with windows in consistent regions

To checkpoint and reset the state of operators that use the C++ windowing library in a consistent region, operators must call window state management functions.

Window API functions

The C++ windowing library provides the following functions to support state management within consistent regions.

virtual void drain();
Drain outstanding window processing. Wait until pending tuple eviction and window trigger actions finish.
virtual void checkpoint(Checkpoint & data);
Checkpoint the window state into the specified object. The window state includes window contents and state that is associated with the eviction and trigger policies.
virtual void reset(Checkpoint & data);
Restore the window to the state read from the specified checkpoint.
virtual void resetToInitialState();
Restore the window to the initial state.

Window events

A window can be associated with one or more listeners that implement the WindowEvent interface (two listeners cannot receive the same event). When a window checkpoints, resets, or resets to its initial state, one single listener, which is registered as the window’s serialization handler, receives events immediately after the window checkpoints, resets, or resets to its initial state. The WindowEvent interface contains the following functions.

virtual void onCheckpointEvent(Checkpoint & ckpt) const {}
This event is fired after the window is saved to the specified checkpoint. The event handler is responsible for writing its state to the checkpoint stream.
virtual void onResetEvent(Checkpoint & ckpt) {}
This event is fired after the window's state is read from the specified checkpoint. The event handler is responsible for reading its state, which was saved to the checkpoint stream.
virtual void onResetToInitialStateEvent() {}
This event is fired after the window's state is initialized. The event handler is responsible for initializing its state.

C++ primitive operator with window

State handler
C++ operators that use windows require a StateHandler implementation to drain, checkpoint, and reset the windows. The following code sample illustrates how the StateHandler implementation invokes the window API.
void MY_OPERATOR::drain() {
    . . .
    _window.drain();
}

void MY_OPERATOR::checkpoint(Checkpoint & ckpt) {
    . . .
    _window.checkpoint(ckpt);
}

void MY_OPERATOR::reset(Checkpoint & ckpt) {
    . . .
    _window.reset(ckpt);
}

void MY_OPERATOR::resetToInitialState() {
    . . .
    _window.resetToInitialState();
}
Tumbling window summarizer
A tumbling window summarizer can be used to remove the need for the windowing library to retain all the tuples in a tumbling window. A summarizer can contain state, therefore it needs to save or load the state when the window checkpoints or resets. The TumblingWindowSummarizer interface contains the following events.
virtual void onCheckpointEvent(Checkpoint & ckpt) {}
This event is fired when the current window’s state checkpoints. Write the summarizer code to serialize its state into the specified checkpoint stream.
virtual void onResetEvent(Checkpoint & ckpt) {}
This event is fired when the current window is restored to the state that is provided by the specified checkpoint. Write the summarizer code to restore its state by reading from the specified checkpoint stream.

A window re-creates its summarizers when it is reset to its initial state.

The following example illustrates how to update a tumbling window summarizer that calculates the count of windowed tuples to participate in consistent regions:

#define MY$OP MY_OPERATOR_SCOPE::MY_OPERATOR
struct MY_OPERATOR::MyCountSummarizer : public  
            SPL::TumblingWindowSummarizer<MY$OP::IPort0Type,MY$OP::PartitionByType>  {
  MY$OP& operator_;
  uint64_t count_;

  MyCountSummarizer(SPL::Operator& oper): operator_(static_cast<MY$OP&>(oper)) {
    Count_ = 0;
  }

  void onTupleInsertionEvent(MY$OP::IPort0Type const& tuple) {
      count_++;
  }

  void onCheckpointEvent(SPL::Checkpoint & ckpt) const {
      ckpt << count_;
  }

  void onResetEvent(SPL::Checkpoint & ckpt) {
      ckpt >> count_;
  }
};

Partition type helper class
The PartitionType helper class generated that uses the emitClass function within the SPL::CodeGen module provides serialization support.