Limiting the number of futures running in parallel

Hello! I’ve been looking for a while and haven’t been able to find any answers, hoping someone here might be able to help. I have a Seq[Future[something]]. The Seq is huge (100k+ futures), and the futures are hitting a cache. I’m trying to find a mechanism to process this Seq with some cap on the max number of futures running at once.

2 solutions I’m aware of that I’d rather not pursue:

  1. Creating a fixed thread pool of maxConcurrency, and running the futures there, but blocking each future, so each thread is only working on 1 future at a time.
    To me this solution defeats the point of Futures, but it works, and if the Futures are short it may not be that bad.

  2. Using Akka Streams’ Source(seq).mapAsync(maxConcurrency)
    This is super nice; however, I have some issues with it for my use case (a rough sketch of this shape follows the list below):

  • If I have multiple clients each seeking to run their own Seq[Future[…]], then the maxConcurrency is not actually enforced across them, since I need to create 2 streams.
  • I could create a Source.queue… and funnel all futures into it, but then keeping each caller’s list in order would require some Future wrapper: wrapping them in Promises which resolve when the stream is done with the underlying future, so the callers can get results for their own Seqs.
  • However, that Source.queue needs a specific type, and my cache returns multiple types (Future[SomeType], Future[SomeOtherType]), so using one stream for all cache futures doesn’t work.
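For reference, here’s roughly the shape I mean by option 2 (a sketch with stand-ins: getString below stands in for the real async cache call, and the ActorSystem setup is only there to make it self-contained):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")

def getString(key: String): Future[String] =
  Future.successful(key)               // stands in for the real async cache call

val maxConcurrency = 100
val keys: Seq[String] = (1 to 100000).map(i => s"key-$i")

// At most maxConcurrency lookups run at once for this one stream; a second
// caller materializing its own stream gets its own budget, which is the first issue above.
val results: Future[Seq[String]] =
  Source(keys)
    .mapAsync(maxConcurrency)(getString)
    .runWith(Sink.seq)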

Are there any async ways to limit the max concurrency in scala? Any help would be much appreciated!

There are lots of ways to limit concurrency, but they tend to involve getting away from raw Futures. By the time you have a Future in hand, it’s too late: it’s already running, and hard to control.

Take a look at Cats-effect and ZIO, which handle this correctly: instead of having a huge sequence of Futures, you have a lot of IO objects, which are algorithms that create async processes. At that point, it’s extremely easy and normal to gate the parallelism before the processing starts, and control the running of those algorithms as they go.
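For instance, with cats-effect 3 the cap is a single combinator. A rough sketch (getString is just a stand-in for the real async cache call, and 100 is an arbitrary cap):

import cats.effect.{IO, IOApp}
import scala.concurrent.Future

object CappedLookups extends IOApp.Simple {
  def getString(key: String): Future[String] =
    Future.successful(key)                     // stand-in for the real async cache call

  val keys: List[String] = (1 to 100000).map(i => s"key-$i").toList

  // Nothing runs when this value is defined; once the program is actually run,
  // parTraverseN only ever has 100 lookups in flight at a time.
  val run: IO[Unit] =
    IO.parTraverseN(100)(keys)(key => IO.fromFuture(IO(getString(key)))).void
}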

Beyond that, if you’re stuck with Futures, you really want to be thinking about the JVM, and how to manage the parallelism of the Threads that underlie those Futures…

If you can use Java 21 and Java futures instead (which works fine from Scala, just not quite as elegantly), you can use virtual threads to run your futures, which will scale reasonably well to 100,000 (virtual) threads.
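The same idea also works with plain Scala Futures backed by a virtual-thread executor. A rough sketch, assuming Java 21 and that blocking inside each future is acceptable (lookup is just a stand-in):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// One virtual thread per task: blocking inside each future is cheap, so 100k
// in-flight lookups is fine. Note this scales the concurrency rather than capping it.
val virtualEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newVirtualThreadPerTaskExecutor())

def lookup(key: String): String = {
  Thread.sleep(1)     // stand-in for a blocking cache call
  key
}

val futures: Seq[Future[String]] =
  (1 to 100000).map(i => Future(lookup(s"key-$i"))(virtualEc))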

This, in practice, is not terribly different from your first thread pool idea.

For example, with my wrapper over the Java 21 virtual threads model, I can run

val ai = new java.util.concurrent.atomic.AtomicInteger(0)
val seq = Array.fill(100000){
  Fu:
    Thread.sleep(1)
    ai.incrementAndGet
}
seq.fu.ask().get.sum

and it finishes, well, too fast for me to type the last line.


While I personally would use IO from cats-effect to manage this problem, it seems you are in the Akka world, so that is out of the question.

Your best option may be to use a Java Semaphore (java.util.concurrent.Semaphore) to limit the concurrency.
BTW, since the list is quite big, it would be worth actually looking into an Akka Source and considering why you need to re-run it.
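To make the semaphore idea concrete, a minimal sketch (my own illustration; the Throttled wrapper is not from any library):

import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}

// Illustration only: at most maxConcurrency futures are in flight at once.
// acquire() blocks the submitting thread until a permit frees up, so callers
// that submit faster than the cache can answer are back-pressured naturally.
class Throttled(maxConcurrency: Int)(implicit ec: ExecutionContext) {
  private val permits = new Semaphore(maxConcurrency)

  def apply[A](start: => Future[A]): Future[A] = {
    permits.acquire()
    val f =
      try start                              // the future only starts once a permit is held
      catch { case e: Throwable => permits.release(); throw e }
    f.onComplete(_ => permits.release())     // hand the permit back when the call finishes
    f
  }
}

// Usage, e.g. inside the cache wrapper:
//   val throttled = new Throttled(100)
//   def getString(key: String): Future[String] = throttled(asyncCacheClient.getString(key))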

I had also come across the suggestion of a Semaphore in my quest. It just seemed so low-level; I was really hopeful a higher-level option existed. As for the Akka Source and re-running it, here’s a simplification of the use case I have:

class Cache(asyncCacheClient: Client) {
  def getString(key: String): Future[String] = {
    asyncCacheClient.getString(key)
  }
  def getInt(key: String): Future[Int] = {
    asyncCacheClient.getInt(key)
  }
}

And then the server is processing requests with some logic like:

def handleRequest(userInput: Input): Future[Response] = {
  if (...) {
    val cacheStrings: Future[Seq[String]] = Future.sequence(userInput.map(i => cache.getString(i.key)))
    cacheStrings.map(toResponse)
  } else {
    val cacheInts: Future[Seq[Int]] = Future.sequence(userInput.map(i => cache.getInt(i.key)))
    cacheInts.map(toResponse)
  }
}

It’s in this use case that I need to make sure the calls to the cache client are capped (no more than X active requests). With Akka Streams it was difficult to make one stream instance that processes futures of different types. E.g. here’s some pseudo-Scala to try and illustrate my conundrum:

class Cache(client: Client) {
  val maxRunningFutures = 100

  // The queue's element type is the problem: it would have to cover every cached type.
  val queue = Source
    .queue[(Promise[CantBeGenericAndWouldLikeToAvoidUnionTypingAllPossibleTypes], String)](bufferSize = 1024)
    .mapAsync(maxRunningFutures) { case (promise, key) =>
      client.getString(key).andThen { case result => promise.complete(result) }
    }
    .toMat(Sink.ignore)(Keep.left)
    .run()

  def getString(key: String): Future[String] = {
    val promise = Promise[String]()
    queue.offer((promise, key))
    promise.future
  }
}

The question for me is “CantBeGenericAndWouldLikeToAvoidUnionTypingAllPossibleTypes”: how to create a stream to process generic types.

Does that make any sense?

Ah okay, yeah. In this case I would use a Source because of the length of the data, though a List is also fine if you are not hitting memory errors and you don’t want to add extra complexity. However, I would not limit the concurrency there (or only do so as an additional, upper-level concurrency limit for other reasons).
Rather, the Cache itself should limit the maximum number of in-flight requests by some other means.

A semaphore would be the best solution here IMHO; I don’t think it is low-level, at least not more so than Future itself.
You may also check whether the async client itself has rate-limiting options, or whether you can use a rate-limiter library on top of it.

Thank you for the responses! For the moment I’m going the fixed thread pool route and blocking the futures there, as they’re for the most part extremely low-latency Futures. But I’ll keep the semaphore route in my back pocket now in case this becomes a bottleneck.
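Concretely, that looks something like this (a sketch; the capped helper and the one-minute timeout are arbitrary choices of mine):

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

val maxConcurrency = 100

// A dedicated pool of maxConcurrency threads; each thread blocks on one cache
// future at a time, so at most maxConcurrency cache calls are ever in flight.
val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(maxConcurrency))

def capped[A](start: => Future[A]): Future[A] =
  Future(Await.result(start, 1.minute))(blockingEc)

// e.g. Future.sequence(userInput.map(i => capped(cache.getString(i.key))))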