CompletableFuture
About
Java CompletableFuture is a class introduced in Java 8 as part of the java.util.concurrent package that implements the Future and CompletionStage Interface. It represents a future result of an asynchronous computation that might not be available immediately. It acts like a placeholder for the eventual outcome of an operation that's being executed concurrently by different thread. It provides a way to write asynchronous, non-blocking code in Java, especially when dealing with tasks that might take a long time to complete, such as network requests or database queries.
Advantages:
Asynchronous Programming: CompletableFuture enables you to execute long-running task/tasks without blocking the current thread. This allows the application to remain responsive and handle other requests while the asynchronous operations are in progress.
Composable Operations: CompletableFuture provides a vast set of methods to chain together multiple asynchronous operations, forming complex workflows. We can define what happens after a computation finishes successfully (thenApply, thenAccept), what to do if it encounters an exception (exceptionally), and how to combine results from multiple CompletableFutures (allOf, anyOf).
Improved Error Handling: CompletableFuture offers more complex mechanisms for handling exceptions that arise during asynchronous computations. We can define fallback actions using exceptionally and propagate or chain exceptions using methods like handle.
Some Key Points:
supplyAsync
supplyAsync(Supplier<U> supplier, Executor executor)
: This method initiates an asynchronous task that returns a value of type U
. The provided Supplier
is executed asynchronously, and the result is wrapped in a CompletableFuture
. An optional Executor
can be provided to specify the thread pool where the computation will be executed.
runAsync
runAsync(Runnable runnable, Executor executor)
: Similar to supplyAsync()
, but for tasks that do not return a value. The provided Runnable
is executed asynchronously, and the resulting CompletableFuture
completes when the task finishes.
thenApply
thenApply(Function<? super T,? extends U> fn)
: This method specifies a function to be applied to the result of the current CompletableFuture
when it completes. It returns a new CompletableFuture
that holds the result of the function.
thenAccept
thenAccept(Consumer<? super T> action)
: Similar to thenApply()
, but for cases where you want to perform an action (such as printing or logging) on the result without returning a value. It accepts a Consumer
that takes the result as input.
thenCombine
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
: This method combines the result of the current CompletableFuture
with the result of another CompletionStage
when both are complete. It applies the specified function to the results of both stages and returns a new CompletableFuture
holding the combined result.
thenCompose
thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
: This method applies a function to the result of the current CompletableFuture
and returns a new CompletionStage
. It's useful for chaining dependent asynchronous tasks where the result of one task determines the execution of another.
exceptionally
exceptionally(Function<Throwable,? extends T> fn)
: This method handles exceptions that occur during the execution of the current CompletableFuture
. It applies the specified function to the exception and returns a new CompletableFuture
with the result of the function, effectively recovering from the exception.
exceptionallyAsync
exceptionallyAsync(Function<Throwable, ? extends T> fn)
: This method is also used for handling exceptions that may occur during the execution of the asynchronous computation. However, it differs from exceptionally
in how it handle the execution context (thread) in which the exception handling function is invoked.
handle
handle(BiFunction<? super T,Throwable,? extends U> fn)
: Similar to exceptionally()
, but the function provided can handle both the result and any exception that occurs during the execution of the current CompletableFuture.
If an exception occurs during the execution, the handle()
function receives the exception object and if no exception occurs, the handle()
function receives the result, and we modify it.
allOf
allOf(CompletableFuture<?>... cfs)
: This method waits for all of the provided CompletableFutures
to complete. It returns a new CompletableFuture
that completes when all of the provided CompletableFutures
are done, regardless of their individual results.
anyOf
anyOf(CompletableFuture<?>... cfs)
: This method waits for any of the provided CompletableFutures
to complete. It returns a new CompletableFuture
that completes when any of the provided CompletableFutures
completes, with the result of the first completed CompletableFuture
.
orTimeout
orTimeout(long timeout, TimeUnit unit):
This method sets a timeout for the completion of the future. If the future does not complete within the specified timeout duration, it completes exceptionally with a TimeoutException
. It's useful when you want to handle the timeout by throwing an exception. The timeout is set on the original CompletableFuture, and if the timeout occurs, the CompletableFuture itself is completed exceptionally.
completeOnTimeout
completeOnTimeout(T value, long timeout, TimeUnit unit)
: This method sets a timeout for the completion of the future. If the future does not complete within the specified timeout duration, it completes with the provided value. It's useful when you want to handle the timeout by providing a fallback value instead of throwing an exception. The timeout is set on a new CompletableFuture derived from the original CompletableFuture, and if the timeout occurs, the new CompletableFuture is completed with the specified value, while the original CompletableFuture remains incomplete.
cancel
cancel(boolean mayInterruptIfRunning)
: This method cancels the computation associated with the current CompletableFuture
, if possible.
join
join()
: This method waits for the completion of the current CompletableFuture
and returns its result, blocking if necessary until the result is available.
get
get()
: This method is similar to join()
i.e. wait (block) the current thread until the CompletableFuture
finishes its execution, but it can throw checked exceptions. It waits for the completion of the current CompletableFuture
and returns its result, but it also throws any exception that occurred during the computation, wrapped in a ExecutionException
.
Example:
Creating a single CompletableFuture using
supplyAsync
.
package org.example;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@Slf4j
public class MainApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// supplyAsync method takes a Supplier<U> function
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// Perform some async computation
log.info("Performing some async computation");
return "Returning the result";
});
log.info("Continuing with main process");
// get method waits if necessary for the future to complete, and then returns its result
log.info(completableFuture.get());
}
}

Multiple asynchronous operations using
supplyAsync
.
package org.example;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class MainApplication {
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<String> completableFuture_1 = CompletableFuture.supplyAsync(() -> {
log.info("Performing some async computation with completableFuture_1");
return "Returning the result with completableFuture_1";
});
CompletableFuture<String> completableFuture_2 = CompletableFuture.supplyAsync(() -> {
log.info("Performing some async computation with completableFuture_2");
return "Returning the result with completableFuture_2";
});
CompletableFuture<String> completableFuture_3 = CompletableFuture.supplyAsync(() -> {
log.info("Performing some async computation with completableFuture_3");
return "Returning the result with completableFuture_3";
});
log.info("Continuing with main process");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFuture_1, completableFuture_2, completableFuture_3);
combinedFuture.get();
log.info("completableFuture_1 Result: {}", completableFuture_1.get());
log.info("completableFuture_2 Result: {}", completableFuture_2.get());
log.info("completableFuture_3 Result: {}", completableFuture_3.get());
}
}

Creating a single CompletableFuture using
runAsync
.
// runAsync method takes a Runnable function
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
// Perform some async computation
log.info("Performing some async computation");
});
// Main process continues
log.info("Continuing with main process");
// get method waits if necessary for the future to complete
// Since the computation doesn't return a value, we just wait for it to complete
completableFuture.get();

Example using
thenApply
.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// Perform some async computation
return "Hello";
});
// thenApply method applies a function to the result of the CompletableFuture
CompletableFuture<String> futureResult = completableFuture.thenApply(result -> result + " World");
log.info("Continuing with main process");
log.info(futureResult.get());

Example using
thenAccept
.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// Perform some async computation
return "Hello";
});
// thenAccept method performs an action on the result without returning a value
CompletableFuture<Void> futureResult = completableFuture.thenAccept(result -> log.info(result + " World"));
// Waits for the action to complete
futureResult.get();

Example using
thenCombine
.
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> "World");
// thenCombine method combines the results of two CompletableFutures
CompletableFuture<String> futureResult = completableFuture1
.thenCombine(completableFuture2, (result1, result2) -> result1 + " " + result2);
log.info(futureResult.get()); // Prints "Hello World"

Example using
thenCompose
.
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> "Hello");
// thenCompose method applies a function that returns a new CompletionStage
CompletableFuture<String> futureResult = completableFuture1
.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));
log.info(futureResult.get()); // Prints "Hello World"

Example using
orTimeout
andexceptionallyAsync
package org.example;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
public class MainApplication {
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> getSomeDataFromExternalService())
.orTimeout(7, TimeUnit.SECONDS)
.exceptionallyAsync(ex -> {
log.error("Some error occurred");
return "Some fallback value";
});
// exceptionallyAsync method handles the exception and provides a fallback value
log.info("Response: {}", completableFuture.get());
}
static String getSomeDataFromExternalService() {
log.info("Fetching data from external service");
throw new RuntimeException("Got some exception");
}
}

Example using
exceptionally
.
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(MainApplication::getSomeDataFromExternalService)
.exceptionally(ex -> {
log.error("Some error occurred");
return "Some fallback value";
});
// exceptionally method handles the exception and provides a fallback value
log.info("Response: {}", completableFuture.get());
}
static String getSomeDataFromExternalService() {
log.info("Fetching data from external service");
throw new RuntimeException("Got some exception");
}

Example using
orTimeout
.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000); // Simulate a long-running computation
return "Result";
} catch (InterruptedException e) {
return "Interrupted";
}
}).orTimeout(2000, TimeUnit.MILLISECONDS);
try {
log.info("orTimeout result: {}", completableFuture.get());
} catch (Exception e) {
log.info("orTimeout exception: {}", e.getMessage()); // Will throw a TimeoutException
}

Example using
completeOnTimeout
.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000); // Simulate a long-running computation
return "Result";
} catch (InterruptedException e) {
return "Interrupted";
}
}).completeOnTimeout("Fallback Value", 2000, TimeUnit.MILLISECONDS);
try {
log.info("completeOnTimeout result: {}", completableFuture.get());
} catch (Exception e) {
log.info("completeOnTimeout exception: {}", e.getMessage()); // Will print "Fallback Value"
}

Example using
handle
.
// Example CompletableFuture that can throw an exception
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
// Simulate an exception
throw new RuntimeException("Exception occurred");
}).handle((result, exception) -> {
if (exception != null) {
log.info("Exception occurred: {}", exception.getMessage());
return 0; // Fallback value in case of exception
} else {
return (Integer) result * 2; // Modify the result
}
});
log.info("Result: {}", completableFuture.get()); // Prints "Result: 0" because of the exception

Example using
allOf
.
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result 3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.get(); // Waits for all futures to complete
log.info("All futures completed successfully");

Example using
anyOf
.
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result 2";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
log.info("First future completed: {}", anyFuture.get()); // Waits for any future to complete

Example using
cancel
.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
log.info("Started Processing something");
Thread.sleep(3000);
log.info("Processing completed");
return "Result";
} catch (InterruptedException e) {
return "Exception occurred";
}
});
// Attempt to cancel the computation
// true indicates that interrupt should be sent to the thread if it's running
boolean cancelled = completableFuture.cancel(true);
log.info("Computation cancelled: {}", cancelled);
// Check if the computation was successfully cancelled before retrieving the result
if (!cancelled) {
log.info("Result: {}", completableFuture.get()); // Prints "Result" if not cancelled
} else {
log.info("Computation was cancelled, no result available");
}

Example using
get
andjoin
.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
throw new ArithmeticException();
});
// Using get()
try {
String result = completableFuture.get(); // Throws checked exceptions (InterruptedException, ExecutionException)
log.info("Result using get(): {}", result); // Will not be printed
} catch (ExecutionException e) {
log.info("Exception occurred: {}", e.getMessage());
}
// Using join()
try {
String result = completableFuture.join(); // Throws unchecked exceptions (CompletionException)
log.info("Result using get(): {}", result); // Will not be printed
} catch (CompletionException e) {
log.info("Exception occurred: {}", e.getMessage());
}

Use Cases:
When we have a list of account IDs and want to fetch the balance with the help of those IDs by calling external API parallelly.
package org.example;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class MainApplication {
@SneakyThrows
public static void main(String[] args) {
List<String> accountIdList = List.of("Account1", "Account2", "Account3", "Account4", "Account5");
var balanceTask = accountIdList
.stream()
.map(MainApplication::getBalance)
.toList();
log.info("Main process continues");
CompletableFuture.allOf(balanceTask.toArray(new CompletableFuture[balanceTask.size()])).get();
for (CompletableFuture<String> balance : balanceTask) {
log.info("Balance: {}", balance.get());
}
}
static CompletableFuture<String> getBalance(String accountId) {
// Fetch the balance by calling some external api
return CompletableFuture.supplyAsync(() -> {
log.info("Calling some External API to fetch balance for {}", accountId);
return "BalanceFor" + accountId;
});
}
}

Last updated