IBM Streams 4.2

Stateful primitive operators that participate in consistent regions

To write primitive operators that restore their internal state when a consistent region is reset, use the StateHandler interface. Both C++ and Java™ primitive operators can implement this interface to checkpoint and restore their internal state. The IBM® Streams instance automatically calls the methods of this interface when the operator must drain, checkpoint, or reset its state. If the operator persists and restores its internal state in these callbacks, the state of the operator is consistent with its input and output streams and with the state of the other operators in the consistent region.

StateHandler interface

The start operators of a consistent region replay tuples when the region resets after a failure, so downstream operators receive repeated tuples after a reset. If those operators are stateful, the same tuple might be applied to the operator state more than once. If this behavior does not meet the requirements of the application, primitive operators can persist their state after a drain and restore it during a reset.

Each drain and each reset has a sequence number. If the persisted state is associated with these sequence numbers, the state is consistent with the state of the other operators that use the same sequence numbers. The state is also consistent with all of the operator's input and output streams at the point after the drain. If the operator restores its state during a reset, it does not need to handle duplicates: restoring the state is equivalent to discarding all tuples that were sent after the last successful drain (the duplicates).
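The following standalone C++ sketch illustrates how sequence numbers keep the states of several operators aligned. It does not use the SPL runtime. `Counter`, `Summer`, `runRegion`, and the in-memory `saved` maps are hypothetical stand-ins for real operators and a checkpoint backend. Both operators persist their state under the same sequence number after each drain. On a failure, both restore the state for the last completed drain, and the replayed tuples are then processed again.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical sketch, not the SPL API: each operator persists its state
// keyed by the drain sequence number (seqId).
struct Counter {
    uint64_t count = 0;
    std::map<int64_t, uint64_t> saved;        // seqId -> persisted count
    void process(int) { ++count; }
    void checkpoint(int64_t seqId) { saved[seqId] = count; }
    void reset(int64_t seqId) { count = saved.at(seqId); }
};

struct Summer {
    int64_t sum = 0;
    std::map<int64_t, int64_t> saved;         // seqId -> persisted sum
    void process(int v) { sum += v; }
    void checkpoint(int64_t seqId) { saved[seqId] = sum; }
    void reset(int64_t seqId) { sum = saved.at(seqId); }
};

struct RegionResult { uint64_t count; int64_t sum; };

// Sends `input` through both operators and completes a drain after every
// `drainEvery` tuples. If failAfter is positive, a failure occurs once that
// many tuples have been processed: both operators reset to the last
// completed seqId and the start operator replays the tuples sent after it.
RegionResult runRegion(const std::vector<int> & input, std::size_t drainEvery, long failAfter) {
    Counter c;
    Summer s;
    int64_t lastSeq = 0;
    std::size_t replayFrom = 0;       // first tuple after the last completed drain
    c.checkpoint(0);                  // seqId 0 stands in for the initial state
    s.checkpoint(0);
    bool failed = false;
    std::size_t i = 0;
    while (i < input.size()) {
        c.process(input[i]);
        s.process(input[i]);
        ++i;
        if (!failed && static_cast<long>(i) == failAfter) {
            failed = true;
            c.reset(lastSeq);         // both operators restore the state
            s.reset(lastSeq);         // persisted under the same seqId
            i = replayFrom;           // replay starts after the last drain
            continue;
        }
        if (i % drainEvery == 0) {    // a drain completes with a new seqId
            ++lastSeq;
            c.checkpoint(lastSeq);
            s.checkpoint(lastSeq);
            replayFrom = i;
        }
    }
    return {c.count, s.sum};
}
```

Take the input 1 to 7 with a drain every three tuples. A failure-free run (`failAfter` of 0) and a run that fails after the fifth tuple both end with a count of 7 and a sum of 28, so the replayed tuples are applied exactly once.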

The StateHandler interface enables operators to persist and restore their states so that they stay consistent with the other operators in the region. The logical division of an operator's output streams is established after the drain() callback of the SPL StateHandler interface returns. The consistent region processes any tuple or punctuation that is submitted to output ports before or during the drain() call before the drain concludes successfully. If an operator does not implement the drain() callback, the IBM Streams instance establishes the logical division automatically. The IBM Streams instance calls the checkpoint() and reset() callbacks at the appropriate times. It calls checkpoint() to persist the state after a successful drain, and reset() to restore it when the region resets.
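To illustrate why pending output belongs in drain(), the sketch below models an operator that buffers tuples and submits them in batches. `BatchingOperator`, `runWithDrain`, and the `"|"` division marker are hypothetical, and the vector stands in for an output port. When drain() flushes the buffer, tuples "a" and "b" are emitted before the division. Without that flush, they are emitted after the division, even though they derive from input received before the drain.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch, not the SPL API. The output stream is a vector of
// strings; "|" marks the logical division established after drain() returns.
class BatchingOperator {
public:
    BatchingOperator(std::vector<std::string> & out, std::size_t batchSize)
        : out_(out), batchSize_(batchSize) {}

    // Buffers tuples and submits them in batches of batchSize.
    void process(const std::string & tuple) {
        pending_.push_back(tuple);
        if (pending_.size() == batchSize_) flush();
    }

    // Submits any buffered tuples so that all output derived from input
    // received before the drain precedes the division.
    void drain() { flush(); }

private:
    void flush() {
        for (const auto & t : pending_) out_.push_back(t);  // stands in for submit()
        pending_.clear();
    }
    std::vector<std::string> & out_;
    std::size_t batchSize_;
    std::vector<std::string> pending_;
};

// Plays the runtime's role: two tuples, an optional drain(), the division,
// one more tuple, and then the next drain().
std::vector<std::string> runWithDrain(bool callDrain) {
    std::vector<std::string> out;
    BatchingOperator op(out, 3);
    op.process("a");
    op.process("b");          // batch not full yet: "a" and "b" are buffered
    if (callDrain) op.drain();
    out.push_back("|");       // logical division
    op.process("c");
    op.drain();               // the next drain
    return out;
}
```

With the drain, the output is `a b | c`. Without it, the output is `| a b c`, which places "a" and "b" in the wrong interval.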

Consistent region permits

If a primitive operator has background threads that submit tuples or punctuation, the operator must acquire tuple or punctuation submission permits by creating a ConsistentRegionPermit. These permits can be created by using the ConsistentRegionContext. The permits enable the operator to pause tuple and punctuation submission during a checkpoint or a reset. This pause keeps the internal state of the operator consistent with its input and output streams and with the state of the other operators in the consistent region. Otherwise, enabling threads to pause and resume submission requires nontrivial thread-coordination code that involves mutual exclusion and condition variables. Acquiring a tuple or punctuation submission permit removes the need to write that coordination code.
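For a sense of the coordination code that permits replace, the following sketch hand-writes a pause and resume gate with std::mutex and std::condition_variable. `SubmissionGate` and `runGateDemo` are hypothetical and are not the ConsistentRegionPermit API. Worker threads acquire the gate around each submission. The checkpoint side pauses the gate, waits until no submission is in flight, and resumes it afterward.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical gate (not the SPL ConsistentRegionPermit API): the kind of
// mutex and condition-variable code that submission permits make unnecessary.
class SubmissionGate {
public:
    // Worker side: blocks while submissions are paused, then marks one
    // submission as in flight.
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !paused_; });
        ++inFlight_;
    }
    void release() {
        std::lock_guard<std::mutex> lock(m_);
        --inFlight_;
        cv_.notify_all();
    }
    // Checkpoint/reset side: blocks new submissions and waits until the
    // in-flight submissions finish.
    void pause() {
        std::unique_lock<std::mutex> lock(m_);
        paused_ = true;
        cv_.wait(lock, [this] { return inFlight_ == 0; });
    }
    void resume() {
        std::lock_guard<std::mutex> lock(m_);
        paused_ = false;
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool paused_ = false;
    int inFlight_ = 0;
};

// Hypothetical driver: workers "submit" by incrementing a counter under the
// gate while the main thread pauses the gate and reads the counter twice.
bool runGateDemo(int threads, int perThread) {
    SubmissionGate gate;
    std::atomic<int> submitted{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (int i = 0; i < perThread; ++i) {
                gate.acquire();
                submitted.fetch_add(1);   // stands in for submit(tuple, port)
                gate.release();
            }
        });
    }
    gate.pause();                         // checkpoint begins
    int before = submitted.load();
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    int after = submitted.load();
    gate.resume();                        // checkpoint ends
    for (auto & w : workers) w.join();
    return before == after && submitted.load() == threads * perThread;
}
```

While the gate is paused, no thread can be between acquire() and release(), so the submission counter cannot change between the two reads.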

Sample C++ primitive operator

This example shows an operator that declares providesSingleThreadedContext=Always; it submits tuples only from the process(Tuple) and process(Punctuation) functions.

You can find the sample operator in the installation directory at $STREAMS_INSTALL/samples/spl/feature/ConsistentRegion/sample/TupleCounter. This operator maintains an integer, the class member numTuples_, that counts the tuples processed by the operator. If the operator does not implement checkpointing and resetting code, the following outcomes can occur:
  • When another operator in the region fails, the TupleCounter operator overcounts tuples, because the replayed tuples are counted a second time.
  • When the TupleCounter operator itself fails, it loses the value of the counter and undercounts tuples.
By checkpointing and resetting its state, the operator keeps an exact count of the processed tuples.
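A small standalone model reproduces both outcomes and the exact count. `Counter`, `Failure`, and `finalCount` are hypothetical, and the scenario is arbitrary: ten tuples, a drain after the sixth, and a failure after the tenth. Without checkpointing, a failure elsewhere yields 14 because tuples 7 to 10 are counted twice. A failure of the counter itself yields 4 because the first six tuples are lost. With checkpoint and reset, both cases yield 10.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical standalone model of the TupleCounter state (not the SPL operator).
struct Counter {
    uint32_t numTuples = 0;
    void process() { ++numTuples; }
};

enum class Failure { OtherOperator, ThisOperator };

// Ten tuples arrive; a drain (and, if enabled, a checkpoint) completes after
// tuple 6; a failure then occurs after tuple 10, and the start operator
// replays tuples 7 to 10. The exact count is 10.
uint32_t finalCount(Failure failure, bool checkpointing) {
    Counter op;
    uint32_t saved = 0;                                   // value in the checkpoint store
    for (int i = 1; i <= 10; ++i) {
        op.process();
        if (i == 6 && checkpointing) saved = op.numTuples;  // checkpoint()
    }
    if (failure == Failure::ThisOperator) op = Counter{};   // restart loses the state
    if (checkpointing) op.numTuples = saved;                // reset()
    for (int i = 7; i <= 10; ++i) op.process();            // replayed tuples
    return op.numTuples;
}
```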

Header file

In the following code sample, the header file includes StateHandler.h and declares the checkpointing methods only when the operator is in a consistent region. This behavior is achieved by using the Perl code generation APIs.
<%
   my $isInConsistentRegion = 
     $model->getContext()->getOptionalContext("ConsistentRegion");
   my @includes;
   if ($isInConsistentRegion) {
     push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
   }
   SPL::CodeGen::headerPrologue($model, \@includes);
%>

The checkpoint and reset virtual methods are inherited from the StateHandler interface. Both methods take a reference to an object of the Checkpoint class. This reference is used to serialize operator state to, and deserialize it from, the configured checkpoint backend store. The resetToInitialState() method resets the operator to the state it had after the Operator::allPortsReady() routine was invoked. This method is used when the consistent region must reset before the first consistent state has been successfully established.
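The Checkpoint reference is used like a stream. Values written with operator<< in checkpoint() are read back with operator>> in reset(), in the same order. The sketch below imitates that contract with a hypothetical `MiniCheckpoint` class built on std::stringstream; it is not the SPL Checkpoint class. The extra `lastKey` member of the hypothetical `CounterState` only shows that several values can be serialized in sequence.

```cpp
#include <cassert>
#include <cstdint>
#include <ios>
#include <sstream>
#include <string>

// Hypothetical stand-in for SPL's Checkpoint class (not its real API):
// values are written with << and must be read back with >> in the same order.
class MiniCheckpoint {
public:
    MiniCheckpoint & operator<<(uint32_t v) {
        buf_.write(reinterpret_cast<const char *>(&v), sizeof v);
        return *this;
    }
    MiniCheckpoint & operator>>(uint32_t & v) {
        buf_.read(reinterpret_cast<char *>(&v), sizeof v);
        return *this;
    }
    MiniCheckpoint & operator<<(const std::string & s) {
        *this << static_cast<uint32_t>(s.size());   // length prefix
        buf_.write(s.data(), static_cast<std::streamsize>(s.size()));
        return *this;
    }
    MiniCheckpoint & operator>>(std::string & s) {
        uint32_t n = 0;
        *this >> n;
        s.resize(n);
        buf_.read(&s[0], static_cast<std::streamsize>(n));
        return *this;
    }

private:
    std::stringstream buf_{std::ios::in | std::ios::out | std::ios::binary};
};

// Hypothetical operator state with StateHandler-style callbacks.
struct CounterState {
    uint32_t numTuples = 0;
    std::string lastKey;

    void checkpoint(MiniCheckpoint & ckpt) const { ckpt << numTuples << lastKey; }
    void reset(MiniCheckpoint & ckpt) { ckpt >> numTuples >> lastKey; }  // same order
    void resetToInitialState() {        // assumed initial state after allPortsReady()
        numTuples = 0;
        lastKey.clear();
    }
};

// Checkpoints one instance and restores the data into a fresh instance.
bool checkpointRoundTrip() {
    CounterState original;
    original.numTuples = 42;
    original.lastKey = "k7";
    MiniCheckpoint ckpt;
    original.checkpoint(ckpt);
    CounterState restored;
    restored.reset(ckpt);
    return restored.numTuples == 42 && restored.lastKey == "k7";
}

bool resetsToInitial() {
    CounterState s;
    s.numTuples = 9;
    s.lastKey = "x";
    s.resetToInitialState();
    return s.numTuples == 0 && s.lastKey.empty();
}
```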

The following code sample declares the drain(), checkpoint(), reset(), and resetToInitialState() methods.
class MY_OPERATOR : public MY_BASE_OPERATOR
<%if ($isInConsistentRegion) {%>
  , public StateHandler
<%}%>{
public:
  ...

  <%if ($isInConsistentRegion) {%>
    // Callbacks from StateHandler.h
    virtual void drain();
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToInitialState();
  <%}%>
private:
  uint32_t numTuples_;
  Mutex mutex_;
};

C++ file

As with the header file, the C++ file uses the code generation API to conditionally compile code that is used only when the operator is in a consistent region, as the following code sample shows.
...
<%
   my $isInConsistentRegion = 
     $model->getContext()->getOptionalContext("ConsistentRegion");
%>
In the constructor, the operator must register the object that implements the StateHandler interface. In this example, that object is the operator itself, so the registration step is optional.
MY_OPERATOR::MY_OPERATOR()
    : numTuples_(0)
{
  <%if ($isInConsistentRegion) {%>
    getContext().registerStateHandler(*this);
  <%}%>
}
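Registration matters when the state lives in an object other than the operator. In the following sketch, `Handler`, `MiniContext`, `CounterStateHelper`, and `CountingOperator` are hypothetical stand-ins for StateHandler, the operator context, and an operator. Only the method name registerStateHandler mirrors the call in the constructor above. When the helper is registered, a reset restores the checkpointed count of 2. When it is not registered, the simulated runtime has no object to call and the count stays at 3.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical interface with the shape of the StateHandler callbacks;
// a uint32_t stands in for the checkpoint store.
class Handler {
public:
    virtual ~Handler() = default;
    virtual void checkpoint(uint32_t & store) = 0;
    virtual void reset(const uint32_t & store) = 0;
};

// Hypothetical context: remembers which object receives the callbacks.
class MiniContext {
public:
    void registerStateHandler(Handler & h) { handler_ = &h; }
    // What the simulated runtime does during a checkpoint or a reset.
    void checkpoint(uint32_t & store) { if (handler_) handler_->checkpoint(store); }
    void reset(const uint32_t & store) { if (handler_) handler_->reset(store); }

private:
    Handler * handler_ = nullptr;
};

// The operator keeps its state in a separate helper object.
class CounterStateHelper : public Handler {
public:
    uint32_t numTuples = 0;
    void checkpoint(uint32_t & store) override { store = numTuples; }
    void reset(const uint32_t & store) override { numTuples = store; }
};

class CountingOperator {
public:
    CountingOperator(MiniContext & ctx, bool registerHelper) {
        // helper_ is already constructed here, so registering it is safe.
        if (registerHelper) ctx.registerStateHandler(helper_);
    }
    void process() { ++helper_.numTuples; }
    uint32_t count() const { return helper_.numTuples; }

private:
    CounterStateHelper helper_;
};

// Processes 2 tuples, checkpoints, processes 1 more, then resets.
uint32_t countAfterReset(bool registerHelper) {
    MiniContext ctx;
    CountingOperator op(ctx, registerHelper);
    op.process();
    op.process();
    uint32_t store = 0;
    ctx.checkpoint(store);   // persists 2 only if a handler is registered
    op.process();            // count is now 3
    ctx.reset(store);        // restores 2 only if a handler is registered
    return op.count();
}
```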
In the process function, the operator updates its internal state and submits a resulting tuple.
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
  AutoPortMutex am(mutex_, *this);
  numTuples_++;
  OPort0Type otuple(tuple.getAttributeValue(0), numTuples_);
  submit(otuple, 0);
}
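The sample locks mutex_ in process() and in the checkpoint and reset callbacks. A plausible reading, assumed here, is that those callbacks can run on a different thread than process(). The lock then keeps a checkpoint from observing a half-updated state. The hypothetical `TwoFieldCounter` below uses std::mutex in place of SPL's Mutex, AutoPortMutex, and AutoMutex. It keeps two fields that must stay in step, so every snapshot taken under the lock satisfies sum == 2 * numTuples.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical standalone analog of the sample's locking
// (std::mutex in place of SPL::Mutex, AutoPortMutex, and AutoMutex).
class TwoFieldCounter {
public:
    void process(uint64_t value) {
        std::lock_guard<std::mutex> lock(mutex_);   // like AutoPortMutex in process()
        ++numTuples_;
        sum_ += value;
    }
    // Copies both fields under the same lock, like checkpoint() with AutoMutex.
    void checkpoint(uint64_t & numTuples, uint64_t & sum) {
        std::lock_guard<std::mutex> lock(mutex_);
        numTuples = numTuples_;
        sum = sum_;
    }

private:
    std::mutex mutex_;
    uint64_t numTuples_ = 0;
    uint64_t sum_ = 0;
};

// One thread processes tuples of value 2 while this thread takes snapshots.
// Every snapshot must satisfy sum == 2 * numTuples.
bool snapshotsConsistent(int tuples) {
    TwoFieldCounter op;
    std::thread worker([&] { for (int i = 0; i < tuples; ++i) op.process(2); });
    bool ok = true;
    for (int i = 0; i < 1000; ++i) {
        uint64_t n = 0, s = 0;
        op.checkpoint(n, s);
        ok = ok && (s == 2 * n);
    }
    worker.join();
    uint64_t n = 0, s = 0;
    op.checkpoint(n, s);
    return ok && n == static_cast<uint64_t>(tuples);
}
```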
The drain method is empty because the operator has no pending actions to complete before a consistent state is established. The checkpoint and reset methods serialize (checkpoint) and deserialize (reset) the state of the operator to and from the checkpoint backend.
<%if ($isInConsistentRegion) {%>
void MY_OPERATOR::drain()
{
}

void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
  AutoMutex am(mutex_);
  ckpt << numTuples_;
}

void MY_OPERATOR::reset(Checkpoint & ckpt)
{
  AutoMutex am(mutex_);
  ckpt >> numTuples_;
}
The resetToInitialState method resets the operator to its initial state, in which no tuples have been counted.
void MY_OPERATOR::resetToInitialState()
{
  AutoMutex am(mutex_);
  numTuples_ = 0;
}
<%}%>

For information about how to create a sample application that uses the TupleCounter primitive operator, see the sample::StatefulPrimitive sample application that is located at $STREAMS_INSTALL/samples/spl/feature/ConsistentRegion/sample/.