IBM Streams 4.2

Implementing a source operator with the Java Operator API

A source operator fetches information or events from an external system, such as a sensor, a messaging system, or a database, and presents them as a stream. If such a system provides a Java™ API, you can implement a source operator for that system in Java.

Before you begin

Ensure that you are familiar with the Java Operator API and with the external system's Java API, typically by reviewing the relevant Javadoc.

About this task

The general function of a source operator is to produce a stream of information from an external data source, such as sensor readings or an external database. Because the data comes from an external source, the Operator implementation uses Java threads or tasks to fetch it, instead of processing arriving tuples through the Operator.process() method. The threads or tasks fetch data from the external source, populate a tuple for each data item, and then submit the tuples to the output ports.
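The following is a minimal, self-contained sketch of the thread-based pattern; it is not code from the Java Operator API. It assumes a hypothetical `OutputPort` interface in place of a real operator output port, uses a `LinkedBlockingQueue` to play the role of the external system, and uses `Executors.defaultThreadFactory()` where a real operator would use `OperatorContext.getThreadFactory()`. The background thread blocks until data arrives, builds a tuple from each item, and submits it; interrupting the thread ends the fetch loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

public class BlockingFetchSketch {

    /** Hypothetical stand-in for an operator output port. */
    interface OutputPort {
        void submit(String tuple);
    }

    /**
     * Starts a background thread that blocks waiting for data from the external
     * source, turns each data item into a tuple, and submits it.
     */
    static Thread startFetcher(ThreadFactory factory, BlockingQueue<String> externalSource,
                               OutputPort out) {
        Thread fetcher = factory.newThread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String item = externalSource.take(); // blocks until data arrives
                    out.submit("reading=" + item);       // populate and submit a tuple
                }
            } catch (InterruptedException e) {
                // Interrupted (for example, at shutdown): stop fetching.
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();
        return fetcher;
    }

    /** Runs the fetcher against an in-memory "external source" and returns what was submitted. */
    static List<String> demo(List<String> items) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(items);
        List<String> submitted = Collections.synchronizedList(new ArrayList<>());
        // Executors.defaultThreadFactory() stands in for OperatorContext.getThreadFactory().
        Thread fetcher = startFetcher(Executors.defaultThreadFactory(), source, submitted::add);
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (submitted.size() < items.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            fetcher.interrupt(); // mimics shutdown interrupting a blocked thread
            fetcher.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(submitted);
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("21.5", "22.0"))); // prints [reading=21.5, reading=22.0]
    }
}
```

Because the loop exits on interruption, the same thread can be stopped either explicitly by the operator or by the instance's shutdown processing.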

Procedure

  1. Implement your Operator's initialize() method. This method typically sets up the Operator's state with the information that is needed to communicate with the external data source; for an external database that is accessed through JDBC, that might mean loading the JDBC driver and creating the JDBC connection URL. This information might come from parameters that are supplied to the operator invocation, from external configuration files, or from a combination of the two.
    Note: The initialize() method typically does not fetch information from the external data source, as tuples cannot be submitted to output ports until allPortsReady() is called.
  2. Implement your Operator's allPortsReady() method. The allPortsReady() method indicates that the output ports are ready for tuple submission, so it is typically used as the signal to start fetching data from the external source. The method uses the state that was set up in initialize() either to start additional threads by using OperatorContext.getThreadFactory() or to schedule background tasks by using OperatorContext.getScheduledExecutorService(); those threads or tasks extract data from the external source. Do not fetch data from within allPortsReady() itself: the method is a notification from the IBM® Streams instance that the ports are ready for output, and the instance expects it to return without blocking.
  3. Decide whether to use threads or tasks. The decision is application-specific and depends on how data is fetched from the external data source. Use a task if the data source is to be polled periodically, or start a thread if it must block while waiting for data from the external source. For more information about polling and blocking source Java operators, see Implementing a polling source operator using the Java Operator API and Implementing a blocking source operator with the Java Operator API.
  4. If the data source is finite, for example, a complete file, then have the operator submit final punctuation to its output ports when it has read all the data.
  5. Implement your Operator's shutdown() method. The shutdown() method must close any outstanding resources, such as open file streams or JDBC connections. The shutdown processing of the IBM Streams instance interrupts any threads or tasks after the Operator.shutdown() method returns, but it is clearer and cleaner for the shutdown() method to explicitly end any background activity that the operator started in its allPortsReady() method.
    Note: The shutdown() method might be called at any time because of early termination of the application that is running the operator, so your data-fetching code must be interruptible to ensure that the shutdown request succeeds.
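The procedure can be sketched end to end as a polling source. This is a simplified, self-contained model rather than an implementation against the real API: `PollingSourceSketch`, its `OutputPort` interface, and the simplified `initialize(List)` and `allPortsReady(long)` signatures are hypothetical; a single-thread `ScheduledExecutorService` stands in for `OperatorContext.getScheduledExecutorService()`; and an in-memory list stands in for a finite external source such as a file. It shows state set up in initialize(), a non-blocking allPortsReady() that only schedules work, final punctuation when the finite source is exhausted, and a shutdown() that explicitly cancels the background task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PollingSourceSketch {

    /** Hypothetical stand-in for an output port: tuples plus final punctuation. */
    interface OutputPort {
        void submit(String tuple);
        void finalPunctuation();
    }

    private final ScheduledExecutorService scheduler; // stands in for getScheduledExecutorService()
    private final OutputPort out;
    private Iterator<String> records;    // state prepared in initialize()
    private ScheduledFuture<?> pollTask; // background activity started in allPortsReady()
    private volatile boolean finished;

    PollingSourceSketch(ScheduledExecutorService scheduler, OutputPort out) {
        this.scheduler = scheduler;
        this.out = out;
    }

    /** Step 1: set up state only; nothing is fetched or submitted yet. */
    void initialize(List<String> finiteSource) {
        records = finiteSource.iterator();
    }

    /** Step 2: schedule the polling task and return without blocking. */
    void allPortsReady(long periodMillis) {
        pollTask = scheduler.scheduleAtFixedRate(this::poll, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** One poll: submit the next record, or final punctuation once the source is exhausted. */
    private void poll() {
        if (finished) {
            return;
        }
        if (records.hasNext()) {
            out.submit("row=" + records.next());
        } else {
            finished = true;
            out.finalPunctuation(); // step 4: the finite source has been fully read
        }
    }

    /** Step 5: explicitly end the background activity started in allPortsReady(). */
    void shutdown() {
        if (pollTask != null) {
            pollTask.cancel(true); // interrupts a poll that is in progress
        }
        // A real operator would also close file streams or JDBC connections here.
    }

    /** Drives the lifecycle against an in-memory data set and returns everything emitted. */
    static List<String> demo(List<String> rows) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch punctuated = new CountDownLatch(1);
        PollingSourceSketch op = new PollingSourceSketch(scheduler, new OutputPort() {
            public void submit(String tuple) { emitted.add(tuple); }
            public void finalPunctuation() { emitted.add("FINAL"); punctuated.countDown(); }
        });
        op.initialize(rows);
        op.allPortsReady(10);
        try {
            punctuated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        op.shutdown();
        scheduler.shutdownNow(); // the executor's owner, not the operator, shuts it down
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a", "b"))); // prints [row=a, row=b, FINAL]
    }
}
```

In this sketch, shutdown() cancels only the operator's own task and leaves the executor to its owner; for an executor supplied by the operator context, that owner is presumably the IBM Streams instance. A blocking source would follow the same lifecycle but start a thread in allPortsReady() instead of scheduling a task.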

Results

The result of this task is an implementation of Operator that reads data from an external source.