Manipulating reactive streams of data in Liberty

You can use the MicroProfile Reactive Streams Operators feature to create and manipulate reactive streams of data. This feature is an implementation of the Eclipse MicroProfile Reactive Streams Operators Specification 1.0.

The mpReactiveStreams-1.0 feature provides an interface for creating a publish-subscribe relationship between a producer and a consumer of data. It supports intermediate processors and supplies a set of generic operators for passing data down the stream. It also provides mechanisms for handling errors and for controlling data flow rates within the stream.

For more information about the open source MicroProfile specification that is implemented by the MicroProfile Reactive Streams Operators feature, see Eclipse MicroProfile Reactive Streams Operators Specification 1.0.

Procedure

  1. Add the mpReactiveStreams-1.0 feature to a featureManager element in your server.xml file.
    <featureManager>
       <feature>mpReactiveStreams-1.0</feature>
    </featureManager>
  2. Use methods in the ReactiveStreams class to create a reactive stream of data. Then, use methods in MicroProfile Reactive Streams Operators classes to manipulate that stream.

Example

The ReactiveStreams class is the starting point for building most MicroProfile reactive streams. Using its methods to create the source of a reactive stream of data is easier than writing a full implementation of the reactive streams Publisher interface.

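For example, the following code sums the even numbers among the integers 1 through 10:

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

CompletionStage<Optional<Integer>> result =
    ReactiveStreams.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                   .filter(i -> (i % 2) == 0)                     // keep the even numbers
                   .collect(Collectors.reducing((i, j) -> i + j)) // sum them
                   .run();                                        // start the stream
// Wait for the stream to complete and get the result, which is Optional[30].
// The get() method can throw InterruptedException or ExecutionException.
Optional<Integer> total = result.toCompletableFuture().get();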

In this example, the ReactiveStreams::of method creates an underlying reactive streams Publisher that emits its parameters when the stream is run. The generic filter() and collect() operators select specific data items from the stream and sum them. The full set of operators available is described in the PublisherBuilder interface of the Eclipse MicroProfile Reactive Streams Operators API documentation.
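The error handling that the feature provides can be illustrated with a minimal sketch. It assumes the onErrorResume() and toList() operators of the PublisherBuilder interface and the org.eclipse.microprofile.reactive.streams.operators package name. The ErrorHandlingSketch class and its method name are hypothetical and exist only for illustration.

```java
import java.util.List;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

// Hypothetical class name, used only for this illustration.
public class ErrorHandlingSketch {

    // Divides 10 by each element. The division by zero fails the stream,
    // and onErrorResume() replaces the failure with one final fallback element.
    static List<Integer> divideWithFallback() throws Exception {
        return ReactiveStreams.of(1, 2, 0, 4)
                .map(i -> 10 / i)          // throws ArithmeticException for 0
                .onErrorResume(t -> -1)    // emit -1 as the last element instead of failing
                .toList()
                .run()
                .toCompletableFuture()
                .get();
    }
}
```

In this sketch, the elements that are emitted before the failure are kept, the fallback value is appended, and the element after the failure is never processed.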

The ReactiveStreams class can initiate a stream from many sources, such as from an Iterable, from a CompletionStage, from a Supplier, from repeatedly calling a specific function, and from a reactive streams Publisher.
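As a sketch of some of these sources, the following code builds streams with the fromIterable(), fromCompletionStage(), and iterate() methods. The StreamSources class and its method names are hypothetical, and the package name is assumed to be org.eclipse.microprofile.reactive.streams.operators.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

// Hypothetical class name, used only for this illustration.
public class StreamSources {

    // Source: an Iterable. Each element of the list is emitted in order.
    static List<String> fromIterable() throws Exception {
        return ReactiveStreams.fromIterable(Arrays.asList("a", "b", "c"))
                .map(String::toUpperCase)
                .toList()
                .run()
                .toCompletableFuture().get();
    }

    // Source: a CompletionStage. One element is emitted when the stage completes.
    static List<Integer> fromStage() throws Exception {
        return ReactiveStreams.fromCompletionStage(CompletableFuture.completedFuture(42))
                .toList()
                .run()
                .toCompletableFuture().get();
    }

    // Source: repeated application of a function to a seed value.
    // The stream is infinite, so limit() bounds it to five elements.
    static List<Integer> powersOfTwo() throws Exception {
        return ReactiveStreams.iterate(1, i -> i * 2)
                .limit(5)
                .toList()
                .run()
                .toCompletableFuture().get();
    }
}
```

Streams that are created with iterate() or generate() are unbounded, so a bounding operator such as limit() is usually needed before a terminal operator that collects the results.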

Other significant classes are the PublisherBuilder, ProcessorBuilder, and SubscriberBuilder classes. They provide a mechanism, through via() and to() methods, for connecting data processing elements in the stream.
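The following sketch shows one way that the three builder types might be connected. It assumes that the ReactiveStreams.builder() method is used to start both the ProcessorBuilder and the SubscriberBuilder. The ConnectStages class and its method name are hypothetical.

```java
import java.util.List;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

// Hypothetical class name, used only for this illustration.
public class ConnectStages {

    static List<Integer> doubledEvens() throws Exception {
        // Source of the stream
        PublisherBuilder<Integer> source = ReactiveStreams.of(1, 2, 3, 4, 5, 6);

        // Reusable intermediate stage: keep the even numbers, then double them
        ProcessorBuilder<Integer, Integer> evensDoubled =
                ReactiveStreams.<Integer>builder()
                        .filter(i -> i % 2 == 0)
                        .map(i -> i * 2);

        // Terminal stage: collect everything that arrives into a list
        SubscriberBuilder<Integer, List<Integer>> sink =
                ReactiveStreams.<Integer>builder().toList();

        // via() attaches the processor, to() attaches the subscriber
        return source.via(evensDoubled)
                .to(sink)
                .run()
                .toCompletableFuture().get();
    }
}
```

Building the processor and subscriber separately from the source allows the same processing stages to be defined once and attached to different publishers.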

After a stream is completed with a Subscriber or other terminating construct, the result is a CompletionRunner. Its run() method sets the stream into operation and returns a CompletionStage that provides the result of the stream when the stream finishes. When you call the run() method, the stream runs asynchronously on a thread from the managed thread pool of the server.
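As a sketch of this behavior, the following code keeps the CompletionRunner in a variable and chains a follow-up action onto the CompletionStage that run() returns, rather than blocking for the result. The RunLater class and its method name are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.streams.operators.CompletionRunner;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

// Hypothetical class name, used only for this illustration.
public class RunLater {

    static CompletionStage<Integer> countElements() {
        // Building the stream only describes it. No data flows yet.
        CompletionRunner<List<String>> runner =
                ReactiveStreams.of("x", "y", "z").toList();

        // run() starts the stream and returns immediately with a CompletionStage.
        // thenApply() registers work to do when the result becomes available.
        return runner.run().thenApply(List::size);
    }
}
```

A caller can continue to chain actions onto the returned CompletionStage, or convert it with toCompletableFuture() and wait for the value.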

Standard Liberty ManagedExecutor context propagation is done from the initiating thread to the thread that runs the stream. Each functional element of the stream typically runs in the context of the same thread and thus shares any changes that are made to the thread contexts.

The full set of classes and operators is described in the Eclipse MicroProfile Reactive Streams Operators Specification 1.0 documentation.