IBM Streams 4.2

Understanding threading in a Java operator

A Java™ operator must always be written to be threadsafe, because its methods might be called by different threads, and might be called concurrently. Standard Java threading concepts apply, including synchronization, locking, and visibility.

Operator overview

An Operator can be viewed as an event processor, handling five event types:
  • Operator initialization: Operator.initialize()
  • Port ready notification: Operator.allPortsReady()
  • Process a tuple: Operator.process()
  • Process a punctuation mark: Operator.processPunctuation()
  • Operator shutdown: Operator.shutdown()

From a threading point of view, take the approach that any of these events can occur at any time (subject to the lifecycle of the operator). They can be invoked by any thread, and they can occur concurrently.

Different threads calling the Operator's methods

Because the operator's methods can be called by different threads, the visibility of the Operator's state across those threads must be considered. Visibility is defined by the Java Memory Model (JMM), which specifies when a thread is guaranteed to see changes to state that were made by other threads.

The simplest approach to ensuring visibility of state changes across threads is to synchronize access to the state by using Java synchronization. For example, in the following code extract, the threshold instance field (part of the Operator's state) is set while the monitor of the Operator object is held, and is read through the getThreshold() method, which synchronizes on the same Operator object. Any thread that reads threshold through getThreshold() after initialize() completes is therefore guaranteed to see the initialized value.

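// Part of the operator's state; guarded by the monitor of the Operator object.
private int threshold;

@Override
public synchronized void initialize(OperatorContext context) throws Exception {
    super.initialize(context);
    // Written while holding the monitor on "this".
    threshold = calculateThreshold(context);
}

// Reads under the same monitor, so the value set in initialize() is visible.
private synchronized int getThreshold() {
    return threshold;
}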
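The same pattern can be tried outside the Streams runtime. The following is a minimal sketch in plain Java, assuming a hypothetical class ThresholdState that stands in for the operator: it does not use any com.ibm.streams.operator classes, and calculateThreshold() is replaced by a placeholder that simply doubles its input. A reader thread is started before initialization and repeatedly calls the synchronized getter; because the write in initialize() and the read in getThreshold() synchronize on the same object, the reader is guaranteed to observe the initialized value.

```java
// Hypothetical plain-Java stand-in for the operator state above;
// no IBM Streams classes are used.
class ThresholdState {
    // Guarded by "this": written and read only while holding the monitor.
    private int threshold;

    // Mirrors initialize(): the write happens under synchronization.
    synchronized void initialize(int configuredValue) {
        threshold = calculateThreshold(configuredValue);
    }

    // Mirrors getThreshold(): the read synchronizes on the same object.
    synchronized int getThreshold() {
        return threshold;
    }

    // Hypothetical placeholder for the operator's own calculation;
    // assumed to return a non-zero value for the demo below.
    private static int calculateThreshold(int configuredValue) {
        return configuredValue * 2;
    }
}

class ThresholdDemo {
    // Starts a reader thread before initialization and returns the value it observes.
    static int readFromAnotherThread(int configuredValue) {
        ThresholdState state = new ThresholdState();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            int value;
            // Each getThreshold() call acquires the monitor, so the reader
            // eventually observes the value written by initialize().
            while ((value = state.getThreshold()) == 0) {
                Thread.yield();
            }
            seen[0] = value;
        });
        reader.start();
        state.initialize(configuredValue);
        try {
            reader.join(); // join() also makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

If synchronized were removed from either method, the JMM would no longer guarantee that the reader ever sees the new value, and the loop could spin indefinitely.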

Concurrent operator method calls

Except for the operator initialization event, all the other events can occur concurrently with each other, which leads to these methods being called concurrently by different threads for (at least) the following reasons:

  • Operator.allPortsReady():
    • Because the port ready notification indicates that the operator's ports are ready to process and submit tuples, tuples and punctuation marks can arrive while allPortsReady() is still running. Thus Operator.process() and processPunctuation() might be called before allPortsReady() returns. For example, an upstream operator that is fused in the same processing element might submit tuples to this operator during its own port ready notification handling.
  • Operator.process() and processPunctuation():
    • The process() and processPunctuation() methods handle all input ports for the operator. Thus tuples and punctuation marks that arrive on different ports can be processed concurrently.
    • Even on a single input port, tuples and punctuation marks might arrive concurrently, for example when fused upstream operators submit tuples and punctuation marks concurrently.
  • Operator.shutdown():
    • A shutdown request can occur at any time, such as a request to stop a PE or cancel a job. Thus shutdown() can be called while the operator is processing tuples or punctuation marks, or even during the port ready notification.
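The consequence of these concurrent calls can be simulated in plain Java. In the minimal sketch below, several threads (standing in for two input ports and for fused upstream operators) call a process() method at the same time, and shutdown() can be called from any thread. The class PortCounter and its methods are hypothetical and are not the Streams Operator API. Per-port counts are held in AtomicLong instances so that concurrent increments are not lost, and a volatile flag makes a shutdown request visible to the processing threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an operator with several input ports.
class PortCounter {
    private final AtomicLong[] counts;
    // volatile: a shutdown() call on one thread is seen by processing threads.
    private volatile boolean shutdown;

    PortCounter(int ports) {
        counts = new AtomicLong[ports];
        for (int i = 0; i < ports; i++) {
            counts[i] = new AtomicLong();
        }
    }

    // Mirrors process(): may be called concurrently for the same or different ports.
    void process(int port, String tuple) {
        if (shutdown) {
            return; // ignore tuples that arrive after shutdown was requested
        }
        counts[port].incrementAndGet(); // atomic, so no updates are lost
    }

    // Mirrors shutdown(): may be called while process() is running.
    void shutdown() {
        shutdown = true;
    }

    long count(int port) {
        return counts[port].get();
    }

    // Drives process() from several threads at once, alternating between two ports.
    static PortCounter runConcurrently(int threads, int tuplesPerThread) {
        PortCounter op = new PortCounter(2);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int port = t % 2;
            pool.execute(() -> {
                for (int i = 0; i < tuplesPerThread; i++) {
                    op.process(port, "tuple-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op;
    }
}
```

The shutdown check is best-effort: a process() call that has already passed the check when shutdown() is called still completes, which is usually acceptable but should be considered when releasing resources in shutdown().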

Since the methods can be called concurrently, they must be threadsafe with respect to data integrity and state visibility. The specific synchronization needs are driven by the functional and performance requirements of the operator implementation. Some points to note are as follows:

  • Using a single synchronization object, for example, by making the event-related methods synchronized, guarantees visibility and single-threaded execution through the operator, but can restrict performance.
  • Holding synchronization or a Java lock while calling StreamingOutput.submit() is generally not recommended, because submit() can take significant time to run compared to the time spent processing the tuple in the operator. The operator might be fused, in which case submit() calls directly into the processing of downstream operators, which can block or perform heavyweight operations on the submitted tuple.
  • If strict ordering of output tuples is required, then either a lock must be held across the submit() call, or the operator must require single-threaded tuple processing or tuple submission.
  • In addition to the synchronized keyword, Java SE provides a number of classes for multithreaded programming in the java.util.concurrent, java.util.concurrent.atomic, and java.util.concurrent.locks packages.
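The advice about not holding a lock across submit() can be sketched in plain Java. In this minimal sketch, OutputPort is a hypothetical interface standing in for StreamingOutput (its submit(long) signature is a simplification, not the real API), and the running total is illustrative operator state. The state is updated inside a short synchronized block, the value to emit is copied to a local variable, and submit() is called only after the lock is released, so a slow downstream consumer cannot block other threads that are updating the state.

```java
// Hypothetical stand-in for StreamingOutput: submit() may be slow or block.
interface OutputPort {
    void submit(long value);
}

class RunningTotal {
    private final OutputPort out;
    private long total; // guarded by "this"

    RunningTotal(OutputPort out) {
        this.out = out;
    }

    // Mirrors process(): update state under the lock, submit outside it.
    void process(long value) {
        long snapshot;
        synchronized (this) {
            total += value;
            snapshot = total; // copy while still holding the lock
        }
        // The lock is released here, so a slow submit() does not block
        // other threads that want to update the total.
        out.submit(snapshot);
    }

    synchronized long total() {
        return total;
    }
}
```

The trade-off is the one described for strict ordering: because submit() runs outside the lock, two threads can submit their snapshots in the opposite order from the one in which they updated the total.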

The sample operator implementation com.ibm.streams.operator.samples.operators.PortSequencer demonstrates synchronization on a per-input-port basis, which it uses to create a port-specific sequence number. The OrderedPortSequencer class extends PortSequencer to guarantee that tuples are submitted in sequence-number order within each port; it uses a Java lock instead of Java synchronization. Both classes, including their Java source, are in $STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar.
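The idea behind these two samples can be approximated in plain Java. The sketch below is not the sample source code; it is a hypothetical illustration in which PerPortSequencer, OrderedPerPortSequencer, and the SequencedOutput interface (a stand-in for StreamingOutput) are invented names. Each input port has its own lock object and sequence counter, so threads delivering tuples on different ports do not contend with each other. The ordered variant holds a per-port ReentrantLock (chosen here as one possible Java lock) across numbering and submission, so tuples leave each port in sequence-number order at the cost of serializing submission on that port.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for StreamingOutput.
interface SequencedOutput {
    void submit(int port, long sequence, String tuple);
}

// Per-port sequence numbers with per-port synchronization (illustrative only).
class PerPortSequencer {
    private final Object[] portLocks;
    private final long[] nextSequence; // element i guarded by portLocks[i]
    protected final SequencedOutput out;

    PerPortSequencer(int ports, SequencedOutput out) {
        this.out = out;
        this.portLocks = new Object[ports];
        this.nextSequence = new long[ports];
        for (int i = 0; i < ports; i++) {
            portLocks[i] = new Object();
        }
    }

    // Assigns the next sequence number for a port; only that port's lock is held.
    protected long nextSequence(int port) {
        synchronized (portLocks[port]) {
            return nextSequence[port]++;
        }
    }

    // Unordered: submit() runs outside the lock, so tuples on the same
    // port may be submitted out of sequence-number order.
    void process(int port, String tuple) {
        long seq = nextSequence(port);
        out.submit(port, seq, tuple);
    }
}

// Ordered variant: a per-port lock is held across numbering and submission.
class OrderedPerPortSequencer extends PerPortSequencer {
    private final ReentrantLock[] submitLocks;

    OrderedPerPortSequencer(int ports, SequencedOutput out) {
        super(ports, out);
        submitLocks = new ReentrantLock[ports];
        for (int i = 0; i < ports; i++) {
            submitLocks[i] = new ReentrantLock();
        }
    }

    @Override
    void process(int port, String tuple) {
        ReentrantLock lock = submitLocks[port];
        lock.lock();
        try {
            // Numbering and submission happen under the same per-port lock,
            // so submissions on a port are in sequence-number order.
            out.submit(port, nextSequence(port), tuple);
        } finally {
            lock.unlock();
        }
    }
}
```

Because the ordered variant holds its lock across submit(), it accepts exactly the cost described earlier for fused downstream operators; it limits that cost by locking per port rather than for the whole operator.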