IBM Streams 4.2

Implementing a sink operator using the Java Operator API

A sink operator transmits information or events to an external system, such as a dashboard, web-server, mail-server, or a database. When such a system provides a Java™ API, a sink operator to that system can be implemented in Java.

Before you begin

Ensure that you are familiar with the Java Operator API and the external system's Java API, typically by being familiar with the appropriate Javadoc. The Javadoc is available in your Streams installation under the $STREAMS_INSTALL/doc/spl/operator/api/java/api directory, and on the production documentation.

About this task

The general function of a sink operator that is implemented in Java is to translate the information from an incoming tuple into a format that is understood by the external system, and then to transmit the resultant data to the external system using the Java API provided by the system. For example, if you implement a sink to an external database using JDBC, then tuple processing would first set parameters in a JDBC PreparedStatement based on the tuple's attributes values, and then run the PreparedStatement to update the database.

A simple model is that the arrival of each incoming tuple results in data transmitted to the external system in the Operator.process() method. Typically this model might not be sufficient for performance reasons and, instead, batching must be used so that a batch of information that represents multiple tuples is transmitted to the external system in a single operation. For more information about batching, see Batching in a sink operator.

The general approach for writing a sink operator in Java is described here. Some steps include example information for the implementation of a sink operator in Java that implements HTTP POST, where incoming tuples result in HTTP POST operations, such as a web order form. Sample class com.ibm.streams.operator.samples.sinks.HttpPOST is provided.

Procedure

  1. Design the model for transmission of information from tuples to the external system.
    1. Decide whether and how to use batching. In the example, batching is configurable with timeouts and batching is implemented at the tuple level using the facilities of TupleConsumer.
    2. Design how the information from the incoming com.ibm.streams.operator.Tuple object is to be translated into the Java objects that are supported by the external system API. For HTTP POST, each attribute of the tuple is sent as a name, value pair where name is the attribute's name, and value is a textual representation of the attribute's value in the tuple, with standard encoding for HTTP.
  2. Specify the parameters that are required for the operator, such as information to determine the location of the external system and any batching related configuration. For HTTP POST, a url parameter is required to specify the URL for the POST and the TupleConsumer sample class provides parameters to configure batching.
  3. Design error handling for errors that are communicated with the external system.
    1. Handling errors by trying the connection to the external system again, logging the data that cannot be transmitted, or by passing the exception to the IBM® Streams instance, which causes the PE to terminate.
    2. Handling connection errors, such as failure to connect to remote database.
    3. Handling data errors, such as constraint violations for an external database.
  4. Create an initial implementation of com.ibm.streams.operator.Operator, either directly, by extending com.ibm.streams.operator.AbstractOperator, TupleConsumer, or any other existing Operator implementation suitable for the wanted goal.
  5. Implement the Operator.initialize() method, ensuring super.initialize() is called as required. Typically the initialize() method performs the following operations:
    • Fetches and validates the parameters that are passed in from SPL.
    • Sets up connections to the external system.
  6. Implement Operator.process() to process incoming tuples. Typically, the Operator.process() performs the following operations:
    • Implements batching as required
    • Adds the tuple to the batch or processes it immediately
    • Converts information from tuples into the form that is expected by the external system's Java API
    • Transmits the information to the external system.
  7. Implement the logic to process incoming punctuation. If batching is implemented, then the processing of the final punctuation (Punctuation.FINAL_MARKER) must ensure that the batch processing is completed. If the final punctuation is ignored, then the operator might be shut down while a batch is outstanding.
  8. Implement Operator.shutdown() to handle any cleanup, such as closing connections to external systems. If any processing requires asynchronous processing, then ThreadFactory or ScheduledExectorService provided by the OperatorContext must be used. Use of these standard Java interfaces that are provided by the OperatorContext allows the IBM Streams instance to track which operators have outstanding work to be done and thus allow the asynchronous work to be completed when a final punctuation is received.

Results

The result of this task is a Java class that implements a sink operator specific to an external system that may be used in a IBM Streams instance using the JavaOp primitive operator.

Example

The Streams installation includes a number of Java samples, including source in the JAR file com.ibm.streams.operator.samples.jar, in $STREAMS_INSTALL/lib. An example sink operator is provided to send HTTP POST requests based on incoming tuples. This example is broken into two pieces:
  • com.ibm.streams.operator.samples.sinks.HttpPOST shows the logic specific to creating an HTTP POST request from tuples.
  • com.ibm.streams.operator.samples.patterns.TupleConsumer is an abstract class that demonstrates a flexible batching of tuples that is intended for use as a sink operator.

HttpPost extends TupleConsumer: Starting with HttpPost, there are two key methods initialize() and processBatch(). The initialize() method uses the SPL parameter url to determine the URL for the POST request. A connection attempt is not made to validate the URL since the only knowledge is that the URL is suitable for a POST request and a POST request with no data might have unknown effects on the server.

/**
 * URL of the HTTP server we will be posting data to.
 */
private URL url;

/**
 * Initialize by setting the URL and the batch size.
 */
@Override
public void initialize(OperatorContext context) throws Exception {
    super.initialize(context);

    url = getURL();

    setBatchSize(batchSize());
}

/**
 * Get the URL for the POST requests from the required parameter url.
 * Sub-classes may override this to set the URL another way.
 * 
 * @return URL for POST requests
 * @throws MalformedURLException
 */
protected URL getURL() throws MalformedURLException {
    String urlPath = getOperatorContext().getParameterValues("url").get(0);
    return new URL(urlPath);
}

/**
 * Get the batch size to use from the parameter batchSize using 1 if that is
 * not set. Sub-classes may override this to set the batchSize another way.
 * 
 * @return batchSize to use
 */
protected int batchSize() {
    List<String> bp = getOperatorContext().getParameterValues("batchSize");
    if (bp.isEmpty())
        return 1;

      return Integer.getInteger(bp.get(0)); 
}

The processBatch() method is an abstract method that the super-class TupleConsumer requires and whose purpose is to take a batch of Tuple objects and transmit them to the external system. For HttpPost each attribute in the tuple is converted into a name-value pair with URL encoding. A single HTTP POST request might contain multiple tuples, with each name, value pair repeated multiple times.

@Override
protected final boolean processBatch(Queue<BatchedTuple> batch)
        throws Exception {

    StringBuilder data = new StringBuilder(1024);

    for (BatchedTuple item : batch) {

        StreamSchema schema = item.getStream().getStreamSchema();

        for (Attribute attribute : schema) {
            if (data.length() != 0)
                data.append('&');

            data.append(URLEncoder.encode(attribute.getName(), "UTF-8"));
            data.append('=');
            data.append(URLEncoder.encode(item.getTuple().getString(
                    attribute.getName()), "UTF-8"));
        }
    }

    // Send data
    URLConnection conn = url.openConnection();
    conn.setDoOutput(true);
    OutputStreamWriter wr = new OutputStreamWriter(conn.getOutputStream(),
            "UTF-8");
    wr.write(data.toString());
    wr.flush();

    // Get the response
    BufferedReader rd = new BufferedReader(new InputStreamReader(conn
            .getInputStream()));

    String line;
    while ((line = rd.readLine()) != null) {
        trace.log(TraceLevel.DEBUG, line);
    }
    wr.close();
    rd.close();

    return false;
  }
}
TupleConsumer: TupleConsumer implements generic batching of Tuples as follows:
  • Batch that is defined by number of tuples, including support for a single tuple in the batch.
  • Asynchronous processing of the batch so that processing of the batch does not affect the submission of tuples to the operator.
  • Optional timeout, so that if a time elapsed since the last tuple arrival, then the batch is processed regardless of how full it is and when it is not empty.
  • Option to preserve tuple arrival order of batch processing. If order preservation is not required, then multiple threads might be processing batches asynchronously.
  • The ability for the subclass to process partial batches, allowing unprocessed tuples to be processed in a future batch.
  • Final punctuation handling ensures that all batches are processed before they return from processPunctuation().

What to do next

Here are some optional tasks.
  • Study the code that implements HttpPost (com.ibm.streams.operator.samples.sinks.HttpPOST and com.ibm.streams.operator.samples.patterns.TupleConsumer).
  • Implement a sink operator in Java.
  • Study the code for other sample sink Java operators.