Future, ExecutionContext and ThreadLocal

Hi all, I’m trying to execute Futures and preserve the caller-thread’s ThreadLocal-value, kept in DynamicDataSourceContext.

This is the code, taken from the post "ThreadLocal Variables and Scala Futures" on Steven Skelton's blog, and it seems to work:

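import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

case class DynamicDataSourceContext(instanceId: String)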

object DynamicDataSourceContext {
	private val CONTEXT = new ThreadLocal[DynamicDataSourceContext]

	def clear(): Unit = {
		CONTEXT.remove()
	}

	def create(contextKey : String): DynamicDataSourceContext = {
		create(DynamicDataSourceContext(contextKey))
	}

	def create(context : DynamicDataSourceContext): DynamicDataSourceContext = {
		val current = CONTEXT.get()
		if ( current != null) {
			throw new IllegalStateException(s"Attempting to set context to ${context} when it is already set to ${current}")
		}
		CONTEXT.set(context)
		context
	}

	def getContext() = CONTEXT.get()

	def hasContext() : Boolean = {
		CONTEXT.get() != null
	}

}


class DataSourceContextAwareForkJoinPool extends ForkJoinPool {

	override def execute(task: Runnable): Unit = {
		val copyValue = DynamicDataSourceContext.getContext()
		super.execute(() => {
			try {
				DynamicDataSourceContext.create(copyValue)
				task.run()
			} finally {
				DynamicDataSourceContext.clear()
			}
		})
	}
}

object Fisk extends App {

	//use our new execution context
	implicit val executionContext: ExecutionContextExecutorService = scala.concurrent.ExecutionContext.fromExecutorService(new DataSourceContextAwareForkJoinPool)

	DynamicDataSourceContext.create("FISH!!!")

	//prints thread information and dynamic variable value.
	def printThreadInfo(id: String): Unit = {
		Thread.sleep((Math.random() * 1000).toLong)
		println {
			id + " : " + Thread.currentThread.getName + " = " + DynamicDataSourceContext.getContext().instanceId
		}
	}

	val fut1 = Future { printThreadInfo("fut1") }
	val fut2 = Future { printThreadInfo("fut2") }
	val fut3 = Future { printThreadInfo("fut3") }

	// printThreadInfo sleeps for up to one second, so allow a longer timeout
	Await.result(fut1, 2.seconds)
	Await.result(fut2, 2.seconds)
	Await.result(fut3, 2.seconds)

	val fut4 = Future { printThreadInfo("fut4") }
	val fut5 = Future { printThreadInfo("fut5") }

	Await.result(fut4, 2.seconds)
	Await.result(fut5, 2.seconds)
}

This relies, of course, on the fact that the execute method in DataSourceContextAwareForkJoinPool:

override def execute(task: Runnable): Unit

is executed in the context of the calling-thread.

So my question is: Is it guaranteed that this execute-method is executed in the context of the calling-thread, so that this will always work?

Thanks.

Hi. That’s a rather unpleasant combination. Futures are not designed to stick to any thread or any other thread-like abstraction.

Look at this code; it shows that the execution of a chain of Futures jumps from thread to thread:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent._

object MultipleThreadPools {
  private def makeSingleThreadEc(
      name: String): ExecutionContextExecutorService = {
    val defaultTf = Executors.defaultThreadFactory()
    ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor(runnable => {
        val result = defaultTf.newThread(runnable)
        result.setName(name)
        result
      }))
  }

  private val threadPoolAbc = makeSingleThreadEc("abc")
  private val threadPoolDef = makeSingleThreadEc("def")
  private val threadPoolGhi = makeSingleThreadEc("ghi")

  private def printThreadName(prefix: String): Unit =
    println(s"$prefix: thread=${Thread.currentThread().getName}")

  def main(args: Array[String]): Unit = {
    println("usual for-comprehensions syntax")
    Await.result(forComprehensions(), Duration.Inf)
    println()
    println("after desugaring (output should be the same)")
    Await.result(desugaredForComprehensions(), Duration.Inf)
    sys.exit()
  }

  private def forComprehensions(): Future[Unit] = {
    implicit val abcEc: ExecutionContext = threadPoolAbc
    for {
      _ <- Future(printThreadName("standalone future 1"))(threadPoolDef)
      _ = printThreadName("future from for desugaring 1")
      _ <- Future(printThreadName("standalone future 2"))(threadPoolGhi)
    } yield {
      printThreadName("future from for desugaring 2")
    }
  }

  // same as method forComprehensions, but desugared using IntelliJ
  private def desugaredForComprehensions(): Future[Unit] = {
    implicit val abcEc: ExecutionContext = threadPoolAbc
    Future(printThreadName("standalone future 1"))(threadPoolDef)
      .map { _ =>
        val _ = printThreadName("future from for desugaring 1"); ()
      }
      .flatMap(_ =>
        Future(printThreadName("standalone future 2"))(threadPoolGhi)
          .map(_ => printThreadName("future from for desugaring 2")))
  }
}

Output:

usual for-comprehensions syntax
standalone future 1: thread=def
future from for desugaring 1: thread=abc
standalone future 2: thread=ghi
future from for desugaring 2: thread=abc

after desugaring (output should be the same)
standalone future 1: thread=def
future from for desugaring 1: thread=abc
standalone future 2: thread=ghi
future from for desugaring 2: thread=abc

Now to the main question:

Either the question is wrongly stated or I don’t get it. How can a method be executed outside of the calling thread? If I’m in thread A and I call methodB() then methodB() is still executed in thread A.

However, I can try to explain which thread executes the execute method. Scala Futures are interfaces implemented by Scala Promises (under the hood, not publicly). When a Promise is completed, all the callbacks registered on it (using future.foreach, future.flatMap, future.recoverWith and so on) are immediately scheduled on their respective execution contexts (you always have to provide one for each callback / transformation function / etc). Scheduling (i.e. running threadPool.execute(task)) is done in the thread that completed the Promise. Examples:

import java.util.concurrent.{Executors, ForkJoinPool}
import scala.concurrent._

object MultipleCompletingThreads {
  private def printThreadName(prefix: String): Unit =
    println(s"$prefix in thread=${Thread.currentThread().getName}")

  class ContextAwareForkJoinPool extends ForkJoinPool {

    override def execute(task: Runnable): Unit = {
      printThreadName(s"${getClass.getSimpleName}.execute(task) called")
      super.execute(task)
    }
  }

  private def makeSingleThreadEc(
      name: String): ExecutionContextExecutorService = {
    ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor(runnable => new Thread(runnable, name)))
  }

  private val threadPoolAbc = makeSingleThreadEc("abc")
  private val threadPoolDef = makeSingleThreadEc("def")
  private val threadPoolGhi = makeSingleThreadEc("ghi")

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(new ContextAwareForkJoinPool())

    val promiseA = Promise[Int]()
    val promiseB = Promise[Int]()
    val promiseC = Promise[Int]()
    val promiseD = Promise[Int]()
    promiseA.future.foreach(v => printThreadName(s"got $v in promise A"))
    promiseB.future.foreach(v => printThreadName(s"got $v in promise B"))
    promiseC.future.foreach(v => printThreadName(s"got $v in promise C"))
    promiseD.future.foreach(v => printThreadName(s"got $v in promise D"))

    Future(promiseA.success(3))(threadPoolDef)
    Thread.sleep(234)
    Future(promiseB.success(4))(threadPoolAbc)
    Thread.sleep(234)
    Future(promiseC.success(5))(threadPoolGhi)
    Thread.sleep(234)

    promiseD.success(6)
    Thread.sleep(234)

    sys.exit()
  }
}

Output:

ContextAwareForkJoinPool.execute(task) called in thread=def
got 3 in promise A in thread=ForkJoinPool-1-worker-1
ContextAwareForkJoinPool.execute(task) called in thread=abc
got 4 in promise B in thread=ForkJoinPool-1-worker-1
ContextAwareForkJoinPool.execute(task) called in thread=ghi
got 5 in promise C in thread=ForkJoinPool-1-worker-1
ContextAwareForkJoinPool.execute(task) called in thread=main
got 6 in promise D in thread=ForkJoinPool-1-worker-1

However, the behaviour is not specified in the documentation (or at least I haven’t found it anywhere), so future versions of Scala Futures are free to move scheduling of tasks to any other thread they like.

Last thing: which thread completes the Promise backing a Future that is the result of a transformation?

val futureA = ...
val futureB = futureA.map(mapFunction1)(threadPool1)

In the above example:

  • futureB is backed by a Promise (and probably futureA too, but that’s not relevant here)
  • the second line registers a callback on futureA. The callback body is mapFunction1 and the associated thread pool is threadPool1
  • after futureA has finished successfully and mapFunction1 has also finished successfully, the Promise backing futureB is completed from threadPool1, because that’s where its value was computed (again, that’s an implementation detail that can change)
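The points above can be checked with a small experiment. This is only a sketch: CompletingThreadDemo, sameThreadEc and the thread name "pool1" are made up for illustration. The observer callback is registered before futureB completes and uses an execution context that runs tasks synchronously, so it reports the thread that completed the Promise behind futureB.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

object CompletingThreadDemo {
  // Hypothetical helper: runs every task synchronously in the thread that schedules it.
  private val sameThreadEc: ExecutionContext = new ExecutionContext {
    override def execute(task: Runnable): Unit = task.run()
    override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def observeCompletingThread(): String = {
    val pool1 = Executors.newSingleThreadExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = new Thread(r, "pool1")
    })
    val threadPool1 = ExecutionContext.fromExecutor(pool1)

    val promiseA = Promise[Int]()
    // The map function runs on threadPool1 once promiseA is completed.
    val futureB = promiseA.future.map(_ + 1)(threadPool1)
    // Registered while futureB is still incomplete, so this callback is
    // scheduled (and, with sameThreadEc, run) by the thread completing futureB.
    val observed = futureB.map(_ => Thread.currentThread().getName)(sameThreadEc)

    promiseA.success(1) // completed from the calling thread
    try Await.result(observed, 5.seconds)
    finally pool1.shutdown()
  }
}
```

With the current implementation observeCompletingThread() should return "pool1", but as said, that is an implementation detail and not a documented guarantee.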

Thanks for the explanation.
From what I can see calls to Future.apply result in the ForkJoinPool.execute-method being called in the same thread. What I’m interested in is if this is guaranteed to happen in the same thread?

Thanks.

Depends on what you mean by guarantee. The implementation of Futures and Promises in the current version of Scala will schedule the Future body from the same thread in which Future.apply was called. However, that’s not formally guaranteed, as it’s not in the specification, so it could theoretically change in any future Scala version (excluding patch versions).
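If you want to verify the current behaviour yourself, a sketch like the following records which thread calls execute when Future.apply is used. The names FutureApplySchedulingDemo and recordingEc are hypothetical, and the result only shows what the implementation does today, not what is specified.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureApplySchedulingDemo {
  // Returns (name of the thread calling Future.apply, name of the thread that called execute).
  def callerAndSubmitter(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    val delegate = ExecutionContext.fromExecutor(pool)
    val submitter = new AtomicReference[String](null)

    // Hypothetical wrapper: remembers the first thread that submits a task.
    val recordingEc: ExecutionContext = new ExecutionContext {
      override def execute(task: Runnable): Unit = {
        submitter.compareAndSet(null, Thread.currentThread().getName)
        delegate.execute(task)
      }
      override def reportFailure(cause: Throwable): Unit = delegate.reportFailure(cause)
    }

    try {
      Await.result(Future(1)(recordingEc), 5.seconds)
      (Thread.currentThread().getName, submitter.get())
    } finally pool.shutdown()
  }
}
```

On the Scala versions I know of, both names in the returned pair are the same, i.e. execute is called from the thread that called Future.apply.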

OTOH, there’s nothing stopping you from creating your own Future factory method that will do what you want, e.g.:

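  import scala.concurrent.{ExecutionContext, Future, Promise}
  import scala.util.Try

  object MyFutureFactory {
    def apply[T](body: => T)(exeCtx: ExecutionContext): Future[T] = {
      val promise = Promise[T]()
      // execute is called right here, in the thread that called apply
      exeCtx.execute(() => promise.complete(Try(body)))
      promise.future
    }
  }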
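A possible way to put this together with ThreadLocal propagation is sketched below. Everything except MyFutureFactory is an assumption made for the example: tenant is a stand-in for the ThreadLocal inside DynamicDataSourceContext, and PropagatingEc is a hypothetical wrapper that captures the value in the thread calling execute and restores it around the task. Because the factory calls execute directly, the capture happens in the caller's thread by construction rather than by an implementation detail of Future.apply.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PropagationSketch {
  // Hypothetical stand-in for the ThreadLocal held by DynamicDataSourceContext.
  val tenant = new ThreadLocal[String]

  // Hypothetical wrapper around any ExecutionContext.
  final class PropagatingEc(underlying: ExecutionContext) extends ExecutionContext {
    override def execute(task: Runnable): Unit = {
      val captured = tenant.get() // read in the submitting thread
      underlying.execute(new Runnable {
        override def run(): Unit = {
          tenant.set(captured) // visible to the task on the worker thread
          try task.run()
          finally tenant.remove() // do not leak the value to later tasks
        }
      })
    }
    override def reportFailure(cause: Throwable): Unit = underlying.reportFailure(cause)
  }

  object MyFutureFactory {
    def apply[T](body: => T)(exeCtx: ExecutionContext): Future[T] = {
      val promise = Promise[T]()
      exeCtx.execute(new Runnable {
        override def run(): Unit = { promise.complete(Try(body)); () }
      })
      promise.future
    }
  }

  def demo(): String = {
    val pool = Executors.newSingleThreadExecutor()
    val ec = new PropagatingEc(ExecutionContext.fromExecutor(pool))
    tenant.set("FISH!!!")
    try Await.result(MyFutureFactory(tenant.get())(ec), 5.seconds)
    finally { tenant.remove(); pool.shutdown() }
  }
}
```

Note that this only covers futures created through the factory. Callbacks added later with map, flatMap and so on are still scheduled by whichever thread completes the previous Future, as described earlier.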

Ah, nice - thanks.