IBM Streams 4.2

Implementing a blocking source operator with the Java Operator API

An external data source might require that data be fetched in a blocking style. With blocking, the operator calls the data source API repeatedly, and each call might block the calling thread until data is available.

Before you begin

Ensure that you are familiar with the Java™ Operator API and the external system's Java API, typically by reading the relevant Javadoc.

About this task

A typical access pattern for a Java data access API is for your code to call methods in the API that return data, so that the caller's thread can be blocked in a method that is waiting for data. Examples of such APIs are fetching rows from a JDBC ResultSet, where the underlying driver might block waiting for a response from the database, or the java.io.InputStream class, where a reader might block waiting for data from a file or the network. Within a Java operator, the approach for using such APIs is to have a thread that fetches data from the API in a loop and submits tuples that are based on the data. This pattern also applies to cases where blocking might not be involved but the API still requires repeated calls to fetch the data. An alternative data access style is polling, where the Java operator periodically polls the data source for new information.
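
The fetch loop described above can be sketched with only the Java standard library. This is a minimal illustration, not the Operator API: the class BlockingFetchSketch, its methods, and the Consumer used as a sink are hypothetical stand-ins, and a real operator would obtain its thread from OperatorContext.getThreadFactory() and submit tuples to an output port instead of adding strings to a list.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: none of these names come from the Java Operator API.
class BlockingFetchSketch {

    /** Builds a Runnable that reads lines until end of stream and hands each to the sink. */
    static Runnable fetchLoop(InputStream in, Consumer<String> sink) {
        return () -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                // readLine() may block until data arrives; null signals end of data.
                while ((line = reader.readLine()) != null) {
                    sink.accept(line); // in an operator: build and submit a tuple here
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    /** Runs the fetch loop on its own thread and returns everything it read. */
    static List<String> fetchAll(InputStream in) {
        List<String> received = new ArrayList<>();
        Thread fetcher = new Thread(fetchLoop(in, received::add));
        fetcher.start();
        try {
            fetcher.join(); // join() makes the list contents visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for fetch thread", e);
        }
        return received;
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream(
                "first\nsecond\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(fetchAll(in));
    }
}
```

The in-memory stream never actually blocks, but the loop is written the same way it would be for a socket or file stream that does.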

Procedure

  1. Implement the Operator.initialize() method. For more information, see Implementing a source operator with the Java Operator API.
  2. Implement the Operator.allPortsReady() method. This method creates an implementation of java.lang.Runnable that accesses the external data source API and submits tuples. It also creates and starts one or more Java threads using OperatorContext.getThreadFactory(), passing an instance of the Runnable implementation. Using the operator's ThreadFactory ensures that the threads have the right context (including the context class loader) and that the threads are seen by the IBM® Streams instance. Starting threads any other way can lead to early termination of the operator, because the IBM Streams instance might determine that no outstanding work remains.
  3. If the data source is finite, then ensure that the runnable implementation submits a final punctuation mark and cleans up any resources before it returns (which leads to completion of the thread).
  4. Implement your Operator's shutdown() method using either of the following options:
    • Handling java.lang.InterruptedException exceptions that are thrown
    • Checking whether the operator is shut down by using the OperatorContext.shutdownRequested() method, if the external data access API does not support being interrupted. For example, check once per loop iteration.
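
The two shutdown options in step 4 can be combined in one fetch loop, as the following sketch suggests. It uses only the Java standard library: the class ShutdownAwareLoopSketch and all of its members are hypothetical, the AtomicBoolean stands in for the runtime's shutdown state, the BlockingQueue stands in for a blocking external API, and the list stands in for an output port.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: none of these names come from the Java Operator API.
class ShutdownAwareLoopSketch {

    // Stand-in for the runtime's "shutdown requested" state.
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    // Stand-in for an external API whose fetch call blocks until data arrives.
    private final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    // Stand-in for the output port: records what would have been submitted.
    private final List<String> submitted = new CopyOnWriteArrayList<>();
    // Created here but not started until start() is called.
    private final Thread worker = new Thread(this::fetchLoop);

    private void fetchLoop() {
        try {
            // Flag check: performed once per loop iteration.
            while (!shutdownRequested.get()) {
                String item = source.take(); // blocks; throws InterruptedException
                submitted.add(item);
            }
        } catch (InterruptedException e) {
            // Interrupt handling: an interrupt while blocked ends the loop.
            Thread.currentThread().interrupt();
        }
    }

    void offer(String item) { source.add(item); }

    void start() { worker.start(); }

    List<String> submitted() { return submitted; }

    boolean isRunning() { return worker.isAlive(); }

    /** Requests shutdown, wakes the blocked thread, and waits for it to exit. */
    void shutdown() {
        shutdownRequested.set(true);
        worker.interrupt();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ShutdownAwareLoopSketch sketch = new ShutdownAwareLoopSketch();
        sketch.offer("a");
        sketch.offer("b");
        sketch.start();
        while (sketch.submitted().size() < 2) {
            Thread.yield(); // wait until both items have been consumed
        }
        sketch.shutdown();
        System.out.println(sketch.submitted() + " running=" + sketch.isRunning());
    }
}
```

Setting the flag before interrupting means the loop exits even if the interrupt arrives while the thread is between blocking calls.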

Results

The result of this task is a Java operator implementation that reads data from an external data source using a Java thread.

Example

The sample operator com.ibm.streams.operator.samples.sources.SystemPropertySource is a simple source operator that uses a Thread to read Java system properties and deliver them as a stream of two ustring attributes, name and value. SystemPropertySource extends the sample pattern abstract class ProcessTupleProducer, which allows subclasses to implement a process() method to fetch data and submit tuples, thus hiding the details of creating a Thread and implementing Runnable. This use of a single process() method is similar to the C++ operator runtime API style of using a process() function as the entry point for one or more threads. ProcessTupleProducer itself extends TupleProducer to allow a delay to be specified before tuples are submitted. The ProcessTupleProducer.initialize() method creates a new thread using the operator's ThreadFactory, and that thread calls the abstract no-argument process() method when it is run.
public synchronized void initialize(OperatorContext context) throws Exception {
        super.initialize(context);
        
        /*
         * Create the thread at initialize time but do not start it.
         * The thread will be started by startProcessing() which will
         * be called at allPortsReady() time if no delay is specified,
         * otherwise it will be called once the delay has expired.
         */
        processThread = getOperatorContext().getThreadFactory().newThread(
                new Runnable() {

                    @Override
                    public void run() {
                        try {
                            process();
                        } catch (Exception e) {
                            trace.log(TraceLevel.ERROR, e.getMessage(), e);
                        }
                    }

                });
        
        /*
         * Set the thread not to be a daemon to ensure that the SPL runtime
         * will wait for the thread to complete before determining the
         * operator is complete.
         */
        processThread.setDaemon(false);
    }
The implementation of Runnable is provided by an anonymous inner class that calls process() and logs any thrown exception to the error log. The thread is set to be a non-daemon thread to ensure that the IBM Streams instance waits for the thread to complete before it marks the operator as complete. When initialize() completes, the thread exists but is not started. ProcessTupleProducer then implements startProcessing() to start the thread, either when allPortsReady() is called or, if a delay was specified, after the delay expires.
protected synchronized void startProcessing() {
        processThread.start();
    }
ProcessTupleProducer thus provides the framework for subclasses to implement the no-argument process() to loop and fetch data. SystemPropertySource has a simple process() method that fetches the Java system properties and submits a tuple for each property.
protected void process() throws Exception {
        
        final StreamingOutput<OutputTuple> out = getOutput(0);
        for (Enumeration<?> e = System.getProperties().propertyNames();
            e.hasMoreElements(); )
        {
            String name = (String) e.nextElement();
            String value = System.getProperty(name);
            
            OutputTuple tuple = out.newTuple();
            
            tuple.setString("name", name);
            tuple.setString("value", value);
            
            out.submit(tuple);
        }
        
        // Finite set of properties so send a final mark.
        out.punctuate(Punctuation.FINAL_MARKER);
    }
Since the properties are a finite set, the operator submits a final punctuation mark when all the properties are submitted as tuples and then returns, which completes the thread and the operator.
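
To experiment with this structure outside of IBM Streams, the same shape can be reproduced with the Java standard library alone. The following is a rough sketch under stated assumptions, not the sample source: ProcessProducerSketch and PropertySourceSketch are hypothetical names, a java.util.concurrent.ThreadFactory stands in for the operator's ThreadFactory, and a list of strings (with a hypothetical FINAL_MARKER entry) stands in for the output port and the final punctuation mark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical stand-in for the ProcessTupleProducer pattern.
abstract class ProcessProducerSketch {
    static final String FINAL_MARKER = "<final marker>"; // stand-in for final punctuation

    private Thread processThread;
    private final List<String> submitted =
            Collections.synchronizedList(new ArrayList<>());

    /** Creates the processing thread but does not start it. */
    synchronized void initialize(ThreadFactory factory) {
        processThread = factory.newThread(() -> {
            try {
                process();
            } catch (Exception e) {
                e.printStackTrace(); // the sample logs to the operator trace instead
            }
        });
        // Non-daemon, so the JVM waits for the thread to finish.
        processThread.setDaemon(false);
    }

    /** Starts the thread created by initialize(). */
    synchronized void startProcessing() {
        processThread.start();
    }

    /** Waits for process() to return. */
    void awaitCompletion() {
        try {
            processThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected void submit(String name, String value) {
        submitted.add(name + "=" + value);
    }

    protected void submitFinalMarker() {
        submitted.add(FINAL_MARKER);
    }

    List<String> submitted() {
        return submitted;
    }

    /** Subclasses fetch data and submit it here. */
    protected abstract void process() throws Exception;
}

// Hypothetical stand-in for SystemPropertySource.
class PropertySourceSketch extends ProcessProducerSketch {
    @Override
    protected void process() {
        for (String name : System.getProperties().stringPropertyNames()) {
            submit(name, System.getProperty(name));
        }
        // Finite set of properties, so signal that no more data follows.
        submitFinalMarker();
    }

    public static void main(String[] args) {
        PropertySourceSketch source = new PropertySourceSketch();
        source.initialize(Executors.defaultThreadFactory());
        source.startProcessing();
        source.awaitCompletion();
        System.out.println(source.submitted().size() + " entries submitted");
    }
}
```

Separating thread creation from thread start mirrors the sample's split between initialize() and startProcessing(), which is what makes the optional delay possible.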

What to do next

  • Look at the complete source for ProcessTupleProducer and SystemPropertySource.
  • Implement your own operator using a Thread or by extending ProcessTupleProducer.