Old `parallel` function

I’m trying to revisit some old code (from the Coursera Scala course). It no longer compiles. The first error I encounter is at the use of the function parallel. What’s the correct way to fix this call? Do I need to do something with Future? For example, make a List of 4 Future objects, and then map Await.result(_, Duration.Inf) across the List?

    def fill(minX: Int, maxX: Int, minY: Int, maxY: Int, depth: Int): Unit = {
      val halfX = minX + (maxX - minX) / 2
      val halfY = minY + (maxY - minY) / 2
      if (depth < resolution) {
        parallel(
          fill(minX, halfX, minY, halfY, depth + 1), fill(halfX, maxX, minY, halfY, depth + 1),
          fill(minX, halfX, halfY, maxY, depth + 1), fill(halfX, maxX, halfY, maxY, depth + 1))
      }
      else {
        println(s"depth=$depth minX=$minX maxX=$maxX minY=$minY maxY=$maxY")

        val center = Location(
          lat0 + (lat1 - lat0) * (halfY - minY) / (maxY - minY),
          lon0 + (lon1 - lon0) * (halfX - minX) / (maxX - minX))

        def distToCenter(pair: (Location, Double)) = {
          val (loc, celsius) = pair
          euclidianDistanceSqr(loc, center)
        }

        val sortedTemperatures: Array[(Location, Double)] = temperatures.toArray.sortBy(distToCenter)
        val someTemperatures = sortedTemperatures.take(sortedTemperatures.length/2)
        for {imageX <- minX until maxX;
             imageY <- minY until maxY} {
          arr(imageX + 256 * imageY) = calcArrayEntry(someTemperatures, imageY, imageX)
        }
      }
    }

I’m using the following translation, but I’m not sure whether it is equivalent, especially since the parallel call contained recursive calls to fill.

    def fill(minX: Int, maxX: Int, minY: Int, maxY: Int, depth: Int): Unit = {
      val halfX = minX + (maxX - minX) / 2
      val halfY = minY + (maxY - minY) / 2
      if (depth < resolution) {
        // translating the call to parallel(...) using Future, Await, and an ExecutionContext
        import scala.concurrent.{Future,Await}
        import scala.concurrent.duration.Duration
        import scala.concurrent.ExecutionContext.Implicits.global
        List(
          Future{fill(minX, halfX, minY, halfY, depth + 1)}, Future{fill(halfX, maxX, minY, halfY, depth + 1)},
          Future{fill(minX, halfX, halfY, maxY, depth + 1)}, Future{fill(halfX, maxX, halfY, maxY, depth + 1)}
        ).map(Await.result(_, Duration.Inf))
      }
      else {
        println(s"depth=$depth minX=$minX maxX=$maxX minY=$minY maxY=$maxY")

        val center = Location(
          lat0 + (lat1 - lat0) * (halfY - minY) / (maxY - minY),
          lon0 + (lon1 - lon0) * (halfX - minX) / (maxX - minX))

        def distToCenter(pair: (Location, Double)) = {
          val (loc, celsius) = pair
          euclidianDistanceSqr(loc, center)
        }

        val sortedTemperatures: Array[(Location, Double)] = temperatures.toArray.sortBy(distToCenter)
        val someTemperatures = sortedTemperatures.take(sortedTemperatures.length/2)
        for {imageX <- minX until maxX;
             imageY <- minY until maxY} {
          arr(imageX + 256 * imageY) = calcArrayEntry(someTemperatures, imageY, imageX)
        }
      }
    }

Hmm, looks plausible. That Await.result() is an anti-pattern in the Akka/Play world, so it makes me twitch, but it might be the correct literal translation. You’re going to soak your threads, but that may be the intent. I’m not sure what the thread-switching overhead would be; if you care about efficiency, you might want to investigate cats-effect / Monix / ZIO, which might be more efficient. (Not sure, that’s just a guess.)

Not sure, though, because I’m not familiar with this old parallel() function. Any clue where it came from?

I remember just thinking it was a primitive operation when I took the Coursera Scala course some time ago.

I’m not sure what the effect is of starting lots of threads that do nothing but wait on other threads to finish, but that’s what’s happening here: each thread starts 4 smaller threads, until a threshold is met, at which point it drops into the else part. This seems to run (makes progress) on my very old MacBook, so it seems that, on macOS, starting a bunch of threads and waiting is not an expensive operation.

I’m not sure this approach is preferable at all, or whether it’s an advantage over parallel collections; I’d have to look into it more deeply. However, I’d drop the Await in favor of Future.sequence and return a Future (for further processing inside the Future context, or for Awaiting on the caller’s side). A sketch follows below.
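For illustration, a minimal sketch of what that could look like here, assuming the same surrounding definitions as in the original code (resolution, arr, and friends); the leaf-level work is elided, and the 0-to-256 top-level call is just a guess based on the 256-wide array indexing:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global

    def fill(minX: Int, maxX: Int, minY: Int, maxY: Int, depth: Int): Future[Unit] = {
      val halfX = minX + (maxX - minX) / 2
      val halfY = minY + (maxY - minY) / 2
      if (depth < resolution)
        Future.sequence(List(
          fill(minX, halfX, minY, halfY, depth + 1), fill(halfX, maxX, minY, halfY, depth + 1),
          fill(minX, halfX, halfY, maxY, depth + 1), fill(halfX, maxX, halfY, maxY, depth + 1)
        )).map(_ => ()) // collapse Future[List[Unit]] into Future[Unit]
      else
        Future {
          // ... leaf-level work exactly as before ...
        }
    }

    // Block exactly once, at the very top:
    Await.result(fill(0, 256, 0, 256, 0), Duration.Inf)

All four child futures are still created eagerly, so the fan-out is the same; the difference is that no intermediate task sits blocked in Await.result halfway down the tree.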


Can someone comment on what the problem with Await is? The goal of the fill function is to divide filling in (mutably) a large 2D array into lots of small ranges, (minX until maxX) and (minY until maxY), and to process those concurrently, since they are independent at the leaf level. The processing takes several hours, so I really want to wait until it’s done, not have it done lazily. I.e., I don’t want to try to access something later and then have to wait an hour for the lazy computation to finish.

The quick answer is that threads are fairly heavyweight and expensive, and are a limited resource; thus, blocking threads tends to hurt efficiency. In a serious case, it can lead to thread starvation, deadlock, and other such horrors. (And thread switching isn’t free either.)

It sounds like it’s working for you, and it is possible that it’s using 2.13’s new parasitic-execution-context mechanism, which might somewhat ameliorate these problems. (I haven’t dug into exactly how that works, or when it comes into play.)

But it’s worth digging into and understanding the threading side of the world: folks often treat threads as free, and they very much aren’t. In high-efficiency scenarios (e.g., a Play webserver), blocking threads without very careful thread management is the biggest no-no: it can slow your server by an order of magnitude.

The advantage of something like cats-effect, Monix, or ZIO (all of which work similarly from the thousand-foot view) is that you aren’t naively using native threads; instead, you describe the operation and then use an engine to schedule the threading more intelligently. I gather that in some applications this can produce an order-of-magnitude speed improvement, although it depends on the circumstances. Broadly speaking, it sounds like you expect lazy computations to be slow; the reverse is more often true, I believe, because the lazy approach allows introspection and optimization that isn’t possible in an eager algorithm. (However, I should note that I’m not an expert on this.)
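For flavor, here is roughly what that “describe first, run later” style looks like. This is a hypothetical sketch assuming cats-effect 3 on the classpath; FillApp and leafWork are made-up names, not anything from the code above:

    import cats.effect.{IO, IOApp}
    import cats.syntax.parallel._

    object FillApp extends IOApp.Simple {
      // Stand-in for the real leaf-level work.
      def leafWork(i: Int): IO[Unit] = IO(println(s"leaf $i"))

      // `run` is only a description of the work; the IOApp runtime
      // decides how to schedule it across threads when the app starts.
      def run: IO[Unit] = (1 to 4).toList.parTraverse(leafWork).void
    }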

It depends on thread management, i.e. the ExecutionContext.

Many ExecutionContexts have a pool of a relatively small number of threads running in parallel, often the number of CPUs or maybe twice that; I think the default ExecutionContext works like that. Such a thread-pool ExecutionContext is essentially designed on the assumption that you never submit anything that blocks, because anything that blocks effectively takes a thread away from the pool, up to the point where everything is blocked (though see the blocking hint sketched below).
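One mitigation on the default pool is scala.concurrent.blocking, which marks a section as blocking so the pool can spawn extra threads to compensate. A minimal sketch:

    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    val f = Future {
      blocking {            // hint to the pool: this task is about to block
        Thread.sleep(1000)  // stand-in for a blocking call
      }
    }
    Await.result(f, Duration.Inf)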

Thread pools are popular because there is a cost to creating new threads, to running many threads in parallel, and to resuming blocked threads.

On the other hand, a suspended thread itself does not cost much just by staying suspended. So, in principle, you could create your own ExecutionContext that runs many threads in parallel and then suspend a number of them.
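For instance, a minimal sketch (the pool size of 64 is arbitrary, purely for illustration):

    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext

    // Far more threads than cores, so a number of them can sit blocked
    // without starving the rest. Remember to shut the pool down when done.
    implicit val manyThreads: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(64))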

The Scaladoc already states it:

> While occasionally useful, e.g. for testing, it is recommended that you avoid Await whenever possible — instead favoring combinators and/or callbacks. Await’s result and ready methods will block the calling thread’s execution until they return, which will cause performance degradation, and possibly, deadlock issues.

Then you still wait, but for the whole bulk of work to complete rather than for single intermediate steps.

Just for illustration, ignoring the perils of poor man’s microbenchmarking and all…

import scala.concurrent._
import scala.concurrent.duration._
import scala.collection.parallel._

object ParWork extends App {

  import ExecutionContext.Implicits.global
  import CollectionConverters._

  type Millis = Long

  trait Out {
    def p(s: String): Unit
  }

  object NullOut extends Out {
    override def p(s: String): Unit = ()
  }

  object ConsoleOut extends Out {
    override def p(s: String): Unit = println(s)
  }

  def time(block: => Unit): Millis = {
    val start = System.currentTimeMillis()
    block
    System.currentTimeMillis() - start
  }

  def run(block: Out => Unit): Millis = {
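    // Warm up three times without output, then time one run that prints.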
    (1 to 3).foreach(_ => block(NullOut))
    time(block(ConsoleOut))
  }

  val times = Seq[Millis](100L, 500L, 400L, 800L, 200L, 700L, 300L, 600L)

  def doWork(t: Millis)(out: Out): Unit = {
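    // Simulated unit of work: sleep t millis, logging start (S) and finish (F).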
    out.p(s"S $t (${System.currentTimeMillis()})")
    Thread.sleep(t)
    out.p(s"F $t (${System.currentTimeMillis()})")
  }

  def withAwait(): Millis = {
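    // Variant 1: fork two Futures per level and block on each with Await.result.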
    def go(ts: Seq[Millis])(out: Out): Unit = {
      ts match {
        case Seq() => ()
        case Seq(t) => doWork(t)(out)
        case ts =>
          val (ts1, ts2) = ts.splitAt(ts.size / 2)
          Seq(Future { go(ts1)(out) }, Future { go(ts2)(out) }).foreach(Await.result(_, Duration.Inf))
      }
    }
    run(go(times))
  }

  def withSequence(): Millis = {
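    // Variant 2: compose with Future.sequence and block only once, at the very top.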
    def go(ts: Seq[Millis])(out: Out): Future[Unit] = {
      ts match {
        case Seq() => Future.successful(())
        case Seq(t) => Future { doWork(t)(out) }
        case ts =>
          val (ts1, ts2) = ts.splitAt(ts.size / 2)
          Future.sequence(Seq(go(ts1)(out), go(ts2)(out))).map(_ => ())
      }
    }
    run(out => Await.result(go(times)(out), Duration.Inf))
  }

  def withParSeq(): Millis = {
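    // Variant 3: let a parallel collection handle forking and joining.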
    run(out => times.par.foreach(doWork(_)(out)))
  }

  println(withAwait())
  println(withSequence())
  println(withParSeq())

}

Scala Futures are somewhat cumbersome for a variety of reasons, anyway, so I’d second @jducoeur’s suggestions in the long run, but that’s yet another story that unfortunately doesn’t fit the margin of this thread. :slight_smile: