Parallel stream and CompletableFuture example
Let’s see how we can parallelize a Java application. Suppose we have been asked to develop an application that, given a list of libraries, calculates the number of books in each library. Suppose also that the calculation is an expensive operation. The first thing that comes to mind is to do this with streams.
public List<String> getNumberOfBooks() {
    return libraries.stream()
            .map(library -> String.format("library %s number of books %d",
                    library.getName(), library.calculateBooksCount()))
            .collect(toList());
}
Since the calculation is an expensive operation, processing the libraries one after another takes a long time. We can use parallel streams to run the calculations concurrently.
public List<String> getNumberOfBooks() {
    return libraries.parallelStream()
            .map(library -> String.format("library %s number of books %d",
                    library.getName(), library.calculateBooksCount()))
            .collect(toList());
}
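To make the difference concrete, here is a minimal, self-contained sketch that times both versions. The Library class, its sample data, and the 200 ms delay standing in for the expensive calculation are assumptions made for illustration; they are not part of the original application.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamDemo {

    // Hypothetical stand-in for the article's library type.
    static final class Library {
        private final String name;
        private final int books;

        Library(String name, int books) {
            this.name = name;
            this.books = books;
        }

        String getName() {
            return name;
        }

        // Simulates an expensive calculation with a 200 ms delay (an assumption).
        int calculateBooksCount() {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return books;
        }
    }

    // Made-up sample data.
    static final List<Library> LIBRARIES = Arrays.asList(
            new Library("Central", 1200),
            new Library("North", 800),
            new Library("South", 450),
            new Library("East", 300));

    static String describe(Library library) {
        return String.format("library %s number of books %d",
                library.getName(), library.calculateBooksCount());
    }

    // Processes the libraries one after another on the calling thread.
    static List<String> sequential() {
        return LIBRARIES.stream()
                .map(ParallelStreamDemo::describe)
                .collect(Collectors.toList());
    }

    // Splits the work across the common ForkJoinPool and the calling thread.
    static List<String> parallel() {
        return LIBRARIES.parallelStream()
                .map(ParallelStreamDemo::describe)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> sequentialResult = sequential();
        long sequentialMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        List<String> parallelResult = parallel();
        long parallelMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("sequential: " + sequentialMs + " ms, parallel: " + parallelMs + " ms");
        System.out.println("same results: " + sequentialResult.equals(parallelResult));
    }
}
```

The timings depend on the number of cores available, so none are shown here. Because the source list is ordered, collecting a parallel stream into a list keeps the elements in the same order as the sequential version.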
Another way to accomplish the task is to use CompletableFuture:
public List<String> getNumberOfBooks() {
    // Submit all calculations first so that they can run concurrently.
    List<CompletableFuture<String>> tasksResult =
            libraries.stream()
                    .map(library -> CompletableFuture.supplyAsync(
                            () -> library.getName() + " number of books: " +
                                    library.calculateBooksCount()))
                    .collect(toList());
    // Then wait for each result in turn.
    return tasksResult.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}
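The method above uses two separate stream pipelines, and this matters. Streams are lazy, so if supplyAsync and join were chained in one pipeline, each future would be joined right after it was created and the tasks would run one at a time. The sketch below contrasts the two variants; the slowCount method, the library names, and the 200 ms delay are hypothetical stand-ins for the expensive calculation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureJoinDemo {

    // Made-up library names.
    static final List<String> NAMES = Arrays.asList("Central", "North", "South", "East");

    // Hypothetical expensive calculation: sleeps 200 ms and returns a made-up count.
    static int slowCount(String name) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.length() * 100;
    }

    // One pipeline: every future is joined immediately after it is created,
    // so the next task is not submitted until the previous one has finished.
    static List<String> singlePipeline() {
        return NAMES.stream()
                .map(name -> CompletableFuture.supplyAsync(
                        () -> name + " number of books: " + slowCount(name)))
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Two pipelines: all futures are submitted first, then joined.
    static List<String> twoPipelines() {
        List<CompletableFuture<String>> futures = NAMES.stream()
                .map(name -> CompletableFuture.supplyAsync(
                        () -> name + " number of books: " + slowCount(name)))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        singlePipeline();
        long singleMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        twoPipelines();
        long twoMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("single pipeline: " + singleMs + " ms, two pipelines: " + twoMs + " ms");
    }
}
```

Both methods return the same list; only the elapsed time differs. On a multi-core machine the single pipeline should take roughly the sum of the delays, while the two-pipeline version can overlap them.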
With this implementation, parallel streams and CompletableFuture have almost the same performance. Under the hood, both use the same thread pool by default: the common ForkJoinPool, whose default parallelism is one less than Runtime.getRuntime().availableProcessors() (a parallel stream also uses the calling thread). CompletableFuture has an advantage over parallel streams: we can supply our own Executor with a thread pool size that better suits the needs of our application. For a task execution application with 50 tasks that spend most of their time waiting, we can get better performance with the following Executor:
private final Executor executor =
        Executors.newFixedThreadPool(50,
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        return t;
                    }
                });
CompletableFuture.supplyAsync(() -> "task " + task.getName() + " execution status: " +
task.execute(), executor);
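Putting the pieces together, a complete run with a custom executor might look like the sketch below. The slowCount method, the numeric ids, and the 200 ms delay are assumptions for illustration. Marking the pool threads as daemons is also an addition not present in the snippet above; it is one way to keep idle pool threads from blocking JVM exit.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomExecutorDemo {

    // Hypothetical expensive calculation: sleeps 200 ms and returns a made-up count.
    static int slowCount(int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return id * 10;
    }

    // Submits taskCount calculations to the given executor, then waits for all of them.
    static List<String> run(int taskCount, ExecutorService executor) {
        List<CompletableFuture<String>> futures = IntStream.rangeClosed(1, taskCount)
                .mapToObj(id -> CompletableFuture.supplyAsync(
                        () -> "library " + id + " number of books: " + slowCount(id),
                        executor))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int taskCount = 50;
        // One thread per task; daemon threads are an assumption of this sketch.
        ExecutorService executor = Executors.newFixedThreadPool(taskCount, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });

        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());

        long start = System.nanoTime();
        List<String> results = run(taskCount, executor);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(results.size() + " results in " + elapsedMs + " ms");
        executor.shutdown();
    }
}
```

A pool with one thread per task suits work that mostly waits, as simulated here with Thread.sleep. For CPU-bound calculations, threads beyond the number of cores are unlikely to help, so the pool size is worth measuring rather than fixing at 50.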