Java Asynchronous Programming using CompletableFuture – Part 1
What is Asynchronous Programming?
To put it in simple words:
“Synchronous programming is when the program is executed one statement followed by another in the sequence it appears in the code.”
vs
“Asynchronous programming is when the program is executed one statement after another in the code, but some parts of the code branch out and execute in parallel with the main flow.”
Asynchronous programming is typically used when your program contains several independent chunks of code (they share no mutable data and do not depend on each other's results) that take a long time to run (database queries, file operations, API calls) and block the program while they wait, which prevents efficient use of your hardware resources.
Why do we need Asynchronous Programming?
Today, we have machines with multiple cores and large memory capacities. Asynchronous programming helps developers take maximum advantage of this computing power.
Let us take an example scenario of an e-commerce portal. Assume you are given the following payload about an order:
{ "order_id": 123456, "customer_id": 245123, "invoice_id": 432121323 }
Let us assume that you need to implement a function that would fetch the following information:
- additional order information
- customer information
- invoice details
- product details for each line item in the invoice
The synchronous flow would look like this:
In synchronous programming, each fetchY function is blocked by the previous fetchX function. For instance, fetchCustomer(customer_id) will not start until fetchOrder(order_id) has completed. The system might be idle while fetchOrder(order_id) waits for the information from the database. Here, fetchOrder(order_id) blocks fetchCustomer(customer_id).
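To make the cost of blocking concrete, here is a minimal sketch that simulates three independent fetches with Thread.sleep. The slowCall helper and the 100/150/200 ms delays are hypothetical stand-ins, not the article's services or measurements.

```java
import java.util.concurrent.TimeUnit;

public class BlockingFlowDemo {

    // Hypothetical stand-in for a slow remote call (database query, API call, ...).
    // The delays used below are illustrative assumptions, not measurements.
    static String slowCall(String name, long millis) {
        try {
            Thread.sleep(millis); // blocks the calling thread for the whole delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name;
    }

    // Runs the three fetches one after another on the calling thread and
    // returns the elapsed wall-clock time in milliseconds.
    static long runSequentially() {
        long start = System.nanoTime();
        slowCall("order", 100);    // "customer" cannot start until this returns
        slowCall("customer", 150); // "invoice" cannot start until this returns
        slowCall("invoice", 200);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // The total is roughly the sum of the individual delays (100 + 150 + 200 ms).
        System.out.println("Sequential run took ~" + runSequentially() + " ms");
    }
}
```

Because every call waits for the previous one, the elapsed time can never be shorter than the sum of the delays, no matter how many cores are idle.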
Let us implement the synchronous function.
package me.janeve.java8.concurrent_package.completablefutures.part1.sync.executors;

import me.janeve.java8.concurrent_package.completablefutures.part1.entities.*;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.CustomerService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.InvoiceService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.OrderService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.ProductInfoService;
import me.janeve.java8.helpers.Timer;

public class SynchronousExecution {

    public TransactionDetails fetchTransactionDetails(TransactionData payload) {
        Timer.start("SynchronousExecution::" + payload.getOrderId());

        // Each call blocks until the previous one has returned.
        Order order = OrderService.fetchOrder(payload.getOrderId());
        Customer customer = CustomerService.fetchCustomer(payload.getCustomerId());
        Invoice invoice = InvoiceService.fetchInvoice(payload.getInvoiceId());

        // Product info is fetched one line item at a time.
        for (LineItem item : invoice.getItems()) {
            item.setProductInfo(
                ProductInfoService.getProductInfo(item.getProductId())
            );
        }

        TransactionDetails data = TransactionDetails.builder()
            .order(order)
            .customer(customer)
            .invoice(invoice)
            .build();

        Timer.stop("SynchronousExecution::" + payload.getOrderId());
        return data;
    }
}
When we dissect the flow, we can see that fetchOrder(order_id), fetchCustomer(customer_id), and fetchInvoice(invoice_id) have no dependency or shared mutable data between them. However, fetchProductInfo(product_id_i) can only be executed after fetchInvoice(invoice_id). Hence, fetchProductInfo(product_id_i) depends on fetchInvoice(invoice_id). In addition, each individual fetchProductInfo(product_id_i) call is blocked by the previous fetchProductInfo(product_id_i-1) call, even though there is no dependency or shared mutable data between them.
Asynchronous Programming of the function
We can rethink this problem in terms of how data flows through these functions. We start with input data, process it, and fetch more data downstream by executing functions. Some of this data can be fetched in parallel with the rest. Let us look at an example scenario of what the synchronous program execution could look like.
In this scenario, the main thread would take at least 885 milliseconds in total. Now let us implement the same logic as an asynchronous program by rearranging the functions so that they execute in separate threads. It could look something like this:
If sufficient idle hardware resources are available, the main thread above would take around 400 milliseconds. That is a reduction in execution time of more than 50%. In addition, idle hardware resources are used more efficiently. For massive systems with high demands on availability and performance, asynchronous programming can provide a competitive advantage.
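The gain comes from overlapping independent waits. The following sketch, which uses the CompletableFuture API described in the next section, runs three simulated fetches of 200 ms each on a dedicated three-thread pool. The slowCall helper, the delays, and the pool size are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentFlowDemo {

    // Hypothetical slow call; the delays passed in are illustrative assumptions.
    static String slowCall(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name;
    }

    // Starts the three independent fetches at once and returns the elapsed time in ms.
    static long runConcurrently() {
        // A dedicated pool with one thread per task, so the demo does not depend
        // on the size of ForkJoinPool.commonPool() on the current machine.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> slowCall("order", 200), pool);
            CompletableFuture<String> customer = CompletableFuture.supplyAsync(() -> slowCall("customer", 200), pool);
            CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> slowCall("invoice", 200), pool);
            CompletableFuture.allOf(order, customer, invoice).join(); // wait for all three
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Expected to be close to the longest single delay (~200 ms), not the 600 ms sum.
        System.out.println("Concurrent run took ~" + runConcurrently() + " ms");
    }
}
```

With enough free threads, the elapsed time should be close to the longest single delay rather than the sum; on a busy machine the exact numbers will vary.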
CompletableFuture in Java
Java’s java.util.concurrent package has evolved over its lifetime. Before understanding CompletableFuture, we must have a high-level understanding of the following Java features:
- Future API (since Java 1.5)
- ForkJoinPool (since Java 1.7)
- ExecutorService (since Java 1.5)
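Before Java 8, the usual pattern was to submit work to an ExecutorService and receive a Future. A minimal sketch of that style follows; fetchCustomer here is a hypothetical stand-in. Note that the only way to consume the result is the blocking get() call, which also forces the caller to handle checked exceptions.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureApiDemo {

    // Hypothetical lookup standing in for a slow service call.
    static String fetchCustomer(int customerId) {
        return "customer-" + customerId;
    }

    static String fetchWithFuture(int customerId) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // submit() returns a Future immediately; the work runs on the executor's thread.
            Future<String> future = executor.submit(() -> fetchCustomer(customerId));
            // get() blocks the caller until the result is ready. Future itself offers
            // no way to register a callback or chain a follow-up step.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithFuture(245123)); // customer-245123
    }
}
```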
The interface java.util.concurrent.CompletionStage and the class java.util.concurrent.CompletableFuture were introduced in Java 8 as part of the Concurrency API improvements. CompletableFuture implements both Future and CompletionStage, so it extends the Future API with the ability to chain dependent steps and to be completed explicitly.
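Because CompletableFuture implements both interfaces, one object can be treated as a classic Future or as a chainable CompletionStage. This small sketch (the invoice string is made up) also shows explicit completion with complete(), which a plain Future does not offer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;

public class CompletableFutureTypesDemo {

    static String demo() {
        CompletableFuture<String> cf = new CompletableFuture<>();

        // The same object seen through its two interfaces.
        Future<String> asFuture = cf;          // blocking API: get(), isDone(), cancel()
        CompletionStage<String> asStage = cf;  // chaining API: thenApply(), thenAccept(), ...

        // A follow-up step can be registered before any value exists.
        CompletionStage<String> upper = asStage.thenApply(String::toUpperCase);
        System.out.println("done before complete? " + asFuture.isDone()); // false

        // Unlike a plain Future, a CompletableFuture can be completed explicitly.
        cf.complete("invoice-432121323");
        System.out.println("done after complete?  " + asFuture.isDone()); // true

        return upper.toCompletableFuture().join(); // "INVOICE-432121323"
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```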
Let us implement the same logic with the CompletableFuture functionality of Java.
package me.janeve.java8.concurrent_package.completablefutures.part1.async.executors;

import me.janeve.java8.concurrent_package.completablefutures.part1.entities.TransactionData;
import me.janeve.java8.concurrent_package.completablefutures.part1.entities.TransactionDetails;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.CustomerService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.InvoiceService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.OrderService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.ProductInfoService;
import me.janeve.java8.helpers.Timer;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ASynchronousExecution {

    public TransactionDetails fetchTransactionDetails(final TransactionData payload) {
        Timer.start("ASynchronousExecution::" + payload.getOrderId());

        // Shared builder; each asynchronous task fills in a different field.
        final TransactionDetails.TransactionDetailsBuilder builder = TransactionDetails.builder();

        // Start the three independent fetches and wait until all of them are done.
        CompletableFuture.allOf(
            fetchInvoice(payload, builder),
            fetchOrder(payload, builder),
            fetchCustomer(payload, builder)
        ).join();

        TransactionDetails data = builder.build();
        Timer.stop("ASynchronousExecution::" + payload.getOrderId());
        return data;
    }

    private CompletableFuture<Void> fetchCustomer(TransactionData payload,
                                                  TransactionDetails.TransactionDetailsBuilder builder) {
        return CompletableFuture.supplyAsync(() -> CustomerService.fetchCustomer(payload.getCustomerId()))
            .thenAccept(builder::customer);
    }

    private CompletableFuture<Void> fetchOrder(TransactionData payload,
                                               TransactionDetails.TransactionDetailsBuilder builder) {
        return CompletableFuture.supplyAsync(() -> OrderService.fetchOrder(payload.getOrderId()))
            .thenAccept(builder::order);
    }

    private CompletableFuture<Void> fetchInvoice(TransactionData payload,
                                                 TransactionDetails.TransactionDetailsBuilder builder) {
        return CompletableFuture.supplyAsync(() -> InvoiceService.fetchInvoice(payload.getInvoiceId()))
            .thenApply(invoice -> {
                // Fetch the product info of every line item in parallel.
                List<CompletableFuture<Void>> futures = invoice.getItems().stream()
                    .map(lineItem -> CompletableFuture.supplyAsync(
                            () -> ProductInfoService.getProductInfo(lineItem.getProductId())
                        ).thenAccept(lineItem::setProductInfo))
                    .collect(Collectors.toList());
                // Wait for all product lookups before passing the invoice downstream.
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
                return invoice;
            })
            .thenAccept(builder::invoice);
    }
}
The complete code is available in my git repo.
Now let us understand some of the important parts of the implementation. First, let us look at the code in fetchCustomer(TransactionData payload, TransactionDetails.TransactionDetailsBuilder builder):
return CompletableFuture.supplyAsync(() -> CustomerService.fetchCustomer(payload.getCustomerId()))
        .thenAccept(builder::customer);
Both fetchOrder(order_id) and fetchCustomer(customer_id) have the same logic. Let us look at fetchCustomer(customer_id). There are two important functions being invoked here:
- supplyAsync(Supplier supplier)
- thenAccept(Consumer action)
supplyAsync(Supplier supplier)
The supplyAsync(Supplier supplier) function runs the supplier asynchronously on another thread; unless an executor is passed as a second argument, that thread comes by default from ForkJoinPool.commonPool(). In our example, the main thread asks CompletableFuture to run fetchCustomer(customer_id) in a separate thread. The value returned by fetchCustomer(customer_id) is then passed downstream to the thenAccept(Consumer action) function.
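A quick way to see that supplyAsync runs the supplier elsewhere is to return the executing thread's name. This is a minimal sketch; the actual worker name depends on the JVM and on the number of available processors, so treat the name in the comment as an example only.

```java
import java.util.concurrent.CompletableFuture;

public class SupplyAsyncThreadDemo {

    // Returns the name of the thread that executed the supplier.
    static String workerThreadName() {
        return CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName()) // runs on the default async executor
            .join(); // blocking here only so the demo can read the value
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        // Often something like "ForkJoinPool.commonPool-worker-1".
        System.out.println("supplier: " + workerThreadName());
    }
}
```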
thenAccept(Consumer action)
The thenAccept(Consumer action) function receives the value produced by the upstream stage and simply stores it in the response builder. It returns a CompletableFuture<Void>, because the action produces no value of its own.
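The following sketch isolates the supplyAsync plus thenAccept pair. An AtomicReference plays the role of the response builder here; it is a stand-in for illustration, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ThenAcceptDemo {

    static String run() {
        // Hypothetical holder playing the role of the response builder.
        AtomicReference<String> customerSlot = new AtomicReference<>();

        CompletableFuture<Void> done = CompletableFuture
            .supplyAsync(() -> "customer-245123")  // upstream stage produces a value
            .thenAccept(customerSlot::set);        // Consumer: uses the value, returns nothing

        done.join(); // the stage's own result is Void, so we only wait for completion
        return customerSlot.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // customer-245123
    }
}
```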
The fetchInvoice(invoice_id) method must fetch the invoice details first. The invoice contains several line items, and for each of them we need to make a request to fetch the product info.
return CompletableFuture.supplyAsync(() -> InvoiceService.fetchInvoice(payload.getInvoiceId()))
        .thenApply(invoice -> {
            List<CompletableFuture<Void>> futures = invoice.getItems().stream()
                    .map(lineItem -> CompletableFuture.supplyAsync(() -> ProductInfoService.getProductInfo(lineItem.getProductId()))
                            .thenAccept(lineItem::setProductInfo))
                    .collect(Collectors.toList());
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
            return invoice;
        })
        .thenAccept(builder::invoice);
The first CompletableFuture.supplyAsync() call creates an asynchronous task that fetches the invoice details. When InvoiceService returns the invoice details, they are passed downstream to the thenApply(Function fn) function.
thenApply(Function fn)
The thenApply(Function fn) function receives the invoice details, iterates through the line items (.getItems().stream().map()), and makes a CompletableFuture.supplyAsync() call for each one so that the product info is fetched in parallel. For each line item, the thenAccept(Consumer action) function is then called to set the fetched product info on that item.
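To contrast thenApply with thenAccept: thenApply passes a new value downstream, so stages can be chained to transform a result step by step. In this minimal sketch the comma-separated string is a hypothetical invoice payload.

```java
import java.util.concurrent.CompletableFuture;

public class ThenApplyDemo {

    static int run() {
        CompletableFuture<Integer> lineItemCount = CompletableFuture
            .supplyAsync(() -> "item-1,item-2,item-3") // hypothetical invoice payload
            .thenApply(s -> s.split(","))               // Function<String, String[]>
            .thenApply(items -> items.length);          // Function<String[], Integer>
        return lineItemCount.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 3
    }
}
```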
The collect() call gathers all the futures that are created for the line items we are processing. CompletableFuture.allOf(...).join() then waits until every line item has finished processing. Since we now need to store the invoice details in the response builder, we return the invoice so that it is passed downstream.
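In the code above, allOf(...).join() runs inside thenApply, so the worker thread executing that function stays blocked until every product lookup has finished. A non-blocking variant, swapped in here for illustration rather than taken from the article's repository, chains the lookups with thenCompose and reads the results once allOf completes. The fetchInvoiceProductIds and getProductInfo helpers are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ProductFanOutDemo {

    // Hypothetical stand-ins for InvoiceService and ProductInfoService.
    static List<Integer> fetchInvoiceProductIds(int invoiceId) {
        return Arrays.asList(11, 12, 13);
    }

    static String getProductInfo(int productId) {
        return "product-" + productId;
    }

    static CompletableFuture<List<String>> fetchProductInfos(int invoiceId) {
        return CompletableFuture
            .supplyAsync(() -> fetchInvoiceProductIds(invoiceId)) // dependency: invoice first
            .thenCompose(ids -> {
                // Fan out: one asynchronous lookup per line item.
                List<CompletableFuture<String>> lookups = ids.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> getProductInfo(id)))
                    .collect(Collectors.toList());
                // Fan in: allOf completes once every lookup is done. Its result is Void,
                // so the values are read from the individual futures; they are already
                // complete at that point, so join() returns without waiting.
                return CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> lookups.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
            });
    }

    public static void main(String[] args) {
        System.out.println(fetchProductInfos(432121323).join()); // [product-11, product-12, product-13]
    }
}
```

The trade-off is that no thread is parked while the lookups run, at the cost of returning a future of the results instead of mutating the line items in place.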
Finally, thenAccept(builder::invoice) stores the invoice returned by the upstream stage in the response builder.
Now let us look at the main thread, which started all these asynchronous tasks.
final TransactionDetails.TransactionDetailsBuilder builder = TransactionDetails.builder();

CompletableFuture.allOf(
        fetchInvoice(payload, builder),
        fetchOrder(payload, builder),
        fetchCustomer(payload, builder)
).join();

TransactionDetails data = builder.build();
First, we initialize a response builder that the asynchronous calls can share. Each method can then take its time to update the builder, and once all the threads have done their job, the builder is used to build the response.
Next, we call the three fetch methods, each of which starts its own asynchronous task, and CompletableFuture.allOf(...).join() waits for all of them to finish. Once all the tasks are complete, the response is built and returned.
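The shared builder is workable here because each task sets a different field and the main thread only calls build() after allOf(...).join() returns. An alternative that avoids shared mutable state altogether is to let each task only produce a value and assemble an immutable result once all stages complete. In this sketch the Details class and the string values are hypothetical stand-ins for TransactionDetails and the services.

```java
import java.util.concurrent.CompletableFuture;

public class ImmutableResultDemo {

    // Hypothetical immutable result standing in for TransactionDetails.
    static final class Details {
        final String order;
        final String customer;
        final String invoice;

        Details(String order, String customer, String invoice) {
            this.order = order;
            this.customer = customer;
            this.invoice = invoice;
        }

        @Override
        public String toString() {
            return order + " | " + customer + " | " + invoice;
        }
    }

    static Details fetchDetails() {
        // Each task only produces a value; none of them touches shared state.
        CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> "order-123456");
        CompletableFuture<String> customer = CompletableFuture.supplyAsync(() -> "customer-245123");
        CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> "invoice-432121323");

        // Once all three are complete, combine their values into one immutable object.
        return CompletableFuture.allOf(order, customer, invoice)
            .thenApply(ignored -> new Details(order.join(), customer.join(), invoice.join()))
            .join();
    }

    public static void main(String[] args) {
        System.out.println(fetchDetails()); // order-123456 | customer-245123 | invoice-432121323
    }
}
```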
Summary
We have only scratched the surface of the capabilities of the CompletableFuture class. We have a plethora of functions and features to explore. Make sure you like/subscribe to our social media channels for new content updates.
If you have any specific scenarios you would like us to cover, please comment below and we will try to cover those scenarios in the coming parts on CompletableFuture and Java concurrency.