Elegant Patterns to execute work concurrently using Completion Service [Java]

Aman Garg
Published in Nerd For Tech
6 min read · Mar 14, 2021

Courtesy: https://www.baeldung.com/

Use Case Analysis

Assume you have a use case where the user supplies N numbers, and you need to compute an expensive mathematical function f(x) on each of them and return the K results that finish first.

Further, the result of the expensive function is obtained through HTTP interactions with a remote service, say Math Service.

The objective is to solve this with a minimal latency overhead.

High Level Use Case

How do we go about solving this? Let’s start by asking questions.

Characterising our work load

It’s important to analyse the workload we are dealing with before we come up with any solutions.

  • Work is I/O Bound and not compute bound.
  • Input is going to be a list of numbers, can be modelled as Iterable<Work>
  • We’re most likely going to use HTTP for remote service communication.
  • Out of order execution / completion of results is perfectly acceptable.
  • A limited set of results K ≤ N is required.

Exploiting the domain service.

Now is a good time to know more about the domain. We need to have an engaging discussion with the maintainers of Math Service and stakeholders.

  • Is there any kind of input that is slow to compute on?
  • Can we pre-filter the slower inputs such that the input size N = K?
  • Is the Math Service reliable? If yes, how so? If no, why not?
  • Does Math Service degrade / rate limit requests after a certain input?
  • Do we know what kind of inputs the user is going to give so that either the Math Service or our service can precompute / cache the results?

Let’s assume the answer is No to all of the above. Any Yes on the above is bound to make the problem easier and add scope for more optimisations.

Building Blocks

  • We know the workload can be parallelised. Hence, multiple threads.
  • We know we need some implementation of a Thread Pool Executor to be able to manage the threads effectively. Let’s start with a fixed thread pool.
Pseudocode. Submit work in a fixed executor. Not the most efficient way.
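
A minimal sketch of this naive approach, for illustration only. The class name `NaiveFixedPool`, the pool size of 4 and the `compute` stand-in for the Math Service call are assumptions, not the original gist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

class NaiveFixedPool {

    // Hypothetical stand-in for the remote call: echoes the input after a random delay.
    static int compute(int x) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextInt(50, 200));
        return x;
    }

    // Submits every input, then blocks on the futures in submission order.
    static List<Integer> firstK(List<Integer> inputs, int k) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int x : inputs) {
                futures.add(pool.submit(() -> compute(x)));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                if (results.size() == k) {
                    break;
                }
                // Blocks on this particular task even if a later one has already finished.
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdownNow(); // interrupt tasks whose results we no longer need
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstK(List.of(1, 2, 3, 4, 5, 6), 3));
    }
}
```

Because the futures are read in submission order, this sketch always returns the first K inputs, no matter which tasks actually finished first.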

This works. However, there are some obvious problems:

  • We need to block for the result of each call in submission order. This means that if the user wanted any 10 results from a batch of 20 and the first request was the slowest, that one request delays the whole response.
  • We’re not leveraging the fact that we do not need the results in any particular order. Any order will do.

Is there an implementation of an Executor which, when tasks are supplied to it, places the results on a queue in the order they complete and not in the order they were submitted?

Is there an implementation that places results on a queue as and when the tasks are completed from which we can take the first K and be done with?

Ideal Executor Service queuing “completed” tasks in “completion” order and not “submit” order

Executor Completion Service

Executor Completion Service is exactly that.

Instead of trying to find out which task has completed (to get its result), the caller just asks the CompletionService instance to return results as they become available.
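
To make the difference concrete, here is a small sketch using `java.util.concurrent.ExecutorCompletionService`. The class and method names are made up for the example. Each task simply sleeps for a fixed delay and returns it, so the completion order is easy to predict.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Each task sleeps for its delay and returns it, so shorter delays complete first.
    // Assumes a non-empty list (one thread per task keeps the demo simple).
    static List<Long> firstKByCompletion(List<Long> delaysMillis, int k) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.size());
        CompletionService<Long> completion = new ExecutorCompletionService<>(pool);
        try {
            for (long delay : delaysMillis) {
                completion.submit(() -> {
                    Thread.sleep(delay);
                    return delay;
                });
            }
            List<Long> results = new ArrayList<>();
            for (int i = 0; i < k; i++) {
                // take() blocks until some task completes, whichever one it is.
                results.add(completion.take().get());
            }
            return results;
        } finally {
            pool.shutdownNow(); // the remaining tasks are no longer needed
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted as 600, 50, 300; the two shortest delays should come back first.
        System.out.println(firstKByCompletion(List.of(600L, 50L, 300L), 2));
    }
}
```

The loop never asks for a specific future. It only calls `take()` K times and gets whatever finished next.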

This is what we want. Let’s code out the building blocks.

Math Service:

We don’t have to worry too much about the underlying details here. To simulate the calls on a remote interaction, let’s add a random delay on inputs.

Our interface for the Math Service. Simple. Beautiful.
We just return the same number after a random delay. Simulates network delay in a real call.
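
The original gists are not reproduced here, so the following is only a sketch of what the two pieces could look like. The names `MathService`, `RandomDelayMathService` and `maxDelayMillis` are assumptions.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical interface for the remote Math Service: one expensive function f(x).
interface MathService {
    int compute(int x) throws InterruptedException;
}

// Simulated implementation: returns the same number after a random delay,
// standing in for the latency of a real HTTP call.
class RandomDelayMathService implements MathService {
    private final int maxDelayMillis;

    RandomDelayMathService(int maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    @Override
    public int compute(int x) throws InterruptedException {
        // nextInt(bound) is exclusive of the bound, hence the + 1.
        Thread.sleep(ThreadLocalRandom.current().nextInt(maxDelayMillis + 1));
        return x;
    }

    public static void main(String[] args) throws InterruptedException {
        MathService service = new RandomDelayMathService(500);
        System.out.println(service.compute(42));
    }
}
```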

Work Executor

This is the workhorse where the actual work is done. Here, we have a dependency injection of the Math Service and expose an API to the method.
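
A possible shape for this class, sketched under assumptions: `MathWorkExecutor` and `firstK` are illustrative names, the cached thread pool is one choice among several, and the `MathService` interface is repeated so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-method view of the Math Service.
interface MathService {
    int compute(int x) throws InterruptedException;
}

class MathWorkExecutor {
    private final MathService mathService; // injected dependency

    MathWorkExecutor(MathService mathService) {
        this.mathService = mathService;
    }

    // Returns the k results that complete first, in completion order.
    List<Integer> firstK(List<Integer> inputs, int k)
            throws InterruptedException, ExecutionException {
        if (k < 0 || k > inputs.size()) {
            throw new IllegalArgumentException("need 0 <= k <= N");
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        try {
            for (int x : inputs) {
                completion.submit(() -> mathService.compute(x));
            }
            List<Integer> results = new ArrayList<>(k);
            for (int i = 0; i < k; i++) {
                results.add(completion.take().get()); // next task to finish, whichever it is
            }
            return results;
        } finally {
            pool.shutdownNow(); // interrupt tasks whose results are no longer needed
        }
    }

    // A tiny driver: ask for the first 3 results out of 6 inputs.
    public static void main(String[] args) throws Exception {
        MathWorkExecutor executor = new MathWorkExecutor(x -> {
            Thread.sleep((long) (Math.random() * 200)); // simulated network delay
            return x;
        });
        System.out.println(executor.firstK(List.of(1, 2, 3, 4, 5, 6), 3));
    }
}
```

Injecting the service through the constructor keeps the executor testable: a test can pass a lambda with fixed delays instead of a real client.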

Driver Program

  • Initialise the dependency and the dependent in the driver.
  • Output the results on the given input.
Our driver. We want first 3 in the given list.

Let’s look at the outputs. Notice that the tasks submitted first are not necessarily the first ones to complete. This is because there’s a random delay on each call and we process whichever results become available first.

What we wanted to achieve. Return top K that finish first.

If we were to run this again, we might see different results, as expected.

Generalising

This is good and all. What if there’s now a similar requirement, but for User Service instead of Math Service? Say we want the first K payment methods of the user and to display the results on the UI (checkout page) as and when they arrive.

Do you notice a pattern here? We’re breaking work into chunks and submitting it to an executor and looping to find the first K. We should not be worried about the low level take / submit APIs of the executor service.

Indeed, there is.

  • Define an iterable / stream of input on which the work would be split and submitted into the executor.
  • Define a mapper function that creates a task for each element in the iterable. This specifies the actual work.
  • Specify the count of results that are expected. Not all methods require completion of all sub-events. This would be the parameter K.
  • Implement a collector for collecting the intermediate results and returning the final response. It could be a list, set or what have you.

Let’s define an interface that the whole codebase can benefit from.
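
One way the four ingredients above could map onto a single method. The name `splitJoin` comes from the article, while the interface name, the exact parameter order, the generic signature and the declared exceptions are assumptions made for this sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collector;

// Hypothetical signature; the original gist may differ.
interface ConcurrentWorkExecutor {

    /**
     * @param count     how many results the caller needs (the parameter K)
     * @param inputs    the elements the work is split on
     * @param mapper    creates one task per input element
     * @param collector folds the intermediate results into the final response
     */
    <T, G, R> R splitJoin(int count,
                          Iterable<T> inputs,
                          Function<? super T, ? extends Callable<G>> mapper,
                          Collector<? super G, ?, R> collector)
            throws InterruptedException, ExecutionException;
}
```

Reusing `java.util.stream.Collector` means callers can pass `Collectors.toList()`, `Collectors.toSet()` and so on without a new collector type being invented.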

With the above interface, we can model our previous solution to the Math Service as elegantly as follows.

Using the interface for our problem.

And that’s it. With the splitJoin abstraction powered by the Concurrent Work Executor, our application programs no longer need to know the low-level details. Here’s the output again.

We get a similar output. But now we have a very good abstraction.

Fetching top K Payment methods now becomes easy.

How we’d model fetching first K payment method details

Interface implementation: Out of Order Concurrent Work Executor

The last piece of the puzzle is to implement the concurrent work executor interface. We can have multiple implementations. One that can process results out of order, one that processes in order etc.

  • This executor submits its tasks into a newCachedThreadPool. This can be parameterised further; some callers might require a fixed thread pool.
  • Shuts down the pool immediately after submission so that it accepts no more work.
  • Processes non-null completion results as they arrive using a Completion Service (a blocking completion queue).
  • Waits only for as many tasks as need to be completed.
  • Throws checked exceptions when a future#get fails.

What if you don’t want the first K but the whole results?
You simply set K to be the size of the entire input.

This should be straightforward to implement. I’ve added relevant comments wherever necessary. Feel free to skip this or implement on your own.
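
For readers who would rather read than implement, here is one possible sketch that follows the bullets above. It is not the original gist: the interface is repeated with an assumed signature so the block compiles on its own, and the `shutdownNow()` in the `finally` block is an addition of this sketch so that unneeded tasks get interrupted.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Assumed signature, repeated here for self-containment.
interface ConcurrentWorkExecutor {
    <T, G, R> R splitJoin(int count, Iterable<T> inputs,
                          Function<? super T, ? extends Callable<G>> mapper,
                          Collector<? super G, ?, R> collector)
            throws InterruptedException, ExecutionException;
}

class OutOfOrderConcurrentWorkExecutor implements ConcurrentWorkExecutor {

    @Override
    public <T, G, R> R splitJoin(int count, Iterable<T> inputs,
                                 Function<? super T, ? extends Callable<G>> mapper,
                                 Collector<? super G, ?, R> collector)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<G> completion = new ExecutorCompletionService<>(pool);
        try {
            int submitted = 0;
            for (T input : inputs) {
                completion.submit(mapper.apply(input));
                submitted++;
            }
            pool.shutdown(); // accept no further work; submitted tasks keep running
            return drain(completion, count, submitted, collector);
        } finally {
            pool.shutdownNow(); // assumption of this sketch: interrupt tasks we no longer need
        }
    }

    // Helper that names the collector's accumulation type A so we can drive it by hand.
    private static <G, A, R> R drain(CompletionService<G> completion, int count, int submitted,
                                     Collector<? super G, A, R> collector)
            throws InterruptedException, ExecutionException {
        A container = collector.supplier().get();
        int collected = 0;
        for (int taken = 0; taken < submitted && collected < count; taken++) {
            G result = completion.take().get(); // blocks until the next task completes
            if (result != null) {               // only non-null results count towards K
                collector.accumulator().accept(container, result);
                collected++;
            }
        }
        return collector.finisher().apply(container);
    }

    public static void main(String[] args) throws Exception {
        // Simulated Math Service call: echo the input after a random delay.
        Function<Integer, Callable<Integer>> mapper = x -> () -> {
            Thread.sleep((long) (Math.random() * 200));
            return x;
        };
        ConcurrentWorkExecutor executor = new OutOfOrderConcurrentWorkExecutor();
        List<Integer> firstThree =
                executor.splitJoin(3, List.of(1, 2, 3, 4, 5, 6), mapper, Collectors.toList());
        System.out.println(firstThree);
    }
}
```

If fewer than K tasks produce a non-null result, this sketch returns whatever it collected rather than blocking forever, since it never takes more completions than it submitted.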

Conclusion

  • Identify the work load
  • Try to exploit the domain wherever possible.
  • Choose the right executor service implementation and the right executor.
  • Build a meaningful abstraction around it.
  • Document the interfaces and how they provide value.

The full gist along with unit tests can be seen here.
That’s a wrap. Hope this piece added value. Open to constructive feedback :)
