Akka and nested Futures

I am new to Scala. I am currently playing around with MongoDB, Akka and Futures. I am trying to implement the following scenario:

There is one actor acting as a database client.
After registering the actor with the actor system, I invoke something like

dbclient ? FindAllUsers

The client's receive method looks like this:

override def receive: Receive = {
  case FindAllUsers => sender ! AllUsers(users.findAll)
}

where users is an instance of a class that looks like this:

class UserRepository(collection: MongoCollection[Document])(implicit ec: ExecutionContext)
{
  def findAll: Future[Seq[String]] =
    collection
      .find()
      .projection(fields(excludeId(), include("name")))
      .map(doc => doc("name").asString.getValue)
      .toFuture()
}

So, to my understanding, what dbclient ? FindAllUsers finally returns is a Future that resolves to AllUsers(Future[Seq[String]]), i.e. a Future nested inside a Future. In order to print my users, I now do

implicit val ec = system.dispatcher
implicit val timeout = Timeout(1.second)
val response = dbclient ? FindAllUsers

response onComplete {
  case Success(AllUsers(users)) => users onComplete {
    case Success(listOfUserNames: Seq[String]) => listOfUserNames foreach println
    case _ =>
  }
  case _ =>
}

Well, this works. But reading my response is a little bit too nested for my taste. Is there anything I don’t see here? Any way I could reduce complexity?

In general, I’d strongly recommend against sending a Future in an Akka message. It will work so long as your system is small enough to fit onto a single node – but the whole point of Akka is to write larger-scale systems that work across a cluster. And I suspect that that Future will crash and burn when you try to send it across the network. (I haven’t tried it, but it doesn’t seem likely to work.) Messages should usually just contain plain old data.

So I’d recommend instead resolving the Future inside of the Actor, and sending the result as the response.

That involves one or two important tricks – caution is required when resolving Futures inside of Actors. The most important is that sender is transient – it is a function, and is only valid synchronously during receive. So if you want to send your response when the Future resolves, you need to store the sender for later.
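To see why the capture matters, here is a minimal sketch that uses only scala.concurrent. It does not use Akka at all: currentSender and sender() are hypothetical stand-ins for an actor's per-message state, and the Promise plays the role of a slow database call.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SenderCaptureSketch {
  // Hypothetical stand-in for the actor's "current sender", which changes per message.
  @volatile private var currentSender: String = "first-client"
  def sender(): String = currentSender

  /** Returns (sender seen by a late lookup, sender captured up front). */
  def run(): (String, String) = {
    currentSender = "first-client"
    val dbResult = Promise[Seq[String]]()   // completes "later", like a DB call

    val captured = sender()                 // read synchronously, as inside receive
    val late = dbResult.future.map(_ => sender())  // read only when the Future resolves
    val safe = dbResult.future.map(_ => captured)  // uses the value saved earlier

    currentSender = "second-client"         // the actor has moved on to another message
    dbResult.success(Seq("alice", "bob"))   // only now do the callbacks run

    (Await.result(late, 1.second), Await.result(safe, 1.second))
  }

  def main(args: Array[String]): Unit = {
    val (late, safe) = run()
    println(s"late lookup: $late, captured: $safe")
    // prints "late lookup: second-client, captured: first-client"
  }
}
```

The late lookup sees whoever the "sender" is at resolution time, which is exactly the bug you get from calling sender inside a Future callback.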

So I would recommend changing receive to something like:

override def receive: Receive = {
  case FindAllUsers => {
    // Save the current value of sender for later:
    val mySender = sender
    users.findAll.map { allUsers =>
      // We're now inside the Future, and allUsers is the result:
      mySender ! AllUsers(allUsers)
    }
  }
}

So now the Future returned by dbclient ? FindAllUsers resolves to a plain AllUsers(Seq[String]), which is probably what you wanted in the first place. (Strictly speaking, ask gives you a Future[Any]; .mapTo[AllUsers] narrows it to a Future[AllUsers].) AllUsers is now a plain old data message, safe to send across the network.
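With the reply carrying plain data, the client side needs only one level of callback. A rough sketch, again using only scala.concurrent: the reply value here is a stand-in for the result of dbclient ? FindAllUsers, and AllUsers is assumed to be a case class wrapping Seq[String].

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FlatReplySketch {
  // Assumed shape of the reply message.
  final case class AllUsers(names: Seq[String])

  // `reply` stands in for `dbclient ? FindAllUsers`, which is typed Future[Any].
  // mapTo narrows the type; map then unwraps the payload.
  def userNames(reply: Future[Any])(implicit ec: ExecutionContext): Future[Seq[String]] =
    reply.mapTo[AllUsers].map(_.names)

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val reply: Future[Any] = Future.successful(AllUsers(Seq("alice", "bob")))
    // In real code you would write userNames(reply).foreach(_.foreach(println));
    // Await is used here only so the demo prints before the JVM exits.
    Await.result(userNames(reply), 1.second).foreach(println)
  }
}
```

If the reply turns out to be some other message type, mapTo fails the Future instead of silently falling through a `case _ =>`.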

Finally, you might want to check out the Akka Forum, a sibling site to this one that specializes in Akka. Happy hakking!

Thank you! But …

… is this still non-blocking?

Yes, it is still non-blocking. .map is the function you usually want to use with Future, to keep things non-blocking.

I usually describe it like this:

You start with a Future[A]. This is essentially a box, in which an A will (probably) eventually be placed.

When you say:

val futA: Future[A] = createAFuture()
val futB: Future[B] = futA.map { a =>
  doSomethingWithAThatMakesB(a)
}

What you’re doing here is creating a box that will eventually have a B in it. The .map can be read as “when I get a value for A, take that value, transform it, and stuff the result into the B box”. Nothing is blocking – instead, you’re describing a chain of events that should happen when the A resolves.

In this particular case, B is actually Unit (since Akka’s ! operator doesn’t return anything). So you could actually use .foreach instead of .map. But that’s largely a detail – it’s all the same basic principle, of saying what to do when the Future resolves, rather than stopping and waiting for it.
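If you want to convince yourself that .map doesn't block, the box picture can be checked directly. In this small sketch a Promise is the empty box for A; the names boxA and futB are just for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoxSketch {
  /** Returns (was futB already completed right after map?, final value of futB). */
  def run(): (Boolean, String) = {
    val boxA = Promise[Int]()                  // an empty box that will hold an Int
    val futB: Future[String] = boxA.future.map(a => s"got $a")

    // map returned immediately: the B box exists, but nothing is in it yet.
    val completedEarly = futB.isCompleted

    boxA.success(42)                           // now the A box is filled
    (completedEarly, Await.result(futB, 1.second))
  }

  def main(args: Array[String]): Unit =
    println(run())                             // prints "(false,got 42)"
}
```

The Await at the end is only there so the demo can show the value; the map call itself never waited for anything.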

It’s worth noting that this .map approach is usually preferable to onComplete. There are times you need onComplete, but idiomatic Scala code tends to use it relatively rarely. Doing these chains of .maps (and sometimes .flatMap, which is how you chain a step that itself returns a Future, and sometimes .foreach when you only want a side effect and aren’t going to get a result) is usually the best way to go, and tends to lead to the cleanest code.

(And yes, Future has many of the same functions as List. That’s not an accident – they’re actually more alike and related than they appear at first glance…)

Thanks for that helpful answer! Your box comparison is really easy to understand. And your snippet shows me how to deal with it: whenever a box is filled, I unpack the stuff inside before putting it into a new box, otherwise I might get a huge number of boxes being packed into each other and the final unboxing would be a great mess, right?

Correct – I think you’ve got the idea.

While we’re on the subject, here are some extra details that aren’t relevant right now, but you may encounter later:

The most common way in which you wind up with Futures-inside-Futures is something like this:

val futA: Future[A] = createAFuture()
val futB: Future[Future[B]] = futA.map { a =>
  doSomethingThatCreatesAFutureOfB(a)
}

Basically, if the innards of your map() also return a Future, then you’re trying to stick a Future inside the box. This is totally legal, but usually inconvenient. In these cases, you usually want to use flatMap instead:

val futA: Future[A] = createAFuture()
val futB: Future[B] = futA.flatMap { a =>
  doSomethingThatCreatesAFutureOfB(a)
}

flatMap means essentially, "map(), and then flatten the adjacent Futures". Since Future has a well-defined order of operations, this “flattening” is legal and makes sense: you finish the first step, then do the second.
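Here is the same contrast with concrete types, as a small sketch. lookupUserId and lookupEmail are made-up functions that each return a Future; nothing here talks to a real database.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapSketch {
  // Hypothetical asynchronous lookups.
  def lookupUserId(name: String): Future[Int] = Future(name.length)
  def lookupEmail(id: Int): Future[String] = Future(s"user$id@example.com")

  // map: the inner Future ends up inside the outer box.
  def nested(name: String): Future[Future[String]] =
    lookupUserId(name).map(id => lookupEmail(id))

  // flatMap: one box, filled when both steps have finished.
  def flat(name: String): Future[String] =
    lookupUserId(name).flatMap(id => lookupEmail(id))

  // Equivalent to flat: map, and then flatten (Future.flatten exists since Scala 2.12).
  def mappedThenFlattened(name: String): Future[String] =
    nested(name).flatten

  def main(args: Array[String]): Unit =
    println(Await.result(flat("alice"), 1.second))  // prints "user5@example.com"
}
```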

This happens so often in Scala that there is a general syntax for it, called “for comprehensions”. The above could be rewritten as:

val futB: Future[B] = for {
  a <- createAFuture()
  b <- doSomethingThatCreatesAFutureOfB(a)
} yield b

This is literally the same code as the flatMap example above (almost – I’m over-simplifying a little): the for comprehension is called “syntax sugar”, because the compiler simply rewrites it into the flatMap above.

You can repeat this, with as many steps of "and then do something that returns a Future" as you like.
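For instance, three dependent steps might look like the sketch below. loadUser, loadMails and sendGreetings are invented for the example; each one just returns an already-completed Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForSketch {
  // Hypothetical steps, each returning a Future.
  def loadUser(name: String): Future[String] =
    Future.successful(name.capitalize)

  def loadMails(user: String): Future[Seq[String]] =
    Future.successful(Seq(s"${user.toLowerCase}@example.com", s"${user.toLowerCase}@example.org"))

  def sendGreetings(mails: Seq[String]): Future[Int] =
    Future.successful(mails.size)   // pretend we sent one message per address

  // Each line runs only after the previous Future has resolved.
  def report(name: String): Future[String] =
    for {
      user  <- loadUser(name)
      mails <- loadMails(user)
      sent  <- sendGreetings(mails)
    } yield s"$user: $sent message(s)"

  def main(args: Array[String]): Unit =
    println(Await.result(report("alice"), 1.second))  // prints "Alice: 2 message(s)"
}
```

If any step fails, the later steps are skipped and the resulting Future carries that failure.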

Types that have flatMap() (including Future, Option, List and lots of others – types that have a clear concept of how to “flatten” them when nested) are generally called “Monads”, and any of them can be used in nested for comprehensions like this.
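To illustrate that this isn't specific to Future, here is the same syntax with Option and List. The function names are invented for the example.

```scala
object MonadSketch {
  // Option: the result is Some only if every step produced a value.
  def endpoint(host: Option[String], port: Option[Int]): Option[String] =
    for {
      h <- host
      p <- port
    } yield s"$h:$p"

  // List: every combination of the two lists, in order.
  def pairs(xs: List[Int], ys: List[String]): List[String] =
    for {
      x <- xs
      y <- ys
    } yield s"$x$y"

  def main(args: Array[String]): Unit = {
    println(endpoint(Some("localhost"), Some(27017)))  // prints "Some(localhost:27017)"
    println(endpoint(Some("localhost"), None))         // prints "None"
    println(pairs(List(1, 2), List("a", "b")))         // prints "List(1a, 1b, 2a, 2b)"
  }
}
```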

The one gotcha that you will sometimes hit is that flatMap only allows you to nest like-with-like. You can only nest Futures in Futures, Lists in Lists, and so on. But if you wind up with something like Future[Option[Future[A]]], with different types being nested, that’s just a headache. There are ways to simplify them somewhat, but they’re a little more advanced, and they only work for some cases. So you want to avoid such things when you can, and the code tends to get a little messy when you can’t.
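As a sketch of the "little messy" version: with two lookups that each return Future[Option[...]], one manual way to combine them is to flatMap over the Future and pattern match on the Option inside. The lookups and their data are made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureOptionSketch {
  // Hypothetical in-memory "tables".
  private val userIds = Map("alice" -> 1)
  private val mails   = Map(1 -> "alice@example.com")

  def findUserId(name: String): Future[Option[Int]] = Future.successful(userIds.get(name))
  def findMail(id: Int): Future[Option[String]]     = Future.successful(mails.get(id))

  // flatMap handles the Future layer; the match handles the Option layer by hand.
  def mailFor(name: String): Future[Option[String]] =
    findUserId(name).flatMap {
      case Some(id) => findMail(id)
      case None     => Future.successful(Option.empty[String])
    }

  def main(args: Array[String]): Unit = {
    println(Await.result(mailFor("alice"), 1.second))  // prints "Some(alice@example.com)"
    println(Await.result(mailFor("bob"), 1.second))    // prints "None"
  }
}
```

A plain for comprehension can't mix the two layers directly, which is why the match is needed here.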

Check out https://doc.akka.io/api/akka/current/akka/pattern/index.html#pipe[T](future:scala.concurrent.Future[T])(implicitexecutionContext:scala.concurrent.ExecutionContext):PipeToSupport.this.PipeableFuture[T] , it covers exactly your use case:

override def receive: Receive = {
  case FindAllUsers =>
    import akka.pattern.pipe
    users.findAll.pipeTo(sender)
}

EDIT: I’d recommend keeping the toFuture conversion in the service layer, and keeping the repository layer in terms of the Mongo Observable. So the above line would look more like users.findAll.toFuture().pipeTo(sender). This way the repository layer doesn’t need to know anything about ExecutionContext, and the service layer where the actor is running can provide the actor’s dispatcher as the ExecutionContext if necessary.
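For intuition, here is a rough stdlib-only approximation of what a pipe-style helper does. This is not Akka's implementation: Recipient, FailureReply and pipe are hypothetical stand-ins (as far as I know, the real pipeTo sends an akka.actor.Status.Failure when the Future fails).

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object PipeSketch {
  // Hypothetical stand-in for an ActorRef: remembers the first message it is told.
  final class Recipient {
    private val inbox = Promise[Any]()
    def tell(msg: Any): Unit = { inbox.trySuccess(msg); () }
    def firstMessage: Future[Any] = inbox.future
  }

  // Hypothetical failure wrapper, so the recipient hears about errors too.
  final case class FailureReply(cause: Throwable)

  // `to` is an ordinary argument, so it is evaluated when pipe is called,
  // not later when the Future completes. That is why no manual capture is needed.
  def pipe[T](future: Future[T], to: Recipient)(implicit ec: ExecutionContext): Unit =
    future.onComplete {
      case Success(value) => to.tell(value)
      case Failure(cause) => to.tell(FailureReply(cause))
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val client = new Recipient
    pipe(Future.successful(Seq("alice", "bob")), client)
    println(Await.result(client.firstMessage, 1.second))  // prints "List(alice, bob)"
  }
}
```

Unlike the hand-written .map version, this shape also forwards failures, so the asker doesn't just time out when the query fails.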

I like that idea. However, I think I am still not used to the Future concept: my MongoDB holds some information I require within my GUI, e.g. I would like to show the result of FindAllUsers in a ChoiceDialog (ScalaFX). This dialog needs to be initialized with a list of values and a defaultChoice value. It seems like I need to “wait for my result” before I can construct the dialog. So the whole Future stuff won’t help me at all if I always need to wait for my result before I can continue, right?

I mean, my UserRepository looks something like this for the moment:

object UserRepository
{
  def getAllUsersNames: Observable[String] =
    usersCollection
      .find()
      .map(user => user("username").asString.getValue)

  // TODO: this should return Observable[Array[String]]
  def getUserMailAddresses(username: String): Observable[Array[AnyRef]] =
    usersCollection
      .find( equal("username", username) )
      .map(user => user("mail").asArray.toArray)
}

Now, as the repository layer is on my client, I do not have a classic service layer (the server is just MongoDB itself). I could directly read my results into GUI controls, but I guess this must be blocking anyway …

Like jducoeur said earlier in the thread, you can attach callbacks to Futures, and they will be automatically executed when the Future is completed, without you having to wait for the completion. But in your case you don’t even need to deal with Futures, because you have the Observable result type from the repository operations, and you can attach callbacks directly to Observables. So it would look like:

val choiceDialog = new ChoiceDialog()
val items = choiceDialog.items
UserRepository.getAllUsersNames.foreach(items.+=)

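This adds each name to the choice dialog as it becomes available from the Mongo driver, without blocking the caller. One caveat: the driver invokes the callback on one of its own threads, and JavaFX expects UI state to be modified on the JavaFX Application Thread, so in a real ScalaFX app you would likely wrap the update in Platform.runLater. You can also do more sophisticated things with the Observable callbacks, such as error handling.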

Thank you, @yawaramin, I guess that solved my issue :slight_smile:
