Improving microservice resilience in Liberty

You can use the MicroProfile Fault Tolerance feature to make service invocation more resilient. This feature implements versions 1.0, 1.1, and 2.0 of the Eclipse MicroProfile Fault Tolerance Specification. It provides a programming model that supports resilient microservices through patterns that include retries, circuit breakers, bulkheads, timeouts, and fallbacks.

The latest information about MicroProfile Fault Tolerance is available on the Open Liberty website.

Before you begin

For more information about the open source MicroProfile specification that the MicroProfile Fault Tolerance feature implements, see Eclipse MicroProfile Fault Tolerance Specification 2.0.

Procedure

  1. Add the mpFaultTolerance-1.0, mpFaultTolerance-1.1, or mpFaultTolerance-2.0 feature to a featureManager element in your server.xml file.
    <featureManager>
       <feature>mpFaultTolerance-2.0</feature>
    </featureManager>
    
  2. Use a code snippet to improve microservice resilience.
    A fault tolerance circuit breaker provides a way for systems to fail-fast. It temporarily disables the running of a service to prevent the service from overloading a system.
    Fault tolerance can run methods asynchronously when you annotate them with @Asynchronous and have them return Future or, with mpFaultTolerance-2.0, CompletionStage. Asynchronous execution can make other fault tolerance annotations more powerful. For example, it allows a @Bulkhead to queue waiting method calls, and it allows a @Timeout to return a result as soon as the time limit is reached, even if the method does not respond to being interrupted.
    A fault tolerance bulkhead limits the number of concurrent calls to a service. The bulkhead limits the amount of system resource that service invocations can use.
  3. Configure annotation parameters.
    • You can override the parameters of any Fault Tolerance annotation at run time by using MicroProfile Config. For more information, see Spec Chapter 12.
    • (If you use mpFaultTolerance-1.1 or later) Metrics are automatically exported for each method that is annotated with a fault tolerance annotation so that the method can be monitored. For more information, see Spec Chapter 11 and this OpenLiberty Blog Post.
    • (If you use mpFaultTolerance-2.0) Fault Tolerance is implemented as a CDI interceptor. If your application also uses other CDI interceptors, you can adjust the priority of the Fault Tolerance interceptor to control how it interacts with them. For more information, see Spec Chapter 3.
    • For a summary of the changes between versions of the Fault Tolerance specification, see the Spec release notes.
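
As a rough illustration of overriding annotation parameters, the following fragment could be placed in a MicroProfile Config source such as META-INF/microprofile-config.properties. The com.example package name is an assumption made for this sketch; substitute the fully qualified class name of your own bean.

```properties
# Override the @Timeout value on one method only.
# The unit that is declared on the annotation (seconds in the example bean) still applies.
com.example.CircuitBreakerBean/serviceA/Timeout/value=5

# Override maxRetries for every @Retry annotation in the application.
Retry/maxRetries=5
```

The first key follows the classname/methodname/annotation/parameter pattern, and the second uses the annotation/parameter pattern to apply a value globally.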

Example

Circuit breaker code snippet 1: Create a CircuitBreakerBean with configured CircuitBreaker and Timeout.

@RequestScoped
public class CircuitBreakerBean {

    private int executionCounterA = 0;

    // The combined effect of the specified requestVolumeThreshold and failureRatio is that 3  
    // failures will trigger the circuit to open.
    // After a 1 second delay the Circuit will allow fresh attempts to invoke the service.
    @CircuitBreaker(delay = 1, delayUnit = ChronoUnit.SECONDS, requestVolumeThreshold = 3, failureRatio = 1.0)
    // A service is considered to have timed out after 3 seconds
    @Timeout(value = 3, unit = ChronoUnit.SECONDS)
    public String serviceA() {
        executionCounterA++;

        if (executionCounterA <= 3) {
            //Sleep for 10 secs to force a timeout
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                System.out.println("serviceA interrupted");
            }
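        }

        // Report which invocation succeeded, for example "serviceA: 4"
        return "serviceA: " + executionCounterA;
    }
}
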
CircuitBreaker code snippet 2: Using the CircuitBreakerBean.

@Inject
CircuitBreakerBean bean;

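// FaultTolerance bean with circuit breaker, should fail 3 times.
// TimeoutException is org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException,
// not java.util.concurrent.TimeoutException.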
for (int i = 0; i < 3; i++) {
    try {
        bean.serviceA();
        throw new AssertionError("TimeoutException not caught");
    } catch (TimeoutException e) {
            //expected
    }
}

// The CircuitBreaker should be open, so calling serviceA should generate a 
// CircuitBreakerOpenException.
try {
    bean.serviceA();
    throw new AssertionError("CircuitBreakerOpenException not caught");
} catch (CircuitBreakerOpenException e) {
    //expected
}

// Wait longer than the 1 second delay so that the circuit moves to half-open
Thread.sleep(3000);

// The CircuitBreaker should now let a trial call through, and serviceA should succeed.
String res = bean.serviceA();
if (!"serviceA: 4".equals(res)) {
    throw new AssertionError("Bad Result: " + res);
}
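
To make the interaction between requestVolumeThreshold and failureRatio more concrete, the following plain-Java sketch models a rolling window of recent call outcomes. It is not the Liberty implementation; the RollingWindowSketch class and its method names are hypothetical and exist only for illustration. It assumes that the ratio is evaluated only after the window holds requestVolumeThreshold calls.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model only: hypothetical names, not part of MicroProfile or Liberty.
class RollingWindowSketch {
    private final int requestVolumeThreshold;
    private final double failureRatio;
    // Most recent call outcomes, where true means the call failed
    private final Deque<Boolean> window = new ArrayDeque<>();

    RollingWindowSketch(int requestVolumeThreshold, double failureRatio) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.failureRatio = failureRatio;
    }

    // Records one call outcome and reports whether the circuit would open.
    boolean recordAndCheckOpen(boolean failed) {
        window.addLast(failed);
        if (window.size() > requestVolumeThreshold) {
            window.removeFirst(); // keep only the most recent calls
        }
        if (window.size() < requestVolumeThreshold) {
            return false; // not enough calls yet to evaluate the ratio
        }
        long failures = window.stream().filter(f -> f).count();
        return (double) failures / requestVolumeThreshold >= failureRatio;
    }

    public static void main(String[] args) {
        // Same settings as the CircuitBreakerBean: threshold 3, ratio 1.0
        RollingWindowSketch sketch = new RollingWindowSketch(3, 1.0);
        System.out.println(sketch.recordAndCheckOpen(true)); // false
        System.out.println(sketch.recordAndCheckOpen(true)); // false
        System.out.println(sketch.recordAndCheckOpen(true)); // true
    }
}
```

With a threshold of 3 and a ratio of 1.0, the circuit opens only when all three calls in the window fail, which matches the three timeouts in the usage snippet.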

Fallback and Retry code snippet 1: FTServiceBean with configured FallbackHandler and Retry policy.

@RequestScoped
public class FTServiceBean {

    // Annotate serviceA with a named FallbackHandler and a Retry policy specifying the
    // number of retries.
    @Retry(maxRetries = 2)
    @Fallback(StringFallbackHandler.class)
    public String serviceA() {
        // Always fail so that the retries and then the fallback are driven
        throw new RuntimeException("Connection failed");
    }
}

Fallback and Retry code snippet 2: A FallbackHandler, the code that is driven if the main service fails.

@Dependent
public class StringFallbackHandler implements FallbackHandler<String> {

    @Override
    public String handle(ExecutionContext context) {
        return "fallback for " + context.getMethod().getName();
    }
}

Fallback and Retry code snippet 3: Using the FTServiceBean.

private @Inject FTServiceBean ftServiceBean;
    
try {
    // Call serviceA, which will be retried twice in the event of failure, after which
    // the FallbackHandler will be driven.
    String result = ftServiceBean.serviceA();
    if (!result.contains("serviceA")) {
        throw new AssertionError("The message should be \"fallback for serviceA\"");
    }
} catch (RuntimeException ex) {
    throw new AssertionError("serviceA should not throw a RuntimeException");
}
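
The order of events in the Fallback and Retry snippets (one initial call, up to maxRetries further calls, then the fallback) can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not the feature's implementation: the RetryFallbackSketch class and the callWithRetryAndFallback helper are hypothetical, and retry delay and jitter are ignored.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative model only: hypothetical names, not part of MicroProfile or Liberty.
class RetryFallbackSketch {
    static int attempts = 0;

    // Calls the service once, retries up to maxRetries times, then uses the fallback.
    static <T> T callWithRetryAndFallback(Supplier<T> service, int maxRetries,
                                          Function<Throwable, T> fallback) {
        Throwable last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return service.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        return fallback.apply(last);
    }

    // Stand-in for FTServiceBean.serviceA: counts the call, then always fails.
    static String serviceA() {
        attempts++;
        throw new RuntimeException("Connection failed");
    }

    public static void main(String[] args) {
        String result = callWithRetryAndFallback(
                RetryFallbackSketch::serviceA, 2, e -> "fallback for serviceA");
        // prints "fallback for serviceA after 3 attempts"
        System.out.println(result + " after " + attempts + " attempts");
    }
}
```

With maxRetries set to 2, the service is attempted three times in total before the fallback result is returned.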

Asynchronous code snippet 1: Create an AsynchronousBean with methods that return Future or CompletionStage.

@RequestScoped
public class AsynchronousBean {
    
    @Asynchronous
    public Future<String> serviceA() {
        try {
            // Sleep to simulate work
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException("serviceA interrupted", e);
        }
        // Return the result in a completed CompletableFuture
        return CompletableFuture.completedFuture("serviceA OK");
    }
    
    // Note: returning a CompletionStage requires Fault Tolerance 2.0
    @Asynchronous
    public CompletionStage<String> serviceB() {
        try {
            // Sleep to simulate work
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException("serviceB interrupted", e);
        }
        // Return the result in a completed CompletableFuture (which implements CompletionStage)
        return CompletableFuture.completedFuture("serviceB OK");
    }

}

Asynchronous code snippet 2: Use the AsynchronousBean.

@Inject AsynchronousBean asyncBean;

// serviceA and serviceB methods will run in parallel because they are annotated with @Asynchronous
Future<String> resultA = asyncBean.serviceA();
CompletionStage<String> resultB = asyncBean.serviceB();

// The CompletionStage returned from serviceB allows us to add actions which take place when the serviceB method finishes
resultB.thenAccept((r) -> System.out.println("ServiceB result: " + r))
       .exceptionally((ex) -> {
           System.out.println("ServiceB failed");
           ex.printStackTrace();
           return null;
       });

// For the Future returned from serviceA, we need to wait for it to finish, then we can handle the result
try {
    System.out.println("serviceA result: " + resultA.get());
} catch (ExecutionException ex) {
    System.out.println("ServiceA failed");
    ex.printStackTrace();
} catch (InterruptedException ex) {
    System.out.println("Interrupted waiting for serviceA");
}

Bulkhead code snippet 1: Create a BulkheadBean with configured Bulkhead.

@RequestScoped
@Asynchronous
public class BulkheadBean {

    private final AtomicInteger connectATokens = new AtomicInteger(0);

    // Configure a Bulkhead that allows at most 2 concurrent executions.
    // The limit is set through the annotation's value parameter.
    @Bulkhead(value = 2)
    public Future<Boolean> connectA(String data) throws InterruptedException {
        System.out.println("connectA starting " + data);
        int token = connectATokens.incrementAndGet();
        try {
            if (token > 2) {
                throw new RuntimeException("Too many threads in connectA[" + data + "]: " + token);
            }
            Thread.sleep(5000);
            return CompletableFuture.completedFuture(Boolean.TRUE);
        } finally {
            connectATokens.decrementAndGet();
            System.out.println("connectA complete " + data);
        }
    }
}

Bulkhead code snippet 2: Use the BulkheadBean.

@Inject
BulkheadBean bean;

// connectA has a bulkhead size of 2.
// The first two calls to connectA should run straight away, in parallel, each taking
// around 5 seconds.
Future<Boolean> future1 = bean.connectA("One");
Thread.sleep(100);
Future<Boolean> future2 = bean.connectA("Two");
Thread.sleep(100);

// The next two calls to connectA should wait until the first 2 have finished
Future<Boolean> future3 = bean.connectA("Three");
Thread.sleep(100);
Future<Boolean> future4 = bean.connectA("Four");
Thread.sleep(100);

//total time should be just over 10s
Thread.sleep(11000);

if (!future1.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future1 did not complete properly");
}
if (!future2.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future2 did not complete properly");
}
if (!future3.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future3 did not complete properly");
}
if (!future4.get(1000, TimeUnit.MILLISECONDS)) {
    throw new AssertionError("Future4 did not complete properly");
}
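
The queuing behavior of an asynchronous bulkhead can be approximated in plain Java with a fixed-size thread pool and a bounded queue. The sketch below is only an analogy for how concurrent calls are limited and waiting calls are queued; it is not how Liberty implements @Bulkhead, and the BulkheadSketch class and peakConcurrency method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model only: hypothetical names, not part of MicroProfile or Liberty.
class BulkheadSketch {

    // Submits `tasks` calls to a pool of `maxConcurrent` threads with a bounded
    // waiting queue, and returns the highest number of calls seen running at once.
    static int peakConcurrency(int maxConcurrent, int queueSize, int tasks) {
        ExecutorService pool = new ThreadPoolExecutor(maxConcurrent, maxConcurrent,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize));
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<Boolean>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(100); // simulate work
                        return Boolean.TRUE;
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<Boolean> future : futures) {
                future.get(5, TimeUnit.SECONDS); // wait for queued calls to finish
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Four calls with a limit of 2: two run at once, two wait in the queue
        System.out.println("Peak concurrency: " + peakConcurrency(2, 10, 4));
    }
}
```

In this model, a submission that arrives when the pool and the queue are both full is rejected, which is roughly comparable to the point at which a bulkhead refuses further calls.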