Sequential execution context?

I’m using an HTTP client which returns Futures. I’m making a sequence of calls against a REST API, with each call corresponding to an item in a source collection. I have the following requirements:

  • The calls shall be made in the same order as the order of items in the collection.
  • The next call should only be made after the previous have completed (with either success or failure).

It’s possible to compose futures in a way that will achieve this. However, an argument could be made that this should be solved by the ExecutionContext instead, as after all this is what controls the execution.

So the question is, how could one create an ExecutionContext which will satisfy these requirements?

The entire point of an ExecutionContext is to execute code asynchronously, with varying degrees of parallelism. Even if you were to reduce the parallelism of the ExecutionContext to 1, your code would still need to handle the asynchronous nature of the Futures being returned from your HTTP client. It seems to me that what you are looking to do should be handled in the logical control flow of your program, and not by the ExecutionContext.

Given your requirements / restrictions (this sounds an awful lot like an educational programming exercise), you could use flatmap to chain your Future executions together in order to ensure they execute in-order. Per your previous thread, you can also use transform to ensure you execute sequentially regardless of success or failure.

You can have asynchronous programming with just one thread.

And wanting to perform a series of asynchronous computations in a serial order is a very common real use case.

This actually shows why the eager nature of Future is problematic. With IO (and friends) this would be juts traverse and when you would need parallelism you would use a parallel traverse.

2 Likes

Thanks for your input. For the record, this is how I ended up composing the futures:

def sequentiallyLifted[A, B](as: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[List[Try[B]]] =
  as.foldLeft(Future.successful(List.empty[Try[B]])) { (fbs, a) =>
    fbs flatMap { bs =>
      f(a) transform { x =>
        Success(bs :+ x)
      }
    }
  }

A bit more, uh, context. The idea behind the question was triggered by this blog post:

http://derekwyatt.org/2014/11/14/a-sequential-execution-context/

However, the example shown there did not seem to yield the desired result (and I was unable to figure out how it worked).

Also for the record - it’s not an educational exercise.

You have two options. One is yo write your own serial traverse, or use the built-in Future.traverse with a single threaded ec.

For the first one, it would look like this:
(I am on cellphone, so it may have typos)

def serialTraverse[A, B](data: IterableOnce[A])(f: A => Future[B]): Future[List[B]] =
  data.iterator.foldLeft(Future.successful(List.empty[B])) {
  case (a, accF) =>
    for {
      acc <- accF
      b <- f(a)
     } yield b :: acc
 }.map(_.reverse)

The second option would look like this:

implicit ec = ExcutionContext.fromExecutor(Executors.newSingleThreadedExecutor())
Future.traverse(data)(doHttpPettion)

Note that appending to a List is very expensive, use a Vector or prepend and then reverse at the end as I did.


Also, note that if you use cats-effect this would be much simpler.

data.traverse(item => doHttpRequest(item). attempt)

That will return an IO[List[Either[Throwable, B]]]

2 Likes