A source operator fetches information or events from an
external system, such as a sensor, messaging system, or a database,
and presents them as a stream. If such a system provides a Java™ API, then you can implement
a source operator for that system in Java.
Before you begin
Ensure that you are familiar with the Java Operator API and the external system's Java API, typically by reading
the relevant Javadoc.
About this task
The general function of a source operator is to produce a
stream of information from an external data source, such as sensor
readings or an external database. Because the data comes from an
external source, the
Operator implementation uses Java threads or tasks to fetch data,
instead of processing the arrival of tuples through the
Operator.process() method.
The threads or tasks fetch data from the external source and
populate tuples that correspond to the data items before they submit the tuples
to the output ports.
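The fetch-populate-submit pattern described above can be sketched in plain Java. This is only an illustration, not the Java Operator API: ReadingTuple, FetchLoopSketch, and the "id=value" item format are hypothetical, the Iterator stands in for the external system, and the Consumer stands in for an output port. In a real operator the thread would come from OperatorContext.getThreadFactory().

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical tuple with two attributes; stands in for an output tuple. */
final class ReadingTuple {
    final String sensorId;
    final double value;

    ReadingTuple(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

final class FetchLoopSketch {

    /**
     * Starts a background thread that fetches raw "id=value" items from the
     * external source, populates one tuple per item, and submits it.
     * The Consumer is an assumed stand-in for an output port.
     */
    static Thread startFetcher(Iterator<String> externalSource,
                               Consumer<ReadingTuple> outputPort) {
        Thread fetcher = new Thread(() -> {
            // Stop when the source is exhausted or the thread is interrupted.
            while (!Thread.currentThread().isInterrupted() && externalSource.hasNext()) {
                String raw = externalSource.next();
                String[] parts = raw.split("=", 2);
                outputPort.accept(new ReadingTuple(parts[0], Double.parseDouble(parts[1])));
            }
        }, "source-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
        return fetcher;
    }

    /** Runs the fetcher over two sample items and returns what was submitted. */
    static List<ReadingTuple> runDemo() {
        List<ReadingTuple> submitted = new CopyOnWriteArrayList<>();
        Thread fetcher = startFetcher(List.of("s1=20.5", "s2=21.0").iterator(), submitted::add);
        try {
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return submitted;
    }

    public static void main(String[] args) {
        for (ReadingTuple t : runDemo()) {
            System.out.println(t.sensorId + " = " + t.value);
        }
    }
}
```

Note that nothing in the sketch reacts to incoming tuples; all work is driven by the background thread, which is the distinguishing trait of a source operator.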
Procedure
- Implement your Operator's initialize() method. This method typically sets up the Operator's
state with the information that is needed to communicate with the external data source,
for example by loading a JDBC driver and creating the JDBC connection URL
for an external database. The information that describes
the state might come from parameters that are supplied to the operator
invocation, from external configuration files, or from a combination of the two.
Note: The initialize() method
typically does not fetch information from the external data source,
as tuples cannot be submitted to output ports until allPortsReady() is
called.
- Implement your Operator's allPortsReady() method. The allPortsReady() method indicates that the
output ports are ready for submitting tuples to them. Thus it is typically
used as the signal to start fetching data from the external source. The allPortsReady() method takes
the operator's state that was set up in the initialize() method
and starts additional threads by using OperatorContext.getThreadFactory() or
schedules background tasks by using OperatorContext.getScheduledExecutorService() to
extract data from the external sources. Data is not fetched from
within the allPortsReady() method itself, because the method is a notification
from the IBM® Streams instance that
the ports are ready for output, and the IBM Streams instance
expects the method to return without blocking.
- Decide whether to use threads or tasks. The decision on whether to use threads or tasks is application-specific and depends on how data is to be fetched from the external data source. Use a task if the data source is to be polled periodically, or start a thread if the thread must block and wait for data from the external source. For more information about polling and blocking source operators in Java, see Implementing a polling source operator using the Java Operator API and Implementing a blocking source operator with the Java Operator API.
- If the data source is finite, for example, a complete file,
then have the source operator submit a final punctuation to its output ports
after it reads all the data.
- Implement your Operator's shutdown() method. The shutdown() method must close any outstanding resources, such
as open file streams or JDBC connections. The shutdown processing
by the IBM Streams instance
interrupts any threads or tasks after the Operator.shutdown() method
returns, but it might be clearer and cleaner for the shutdown() method
to explicitly end any background activity that was started in the allPortsReady() method.
Note: The shutdown() method might be called
at any time due to early termination of the application that is running
the operator, and thus your fetching of data must be interruptible
to ensure that the shutdown request is successful.
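The steps above can be sketched as a small plain-Java class that mimics the lifecycle with a blocking fetch thread. It is a sketch under stated assumptions, not an implementation of the Java Operator API: BlockingSourceSketch, the END_OF_DATA sentinel, and the FINAL_MARKER string are hypothetical, a BlockingQueue stands in for the external system, and a list stands in for the output port. A real operator would implement Operator, obtain its thread from OperatorContext.getThreadFactory(), and submit an actual final punctuation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class BlockingSourceSketch {
    static final String END_OF_DATA = "<eof>";                 // hypothetical end-of-data sentinel
    static final String FINAL_MARKER = "<final-punctuation>";  // stand-in for a final punctuation

    private final BlockingQueue<String> externalSystem;        // stand-in for the external API
    private final List<String> outputPort = new CopyOnWriteArrayList<>();
    private String prefix;   // state set up in initialize()
    private Thread fetcher;  // started in allPortsReady()

    BlockingSourceSketch(BlockingQueue<String> externalSystem) {
        this.externalSystem = externalSystem;
    }

    /** Sets up state only; no data is fetched here. */
    void initialize(Map<String, String> parameters) {
        prefix = parameters.getOrDefault("prefix", "");
    }

    /** Starts the background thread and returns without blocking. */
    void allPortsReady() {
        fetcher = new Thread(this::fetchLoop, "blocking-source-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    private void fetchLoop() {
        try {
            while (true) {
                String item = externalSystem.take(); // blocks, but responds to interrupt
                if (END_OF_DATA.equals(item)) {
                    outputPort.add(FINAL_MARKER);    // finite source: signal the end
                    return;
                }
                outputPort.add(prefix + item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // shutdown was requested
        }
    }

    /** Ends background activity explicitly, then releases resources. */
    void shutdown() {
        if (fetcher != null) {
            fetcher.interrupt();
            awaitFetcher();
        }
        externalSystem.clear(); // stand-in for closing a connection
    }

    void awaitFetcher() {
        try {
            fetcher.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isFetching() {
        return fetcher != null && fetcher.isAlive();
    }

    /** Finite source: two items followed by the end-of-data sentinel. */
    static List<String> runFiniteDemo() {
        BlockingSourceSketch op = new BlockingSourceSketch(
                new LinkedBlockingQueue<>(List.of("a", "b", END_OF_DATA)));
        op.initialize(Map.of("prefix", "item-"));
        op.allPortsReady();
        op.awaitFetcher();
        op.shutdown();
        return op.outputPort;
    }

    /** Early termination: the fetcher is blocked when shutdown() arrives. */
    static boolean runEarlyShutdownDemo() {
        BlockingSourceSketch op = new BlockingSourceSketch(new LinkedBlockingQueue<>());
        op.initialize(Map.of());
        op.allPortsReady();
        op.shutdown();
        return !op.isFetching(); // true when the blocked thread was stopped
    }

    public static void main(String[] args) {
        System.out.println(runFiniteDemo());
        System.out.println(runEarlyShutdownDemo());
    }
}
```

The early-shutdown case works only because the fetch blocks in an interruptible call. A fetch that ignores interruption would keep the thread alive past shutdown().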
Results
The result of this task is an implementation of Operator that reads
data from an external source.
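For the polling variant mentioned in the procedure, a minimal sketch might schedule a periodic task instead of starting a blocking thread. The example uses a plain java.util.concurrent scheduler as a stand-in for the service returned by OperatorContext.getScheduledExecutorService(). PollingSourceSketch, the Supplier that represents the external source, and the fixed number of polls are assumptions made for illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class PollingSourceSketch {

    /**
     * Polls the source at a fixed period and collects one value per poll.
     * The returned list stands in for tuples submitted to an output port.
     */
    static List<Integer> pollNTimes(Supplier<Integer> source, int polls, long periodMillis) {
        List<Integer> outputPort = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(polls);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                // Runs on the single scheduler thread, so polls never overlap.
                if (remaining.getCount() > 0) {
                    outputPort.add(source.get());
                    remaining.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            // Wait for the requested number of polls, with a generous upper bound.
            remaining.await(polls * periodMillis + 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // the equivalent of ending the task at shutdown
        }
        return outputPort;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        System.out.println(pollNTimes(counter::incrementAndGet, 3, 10));
    }
}
```

A scheduled task suits sources that are sampled at intervals. A source that delivers data only when it becomes available is usually a better fit for a dedicated blocking thread.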