Parallel Processing

I’m creating engineering software where analysis of multiple loading conditions gets completed sequentially. Each of these analysis cases involve thousands to tens of thousands of calculations. The end result comes down to a boolean - if the item fails get a stronger one and start again.

Of course this is ideal for parallel processing. Unlike talking to web or a db, this solution has to block until the final state of things are known.

My idea to do this was cobbled together and looks like following test code. I used this code to work out the framework of my solution:

import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}
import scala.collection.mutable.ListBuffer

object FutureTester extends App {

  var maxTime: Int = 10;    // milliseconds

  def method(x: Int): Int = {   // Normal method with out a future
    Thread.sleep(scala.util.Random.nextInt(maxTime));  // randomly sleep to maxTime
    x * x

  def doParallel(x: Int, y: Int, z: Int): Future[List[Int]] = {
    val result1 = Future(method(x))  // Note application of Future here.
    val result2 = Future(method(y))
    val result3 = Future(method(z))
    val result = for {
      r1 <- result1
      r2 <- result2
      r3 <- result3
    } yield List(r1, r2, r3)

  for (i <- 0 to 6) {  // do 6 runs doubling the maximum permitted sleep time each time
    val rList = new ListBuffer[Int]
    val result = Await.ready(doParallel(2, 3, 4), Duration.Inf) // do the parallel stuff while waiting
    result onComplete {
      case Success(result) => {
        for (rslt <- result)
          rList += rslt
      case Failure(e) => println(s"Failed with ${e.printStackTrace()}")
    val finalArray = rList.toArray   // will use the results as an array
    print(s"Max Time: ${maxTime} Final Array: ")
    for (e <- finalArray)
      print(s"$e ")
    maxTime *= 2 // double the maximum time to wait

As you can see it just calculates squares of numbers going through a loop with a different maximum sleep time, which doubles through each loop.

It works, mostly! For the longer square calculations with the times near a second the algorithm
silently fails frequently.

Typical runs of the program look like this:

Max Time: 10 Final Array: 4 9 16 
Max Time: 20 Final Array: 
Max Time: 40 Final Array: 
Max Time: 80 Final Array: 
Max Time: 160 Final Array: 4 9 16 
Max Time: 320 Final Array: 4 9 16 
Max Time: 640 Final Array: 


Max Time: 10 Final Array: 4 9 16 
Max Time: 20 Final Array: 4 9 16 
Max Time: 40 Final Array: 4 9 16 
Max Time: 80 Final Array: 4 9 16 
Max Time: 160 Final Array: 
Max Time: 320 Final Array: 
Max Time: 640 Final Array: 

The results come back in a random pattern each time.

I need the final algorithm to always return a result and sometimes that will take well over a second on my slower machine.

What have I done wrong here? My current web server is a single threaded but the machine at my office has twelve threads, and neither returns the full set of proper answers.

You are mutating rList outside the concurrency framework - there is no guarantee that another thread (than the one that runs the onComplete block) will ever see these changes.

Instead of “fixing” this, avoid shared, mutable state altogether. The simplest approach here would be to just await the result (rather than just the completion) of the future.

for(_ <- 0 to 6) {
  val result = Await.result(doParallel(2, 3, 4), Duration.Inf)
  println(s"Max Time: $maxTime Final Array: ${result.mkString(" ")}")
  maxTime *= 2

Moving one step further, you could run the whole process inside the future framework and only await terminal completion.

val res =
    .iterate(10)(_ * 2)
    .foldLeft(Future.successful(())) { (acc, maxTime) =>
      for {
        _ <- acc
        result <- doParallel(2, 3, 4)(maxTime)
        _ <- Future { println(s"Max Time: $maxTime Final Array: ${result.mkString(" ")}") }
      } yield ()
Await.ready(res, Duration.Inf)

No mutable state, no blocking. (You have to add the maxTime parameter to the #doParallel() and #method() signatures for this, obviously.)


My boss does not like it when I spend a lot of time helping people on these list servers, so this has to be short. My experience is with Java; I am just learning Scala. I wrote a highly parallel program that scraped the SETI@Home (S@H) results website to determine what combination of factors led to the fastest processing times for S@H work units (WUs). I wound up not using any of the concurrency features of Java; all the logic was based on work queues, semaphores, and wait lists. The main loop traversed my main results page on S@H looking for completed WUs not yet in the database (DB). For each such WU, a packet containing the result ID (rid), WU ID (wuid), and the WU name was put on the results work queue, and the queue semaphore signaled, freeing a worker who took a packet off the queue and gave it to the download queue and waited on a different semaphore until the page appeared in memory and the page could be parsed for the results of processing that WU. Similar logic pertained to the work unit queue which held requests for other users who had processed the same WU. There was also a user queue which held requests for the other users’ cpu type, frequency, and memory. It was critically important that each succeeding request be run at a higher priority. For example, the main loop ran at the lowest thread priority. When a packet was placed on the result worker queue, that worker ran at one priority higher than the main loop. The result downloader ran at one priority higher than the result worker, and etc. until the DB inserter, which had all the info scraped from the various web pages, ran at the highest thread priority. Until I made all the different types of workers run at different priorities, my program hung just like yours does. In the end, the number of workers waiting on the various queues determined the program speed, and I increased the number of workers until the Internet connection was saturated. (However, I seem to recall that there were only two workers for each queue.) Also, in the end, the program ran every day without failure, but it took weeks to arrive at that point. Once awakened, each worker processed its queue until the queue was empty, so there was much redundancy if a worker became stuck or was delayed for some reason.

For this kind of problem I have found Akka Streaming, specifically the mapAsync feature, to make things way simpler and avoids the necessity to solve all these complex multi- threading issues. The specific type of problem in talking about is basically a traversal of a complex solution “tree”, where the evaluation of each “node” in the tree requires a blocking computation (wrapped as a Future).

Maybe worth it to you to spend a few hours investigating in order to save several days debugging and feeling unsure about your solution.

Best Regards,
Brian Maso


Agreed in general. However, you’ll need a safe footing in basic future handling (e.g. avoid shared mutable state, don’t do effectful stuff in callbacks,…) when using akka-streams, so it probably doesn’t hurt to exercise this at a lower level a few times before pulling in further libs (and abstractions). And unless I’m missing something, the underlying use case here doesn’t sound like a complex tree, but rather a static set of concurrent tasks with a sync’ed, shared result that needs to be re-run with modified parameters until the result matches some predicate. This could be done with streams, of course, but probably it’d be sufficient to replace the fold in my example with a simple recursive function with an if/flatMap over each result level.

But the OP seems to have lost interest, anyway. :slight_smile:

1 Like

I used your first solution, which worked brilliantly and sped up the program considerably. Thank you very much for the solution.

Haven’t lost interest!

Just been very busy implementing and testing. And this is my side hustle. Not much free time.

The next question I had was about this type of problem in general and the use of Akka. It does appear from your comments and Brian Maso’s that Akka is worth exploring. It maybe a month or two before I can start that but I certainly will.

Thanks to all of you getting me going. The speed up in my program is quite noticable!

At some point you’ll definitely want to pull in further abstractions on top of plain futures, akka being one option. An alternative would be the cats/cats-effect/fs2 stack which swaps Scala Futures for the (cleaner or more rigid, depending on your point of view) IO abstraction.

1 Like

Thanks for the information. I’ll loose a few weekends learning how all this technology works. My main bottleneck in my software is just working through masses of calculations and then picking the appropriate solution. I’ll use anything that will speed and simplify that process. So far, in that regard, Scala has been a great choice.

My rule of thumb is that Akka is the way to go if you need to be maintaining serious state in a parallel program.

For a straight-up, feed-in-some-data-and-get-an-answer algorithm there are usually better approaches (I agree with @sangamon that using IO or one of its variants is usually best). Something like that is usually going to be more reliable.

But if you have state that persists at all, and parallel paths hitting it, Akka can be a useful way to tackle the problem. In particular, for problems that break down into clear “objects” with really clean lines between them, the Actor Model provides extreme scalability, and an approach that is generally thread-safe so long as you follow the disciplines.

That said, that’s a fairly limited domain. I’ve built serious stuff on top of the Akka suite of libraries, and they’re great for highly-stateful interactive applications, but I typically wouldn’t reach for it as my first choice, and I generally wouldn’t use it for the category of problem you’re looking at – it forces you to spend a lot of time thinking about plumbing, and there are a lot of easy mistakes you can make.