Manipulating reactive streams of data in Liberty
You can use the MicroProfile Reactive Streams Operators feature to create and manipulate reactive streams of data. This feature is an implementation of the Eclipse MicroProfile Reactive Streams Operators Specification 1.0.
The mpReactiveStreams-1.0 feature provides an interface for creating a publish-subscribe relationship between a producer and a consumer of data. It supports intermediate processors and uses a set of generic operators to pass data down the stream. It also provides mechanisms for handling errors and for controlling data flow rates within the stream.
For more information about the open source MicroProfile specification that is implemented by the MicroProfile Reactive Streams Operators feature, see Eclipse MicroProfile Reactive Streams Operators Specification 1.0.
Procedure
Example
The ReactiveStreams class is the starting class for building most MicroProfile reactive streams. Its methods that create the source of a reactive stream of data are easier to use than writing a full implementation of the reactive streams Publisher interface.
For example, the following code sums the even numbers among the integers 1 through 10:
CompletionStage<Optional<Integer>> result =
    ReactiveStreams.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .filter(i -> (i % 2) == 0)                      // keep the even numbers
        .collect(Collectors.reducing((i, j) -> i + j))  // sum them
        .run();                                         // start the stream

// Wait for the stream to complete and get the result.
// get() blocks and can throw InterruptedException or ExecutionException.
Optional<Integer> total = result.toCompletableFuture().get();
In this example, the ReactiveStreams::of method creates an underlying reactive streams Publisher that emits its parameters when the stream is run. The generic filter() and collect() operators select specific data items from the stream and sum them. The full set of available operators is described in the PublisherBuilder interface of the Eclipse MicroProfile Reactive Streams Operators API documentation.
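For comparison, the following sketch shows roughly what a hand-written source involves. It is not the MicroProfile API: it assumes the JDK's java.util.concurrent.Flow interfaces as a stand-in for the reactive streams Publisher and Subscriber interfaces. The HandWrittenPublisher, ArrayPublisher, and sumOfEvens names are hypothetical. The sketch is single-threaded and synchronous, so it omits the thread-safety work that a production publisher would need.

```java
import java.util.concurrent.Flow;

class HandWrittenPublisher {

    /** Hypothetical publisher: emits a fixed set of integers, one item per unit of demand. */
    static final class ArrayPublisher implements Flow.Publisher<Integer> {
        private final int[] items;

        ArrayPublisher(int... items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;             // index of the next item to emit
                private long demand = 0;          // items requested but not yet emitted
                private boolean emitting = false; // guards against re-entrant request() calls
                private boolean done = false;     // set on completion, error, or cancel

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    // Add the demand, capping at Long.MAX_VALUE on overflow.
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        return; // the loop further up the call stack will see the new demand
                    }
                    emitting = true;
                    while (!done && demand > 0 && next < items.length) {
                        demand--;
                        subscriber.onNext(items[next++]);
                    }
                    emitting = false;
                    if (!done && next == items.length) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes to the publisher, requests items one at a time, and sums the even ones. */
    static int sumOfEvens(int... values) {
        int[] total = {0};
        boolean[] completed = {false};
        new ArrayPublisher(values).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer i) {
                if (i % 2 == 0) {
                    total[0] += i;
                }
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
                completed[0] = true;
            }
        });
        if (!completed[0]) {
            throw new IllegalStateException("stream did not complete");
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvens(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); // prints 30
    }
}
```

The demand tracking, re-entrancy guard, and completion signalling in this sketch are the kind of bookkeeping that the source-creating methods of the ReactiveStreams class handle for you.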
The ReactiveStreams class can initiate a stream from many sources, such as from an Iterable, from a CompletionStage, from a Supplier, from repeatedly calling a specific function, and from a reactive streams Publisher.
Other significant classes are the PublisherBuilder, ProcessorBuilder, and SubscriberBuilder classes. They provide a mechanism, through via() and to() methods, for connecting data processing elements in the stream.
After a stream is completed with a Subscriber or other terminating construct, the result is a CompletionRunner. It offers a run() method that sets the stream into operation and returns a CompletionStage, which completes with the result of the stream when the stream finishes. Calling the run() method runs the stream asynchronously on a thread from the managed thread pool of the server.
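Because the stream runs asynchronously, the result can also be handled without blocking the calling thread. The following is a minimal sketch that uses only JDK classes. The runStream() method is a hypothetical stand-in for the CompletionStage that a run() call returns, and it uses the JDK common pool rather than the Liberty managed thread pool. The AsyncResultHandling and describe names are also assumptions made for illustration.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;

class AsyncResultHandling {

    // Hypothetical stand-in for the stage returned by running a stream:
    // asynchronously sums the even numbers among 1 through 10.
    static CompletionStage<Optional<Integer>> runStream() {
        return CompletableFuture.supplyAsync(() ->
            IntStream.rangeClosed(1, 10)
                .filter(i -> i % 2 == 0)
                .boxed()
                .reduce(Integer::sum));
    }

    // Attaches callbacks instead of blocking: one for the result, one for failures.
    static CompletionStage<String> describe(CompletionStage<Optional<Integer>> result) {
        return result
            .thenApply(total -> total.map(t -> "sum=" + t).orElse("empty stream"))
            .exceptionally(t -> "failed: " + t.getMessage());
    }

    public static void main(String[] args) {
        // join() is used here only so that the demo waits before the JVM exits.
        System.out.println(describe(runStream()).toCompletableFuture().join()); // prints sum=30
    }
}
```

In application code, the callbacks would typically update state or send a response rather than build a string.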
Standard Liberty ManagedExecutor context propagation is done from the initiating thread to the thread that runs the stream. Each functional element of the stream typically runs in the context of the same thread and thus shares any changes that are made to the thread contexts.
The full set of classes and operators is described in the Eclipse MicroProfile Reactive Streams Operators Specification 1.0 documentation.