Implementing a sink operator using the Java Operator API
A sink operator transmits information or events to an external system, such as a dashboard, web server, mail server, or database. When such a system provides a Java™ API, a sink operator for that system can be implemented in Java.
About this task
In the simplest model, the arrival of each incoming tuple causes data to be transmitted to the external system from within the Operator.process() method. This model is often not sufficient for performance reasons. Batching is used instead, so that a batch of information that represents multiple tuples is transmitted to the external system in a single operation. For more information about batching, see Batching in a sink operator.
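The difference between the two models can be sketched in plain Java without the Operator API. The class below is a hypothetical illustration (CountBatcher and its sender callback are assumptions, not part of the samples): it buffers arriving items and calls the sender once per full batch. With a batch size of 1 it behaves like the simple per-tuple model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: collects items and hands them to a sender in groups. */
final class CountBatcher<T> {
    private final int batchSize;
    // Stands in for the call that transmits data to the external system.
    private final Consumer<List<T>> sender;
    private final List<T> pending = new ArrayList<>();

    CountBatcher(int batchSize, Consumer<List<T>> sender) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Called once per arriving item; transmits only when the batch is full. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Transmits whatever is pending, for example at the end of the stream. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending)); // pass a copy of the batch
        pending.clear();
    }
}
```

With a batch size of 3, seven calls to add() result in two calls to the sender, and a final flush() sends the one remaining item.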
The general approach for writing a sink operator in Java is described here. Some steps include example information for a sink operator that implements HTTP POST, where incoming tuples result in HTTP POST operations, such as submissions to a web order form. The sample class com.ibm.streams.operator.samples.sinks.HttpPOST is provided.
Example
- com.ibm.streams.operator.samples.sinks.HttpPOST shows the logic specific to creating an HTTP POST request from tuples.
- com.ibm.streams.operator.samples.patterns.TupleConsumer is an abstract class that demonstrates flexible batching of tuples and is intended as a base class for sink operators.
HttpPOST extends TupleConsumer: Starting with HttpPOST, there are two key methods, initialize() and processBatch(). The initialize() method uses the SPL parameter url to determine the URL for the POST request. No connection attempt is made to validate the URL: all that is known is that the URL is intended for POST requests, and a POST request with no data might have unknown effects on the server.
/**
 * URL of the HTTP server we will be posting data to.
 */
private URL url;

/**
 * Initialize by setting the URL and the batch size.
 */
@Override
public void initialize(OperatorContext context) throws Exception {
    super.initialize(context);
    url = getURL();
    setBatchSize(batchSize());
}

/**
 * Get the URL for the POST requests from the required parameter url.
 * Sub-classes may override this to set the URL another way.
 *
 * @return URL for POST requests
 * @throws MalformedURLException
 */
protected URL getURL() throws MalformedURLException {
    String urlPath = getOperatorContext().getParameterValues("url").get(0);
    return new URL(urlPath);
}

/**
 * Get the batch size to use from the parameter batchSize using 1 if that is
 * not set. Sub-classes may override this to set the batchSize another way.
 *
 * @return batchSize to use
 */
protected int batchSize() {
    List<String> bp = getOperatorContext().getParameterValues("batchSize");
    if (bp.isEmpty())
        return 1;
    // Convert the parameter text to an int.
    return Integer.parseInt(bp.get(0));
}
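The defaulting logic for the batch size can be tried outside the Streams runtime. The following is a minimal sketch, assuming the parameter values arrive as a list of strings, as getParameterValues() returns them in the sample. The class name BatchSizeParameter and the range check are assumptions added for illustration. Note that Integer.parseInt converts the text to a number, whereas Integer.getInteger looks up a Java system property with that name and returns null when no such property exists.

```java
import java.util.List;

/** Hypothetical helper mirroring the batchSize() logic with plain Java types. */
final class BatchSizeParameter {
    private BatchSizeParameter() {
    }

    /** Returns the first value parsed as an int, or 1 when no value is set. */
    static int parse(List<String> values) {
        if (values == null || values.isEmpty()) {
            return 1; // parameter not set: one tuple per batch
        }
        int size = Integer.parseInt(values.get(0).trim());
        if (size < 1) {
            // Assumption of this sketch: reject sizes that cannot form a batch.
            throw new IllegalArgumentException("batchSize must be >= 1: " + size);
        }
        return size;
    }
}
```

For example, BatchSizeParameter.parse(Arrays.asList("25")) returns 25, and an empty list yields the default of 1.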
The processBatch() method is an abstract method that the superclass TupleConsumer requires. Its purpose is to take a batch of Tuple objects and transmit them to the external system. For HttpPOST, each attribute in the tuple is converted into a URL-encoded name-value pair. A single HTTP POST request might contain multiple tuples, in which case each attribute name appears once per tuple.
    @Override
    protected final boolean processBatch(Queue<BatchedTuple> batch)
            throws Exception {
        StringBuilder data = new StringBuilder(1024);
        for (BatchedTuple item : batch) {
            StreamSchema schema = item.getStream().getStreamSchema();
            for (Attribute attribute : schema) {
                if (data.length() != 0)
                    data.append('&');
                data.append(URLEncoder.encode(attribute.getName(), "UTF-8"));
                data.append('=');
                data.append(URLEncoder.encode(item.getTuple().getString(
                        attribute.getName()), "UTF-8"));
            }
        }

        // Send data
        URLConnection conn = url.openConnection();
        conn.setDoOutput(true);
        OutputStreamWriter wr = new OutputStreamWriter(conn.getOutputStream(),
                "UTF-8");
        wr.write(data.toString());
        wr.flush();

        // Get the response
        BufferedReader rd = new BufferedReader(new InputStreamReader(conn
                .getInputStream()));
        String line;
        while ((line = rd.readLine()) != null) {
            trace.log(TraceLevel.DEBUG, line);
        }
        wr.close();
        rd.close();
        return false;
    }
}
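The encoding step of processBatch() can be illustrated on its own. The following is a minimal sketch, assuming each tuple is represented as an ordered map of attribute names to string values. The class FormEncoder and the map representation are hypothetical stand-ins for the Tuple and StreamSchema types, which are not used here.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: builds a form-encoded POST body from a batch of tuples. */
final class FormEncoder {
    private FormEncoder() {
    }

    /** Joins every attribute of every tuple as name=value pairs separated by '&'. */
    static String encode(List<Map<String, String>> tuples) {
        StringBuilder data = new StringBuilder();
        try {
            for (Map<String, String> tuple : tuples) {
                for (Map.Entry<String, String> attribute : tuple.entrySet()) {
                    if (data.length() != 0) {
                        data.append('&');
                    }
                    data.append(URLEncoder.encode(attribute.getKey(), "UTF-8"));
                    data.append('=');
                    data.append(URLEncoder.encode(attribute.getValue(), "UTF-8"));
                }
            }
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform is required to support.
            throw new IllegalStateException(e);
        }
        return data.toString();
    }
}
```

For a batch of two tuples with attributes item and qty, where the values are ("red pen", "2") and ("ink & paper", "1"), the resulting body is item=red+pen&qty=2&item=ink+%26+paper&qty=1. The attribute names repeat once per tuple, so the receiving server must be prepared for repeated names.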
TupleConsumer provides the following batching features:
- A batch that is defined by a number of tuples, including support for a single tuple in the batch.
- Asynchronous processing of the batch, so that processing of the batch does not affect the submission of tuples to the operator.
- An optional timeout, so that if a specified time has elapsed since the last tuple arrived, a non-empty batch is processed regardless of how full it is.
- An option to preserve tuple arrival order in batch processing. If order preservation is not required, multiple threads might process batches asynchronously.
- The ability for the subclass to process partial batches, allowing unprocessed tuples to be processed in a future batch.
- Final punctuation handling that ensures that all batches are processed before processPunctuation() returns.
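Some of these behaviors can be sketched in plain Java. The class below is a hypothetical illustration and not the TupleConsumer implementation: it covers the count trigger, the idle timeout, partial batch processing, and a final drain. Asynchronous processing and order preservation across threads are left out. As an assumption of the sketch, the handler removes the items it processed from the queue and leaves the rest for a future batch, and the current time is passed in by the caller so that the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch of count-based and timeout-based batch triggering. */
final class TimeoutBatcher<T> {
    private final int batchSize;
    private final long timeoutMillis;
    private final Consumer<Queue<T>> handler;
    private final Queue<T> pending = new ArrayDeque<>();
    private long lastArrivalMillis;

    TimeoutBatcher(int batchSize, long timeoutMillis, Consumer<Queue<T>> handler) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.handler = handler;
    }

    /** Records an arrival and processes the batch once it is full. */
    synchronized void add(T item, long nowMillis) {
        pending.add(item);
        lastArrivalMillis = nowMillis;
        if (pending.size() >= batchSize) {
            handler.accept(pending); // unprocessed items stay in the queue
        }
    }

    /** Intended to be called periodically, for example from a timer. */
    synchronized void checkTimeout(long nowMillis) {
        boolean idle = nowMillis - lastArrivalMillis >= timeoutMillis;
        if (!pending.isEmpty() && idle) {
            handler.accept(pending);
        }
    }

    /** Final handling: keeps calling the handler until nothing is pending. */
    synchronized void drain() {
        while (!pending.isEmpty()) {
            int before = pending.size();
            handler.accept(pending);
            if (pending.size() == before) {
                throw new IllegalStateException("handler made no progress");
            }
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

A real operator would typically drive checkTimeout() from a scheduled task and call drain() when final punctuation arrives. Those wiring details depend on the operator runtime and are not shown.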
What to do next
- Study the code that implements HttpPOST (com.ibm.streams.operator.samples.sinks.HttpPOST and com.ibm.streams.operator.samples.patterns.TupleConsumer).
- Implement a sink operator in Java.
- Study the code for other sample sink Java operators.