Executing an action after everything in a list of futures has completed

I have a list of futures representing asynchronous computations and I’m using map on the list to perform an additional action (display results) as each future in the list completes - this works really well, with each future being handled asynchronously.

However I need to perform cleanup after the last future in the list has completed (shut down the Actor system) but I haven’t been able to come up with a good way of doing it.

I tried using Future.sequence on the list but that appears to prevent the asynchronous handling of the individual futures in the list.

I’m sure there’s a neat way of doing it but I haven’t managed to figure it out.

As an aside, I really, really object to the Discourse brokenness that means I’ve had to submit this question to a list I subscribe to via email via some noxious web interface. Site admins, please sort this mess out.


I stole this with only slight tweaks from Alvin Alexander who often has the most comprehensible Scala explanations out there:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def sleep(time: Long): Unit = Thread.sleep(time)

// (a) create the futures
val f1 = Future { sleep(800); 1 }
val f2 = Future { sleep(200); 2 }
val f3 = Future { sleep(400); 3 }

// (b) combine their results in a for-comprehension (the futures are already running)
val result = for {
  r1 <- f1
  r2 <- f2
  r3 <- f3
} yield (r1 + r2 + r3)

// (c) do whatever you need to do with the result
result.onComplete {
  case Success(x) => println(s"\nresult = $x")
  case Failure(e) => e.printStackTrace()
}

// important for a little parallel demo: keep the jvm alive
sleep(3000)

Seems like this will do what you need, right? result.onComplete won’t fire with Success until all the Futures are done with their own async meandering, and then you can do whatever cleanup you need instead of printing as above.
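One caveat about the failure case: if any future fails, the combined future can complete with Failure while others are still running, so cleanup hung off it may fire early. A minimal sketch of one way around that, assuming Scala 2.12 or later (for the Try-based transform) and a dedicated thread pool; settleAll and WaitForAllDemo are hypothetical names made up for this illustration:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object WaitForAllDemo {
  // Lift each Future[A] to a Future[Try[A]] that always succeeds, so the
  // aggregate only completes once every input has completed either way.
  def settleAll[A](fs: List[Future[A]])(implicit ec: ExecutionContext): Future[List[Try[A]]] =
    Future.sequence(fs.map(f => f.transform(t => Success(t))))

  def run(): List[Try[Int]] = {
    // Dedicated pool (an assumption of this sketch) so the demo does not
    // depend on how many threads the global ExecutionContext has.
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = List(
        Future[Int] { throw new IllegalStateException("boom") }, // fails immediately
        Future { Thread.sleep(300); 2 },
        Future { Thread.sleep(100); 3 }
      )
      // Blocks until all three have finished, including the slow one.
      Await.result(settleAll(futures), 5.seconds)
    } finally pool.shutdown() // cleanup runs only after everything has settled
  }
}
```

Calling WaitForAllDemo.run() gives one Try per input, in list order, so the cleanup code can still inspect which computations failed.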

Future.sequence seems to work fine for me (Scastie - An interactive playground for Scala.).
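For anyone who wants to check this locally, here is a rough sketch of the same idea. It assumes a dedicated four-thread pool rather than the global ExecutionContext, and SequenceDemo is a hypothetical name. The futures start running when they are created, the per-future map fires as each one completes, and Future.sequence only aggregates the results in list order:

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceDemo {
  // Returns (results in list order, ids in the order their handlers ran).
  def run(): (List[Int], List[Int]) = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val finished = ListBuffer.empty[Int]
    try {
      // Each future begins executing as soon as it is constructed.
      val futures = List(300L, 100L, 200L).zipWithIndex.map { case (ms, i) =>
        Future { Thread.sleep(ms); i + 1 }
      }
      // Per-future handling: runs whenever that particular future completes.
      val handled = futures.map(_.map { id =>
        finished.synchronized { finished += id }
        id
      })
      // Aggregation: completes once every handled future has completed.
      val results = Await.result(Future.sequence(handled), 5.seconds)
      (results, finished.synchronized(finished.toList))
    } finally pool.shutdown()
  }
}
```

The first element of the returned pair follows the list order regardless of timing; the second reflects completion order, so with these sleep times the 100 ms future would normally be recorded first.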

However, are the futures using the ActorSystem’s ExecutionContext? In that case it might be a race condition, since AFAIK a shutdown ActorSystem wouldn’t be able to dispatch the remaining Futures. From Akka’s documentation on ActorSystem.terminate:

Be careful to not schedule any operations on the dispatcher of this actor system as it will have been shut down before this future completes.

To work around that, either use another ExecutionContext, or make sure that your terminate handler depends on the “display results” Futures, rather than the “fetch results” Futures.
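To make the second option concrete, here is a minimal sketch using only the standard library. The thread pool stands in for the ActorSystem's dispatcher and pool.shutdown() for actorSystem.terminate(); both substitutions, and the names fetch, display and CleanupDemo, are assumptions for illustration and not Akka APIs:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}

object CleanupDemo {
  // Returns the displayed lines and whether the pool terminated in time.
  def run(): (List[String], Boolean) = {
    // Stand-in for the actor system's dispatcher (assumption of this sketch).
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val shown = ListBuffer.empty[String]

    // Hypothetical "fetch results" and "display results" steps.
    def fetch(id: Int): Future[Int] = Future { Thread.sleep(100L * id); id * 10 }
    def display(value: Int): Unit = shown.synchronized { shown += s"value = $value"; () }

    val fetched: List[Future[Int]] = List(3, 1, 2).map(fetch)
    // Each of these completes only after its display step has run.
    val displayed: List[Future[Unit]] = fetched.map(_.map(display))

    // Hang the cleanup off the display futures, not the fetch futures.
    Future.sequence(displayed).onComplete(_ => pool.shutdown())

    // The main thread waits for the cleanup to take effect.
    val terminated = pool.awaitTermination(5, TimeUnit.SECONDS)
    (shown.synchronized(shown.toList), terminated)
  }
}
```

Had the cleanup depended on Future.sequence(fetched) instead, the shutdown could race with the display callbacks that still need to be scheduled on the same pool.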

I stole this with only slight tweaks from Alvin Alexander who often has the most comprehensible Scala explanations out there:

Unfortunately I have a variable-length list of futures where the length
isn’t known in advance, so a for-comprehension won’t work.

One possibility may be to use a Monix Task - this has a method gather which is like sequence but can run concurrently.

regards,
Siddhartha

Future.sequence seems to work fine for me (Scastie - An interactive playground for Scala.).

It works fine for me as well but it seemed to prevent the Futures in the
list executing asynchronously, although I was probably wrong as that
doesn’t seem to be the case in your example above (thanks for that).

However, are the futures using the ActorSystem’s ExecutionContext? In that case it might be a race condition, since AFAIK a shutdown ActorSystem wouldn’t be able to dispatch the remaining Futures. From Akka’s documentation on ActorSystem.terminate:

Yes, that was the problem I was trying to fix by using the Futures - by
having each Actor complete a Future when it is finished and having the
main method wait for them all to complete before shutting everything down.

Be careful to not schedule any operations on the dispatcher of this actor system as it will have been shut down before this future completes.

To work around that, either use another ExecutionContext, or make sure that your terminate handler depends on the “display results” Futures, rather than the “fetch results” Futures.

I’ve got this working as I want but I think your last paragraph nails
the problem I was having - initially I was waiting on the fetch and not
the display result.

The application is taking a CSV file of flight data and using an Akka
stream to parse it and then collate by airport. The collation is done by
an actor that creates a sub-actor per airport which collects each
airport’s data and performs calculations on it:

val airports = actorSystem.actorOf(AirportRouter.props)
csv.runWith(Sink.actorRefWithAck(airports, StartOfData, Acknowledge, EndOfData))

// Collect and display the per-airport statistics.
(airports ? GetStats).mapTo[Future[Seq[Future[AirportStats]]]].map(_.map(stats =>
  Future.sequence(stats.map(_.map(displayStats(_)))).onComplete(_ =>
    actorSystem.terminate())
))

The outer future in Future[Seq[Future[AirportStats]]] is completed when
the router actor has processed all the records in the Akka stream - by
that point all the per-airport collation actors will have been created,
even if they are still processing the data. The inner list of futures is
per-airport and they are completed as each airport’s statistics
calculations are complete. What I needed to do was to map those inner
futures to a future that is completed when the airport’s data has been
displayed, as you suggested.
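The nested shape described above can also be collapsed with flatMap into a single future that completes once every element has been handled. This is only a sketch under assumptions: Stats is a hypothetical stand-in for AirportStats, whenAllHandled is a made-up helper name, the airport codes are invented sample data, and a plain thread pool replaces the actor system:

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object NestedFuturesDemo {
  // Hypothetical stand-in for AirportStats.
  final case class Stats(airport: String, flights: Int)

  // Collapse Future[Seq[Future[A]]] into one future that completes only
  // after `handle` has run for every inner result.
  def whenAllHandled[A](nested: Future[Seq[Future[A]]])(handle: A => Unit)(
      implicit ec: ExecutionContext): Future[Seq[Unit]] =
    nested.flatMap(inner => Future.sequence(inner.map(_.map(handle))))

  def run(): List[String] = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val shown = ListBuffer.empty[String]
    try {
      // Outer future: the list of per-airport futures becomes available.
      val nested: Future[Seq[Future[Stats]]] = Future {
        Seq(
          Future { Thread.sleep(200); Stats("LHR", 3) },
          Future { Thread.sleep(50); Stats("JFK", 5) }
        )
      }
      val done = whenAllHandled(nested) { s =>
        shown.synchronized { shown += s.airport; () }
      }
      Await.result(done, 5.seconds) // everything has been "displayed" here
      shown.synchronized(shown.toList)
    } finally pool.shutdown() // safe: no further work will be scheduled
  }
}
```

In the Akka version the shutdown in the finally block would correspond to calling actorSystem.terminate() from an onComplete on the combined future.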

Thanks for the help,

One possibility may be to use a Monix Task - this has a method gather which is like sequence but can run concurrently.

Interesting, I wasn’t aware of that library, thanks. However I’m doing
this on top of Akka so I don’t think I’d be able to use Monix without
starting over.