IBM Streams 4.2
Implementing a blocking source operator with the Java Operator API
An external data source might require that data is fetched in a blocking style. In this style, the operator calls the API for the data source repeatedly, and each call might block the calling thread while data is fetched.
Example
public synchronized void initialize(OperatorContext context) throws Exception {
    super.initialize(context);

    /*
     * Create the thread at initialize time but do not start it.
     * The thread will be started by startProcessing() which will
     * be called at allPortsReady() time if no delay is specified,
     * otherwise it will be called once the delay has expired.
     */
    processThread = getOperatorContext().getThreadFactory().newThread(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        process();
                    } catch (Exception e) {
                        trace.log(TraceLevel.ERROR, e.getMessage(), e);
                    }
                }
            });

    /*
     * Set the thread not to be a daemon to ensure that the SPL runtime
     * will wait for the thread to complete before determining the
     * operator is complete.
     */
    processThread.setDaemon(false);
}
The Runnable is provided by an anonymous inner class that calls process() and logs any thrown exception to the error log. The thread is set to a non-daemon thread to ensure that the IBM Streams instance waits for the thread to complete before it marks the operator as complete. When initialize() completes, the thread exists but is not started. ProcessTupleProducer then implements startProcessing() to start the thread when allPortsReady() is called or, if a delay was specified, after the delay expires.
protected synchronized void startProcessing() {
    processThread.start();
}
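The create-then-start lifecycle described above can be tried outside of IBM Streams. The following is a minimal sketch in plain Java, not the actual ProcessTupleProducer source: the class names DeferredStartProducer and ListProducer, and the helper methods isStarted() and awaitCompletion(), are hypothetical and exist only for illustration. It uses a plain Thread instead of the operator's thread factory and writes errors to System.err instead of the operator trace.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the pattern; it does not use the Streams API.
abstract class DeferredStartProducer {
    private Thread processThread;

    // Mirrors initialize(): create the thread but do not start it.
    public synchronized void initialize() {
        processThread = new Thread(() -> {
            try {
                process();
            } catch (Exception e) {
                System.err.println("process() failed: " + e);
            }
        });
        // Non-daemon, so the JVM waits for the thread to finish.
        processThread.setDaemon(false);
    }

    // Mirrors startProcessing(): called later, when producing data is allowed.
    public synchronized void startProcessing() {
        processThread.start();
    }

    // Illustration-only helper: true once the thread has been started.
    public synchronized boolean isStarted() {
        return processThread.getState() != Thread.State.NEW;
    }

    // Illustration-only helper: wait for process() to return.
    public void awaitCompletion() {
        Thread t;
        synchronized (this) {
            t = processThread;
        }
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Subclasses implement the fetch loop here.
    protected abstract void process() throws Exception;
}

// Hypothetical subclass that "submits" two items to a list.
class ListProducer extends DeferredStartProducer {
    final List<String> submitted =
            Collections.synchronizedList(new ArrayList<String>());

    @Override
    protected void process() {
        submitted.add("a");
        submitted.add("b");
    }
}
```

In this sketch nothing is produced between initialize() and startProcessing(), which is the property the real class relies on: no tuples are submitted before the ports are ready.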
ProcessTupleProducer thus provides the framework for subclasses to implement the no-argument process() method to loop and fetch data. SystemPropertySource has a simple process() method that fetches the Java system properties and submits a tuple for each property.
protected void process() throws Exception {
    final StreamingOutput<OutputTuple> out = getOutput(0);

    for (Enumeration<?> e = System.getProperties().propertyNames();
            e.hasMoreElements(); )
    {
        String name = (String) e.nextElement();
        String value = System.getProperty(name);
        OutputTuple tuple = out.newTuple();
        tuple.setString("name", name);
        tuple.setString("value", value);
        out.submit(tuple);
    }

    // Finite set of properties so send a final mark.
    out.punctuate(Punctuation.FINAL_MARKER);
}
Because the properties are a finite set, the operator submits a final punctuation mark after all the properties are submitted as tuples and then returns, which completes the thread and the operator.
What to do next
- Look at the complete source for ProcessTupleProducer and SystemPropertySource.
- Implement your own operator using a Thread or by extending ProcessTupleProducer.
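When you implement your own operator with a thread, the body of the thread is typically a loop around a blocking call. The following is a minimal sketch of such a loop in plain Java, under stated assumptions: a BlockingQueue stands in for the external data source, the class name BlockingFetchLoop and the END_OF_DATA sentinel are hypothetical, and items are collected in a list where a real operator would create and submit tuples. It also assumes that the thread is interrupted when processing should stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical fetch loop; the queue stands in for an external data source.
class BlockingFetchLoop implements Runnable {
    // Hypothetical sentinel that marks the end of a finite data set.
    static final String END_OF_DATA = "<END>";

    private final BlockingQueue<String> source;
    private final List<String> submitted = new ArrayList<String>();

    BlockingFetchLoop(BlockingQueue<String> source) {
        this.source = source;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Blocks this thread until the source has data.
                String item = source.take();
                if (END_OF_DATA.equals(item)) {
                    // A real operator would send a final mark here.
                    break;
                }
                // A real operator would build and submit a tuple here.
                synchronized (submitted) {
                    submitted.add(item);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked: restore the flag and exit the loop.
            Thread.currentThread().interrupt();
        }
    }

    // Returns a copy of the items handled so far.
    List<String> submittedSnapshot() {
        synchronized (submitted) {
            return new ArrayList<String>(submitted);
        }
    }
}
```

Because take() blocks only the fetching thread, the rest of the program stays responsive while the loop waits for data. Handling InterruptedException lets the loop end cleanly when it is interrupted while blocked.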