Is there a way to stop a method immediately?

Let's say I have a method def computation(): String = ??? which is a CPU-bound task provided by a third party.
How can I call computation() and wait for x minutes? If it does not finish within x minutes, I want to get a Left and kill the task completely; otherwise I want to get a Right(result).
I have tried

  • Await.ready
  • Await.result
  • Thread.interrupt
  • ExecutorService.shutdown
  • ExecutorService.shutdownNow
They just give me an exception, but they do not stop the computation immediately.

The Java Future returned by executor.submit has a get method that can be passed a timeout.
Wrap it in a Try to catch the exception and map it to an Either. Before returning a Left for the exception, call executor.shutdownNow() to interrupt the active thread.

Example

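import java.util.concurrent.{Callable, Executors}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try, Using}

class TimeoutCallable[T](callable: Callable[T], timeout: Duration)
  extends Callable[Either[Throwable, T]] {

  override def call(): Either[Throwable, T] = {
    // Use something like Executors.newSingleThreadExecutor if you do not use virtual threads from Loom (JDK 21)
    // Note: ExecutorService.close() (JDK 19+) waits for running tasks to finish,
    // so call() can still block if the task ignores the interrupt.
    Using.resource(Executors.newVirtualThreadPerTaskExecutor()) { executor =>
      Try {
        // Duration.unit is already a java.util.concurrent.TimeUnit
        executor
          .submit(this.callable)
          .get(timeout.length, timeout.unit)
      } match {
        case Failure(exception) =>
          // Best effort only: this interrupts the worker thread
          executor.shutdownNow()
          Left(exception)
        case Success(value) =>
          Right(value)
      }
    }
  }

}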

Usage

import scala.concurrent.duration._

val result: Either[Throwable, String] =
    new TimeoutCallable(() => computation(), 1.second).call()

newVirtualThreadPerTaskExecutor

I will give it a try.

The answer is no: if the thread does not want to relinquish control, there is little you can do about it.

(You can terminate the JVM.)
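One way to act on that remark, as a rough sketch only: run the computation in a child process (for instance a separate JVM) and kill that process when the deadline passes. Unlike a thread, a process can be destroyed forcibly. The names `ProcessTimeout` and `runWithTimeout` are hypothetical, and the sketch assumes the third-party computation can be launched as a command; it only reports the exit code, so a real version would also need to read the result from the child's output.

```scala
import scala.concurrent.duration._

object ProcessTimeout {
  // Hypothetical helper: runs `command` as a child process.
  // Returns Right(exitCode) if it finishes in time,
  // or Left(message) after forcibly killing it.
  def runWithTimeout(command: Seq[String], timeout: FiniteDuration): Either[String, Int] = {
    val process = new ProcessBuilder(command: _*)
      .redirectErrorStream(true)                       // merge stderr into stdout
      .redirectOutput(ProcessBuilder.Redirect.DISCARD) // avoid blocking on a full pipe
      .start()

    if (process.waitFor(timeout.length, timeout.unit)) {
      Right(process.exitValue())
    } else {
      process.destroyForcibly() // asks the OS to kill the process; no cooperation needed
      process.waitFor()         // wait until the process is really gone
      Left(s"timed out after $timeout")
    }
  }
}
```

The cost is a process start per call plus serializing the input and output, which is why this is usually a last resort.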

The Javadoc for shutdownNow says

There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. This implementation interrupts tasks via Thread.interrupt(); any task that fails to respond to interrupts may never terminate.

If you are lucky, the code running on the thread will do something that tests the interrupt status and bails.
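To illustrate what "lucky" looks like, here is a minimal sketch of a CPU-bound loop that polls the interrupt flag, so that shutdownNow() actually stops it. The names `CooperativeInterrupt`, `countUntilInterrupted` and `stopsAfterShutdownNow` are made up for this example; a third-party method without such a check would keep spinning after the same call.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

object CooperativeInterrupt {
  // CPU-bound loop that checks the interrupt flag on every iteration.
  def countUntilInterrupted(): Long = {
    var n = 0L
    while (!Thread.currentThread().isInterrupted) n += 1
    n
  }

  // Returns true if the task timed out and the worker thread then
  // terminated within `graceMillis` of shutdownNow().
  def stopsAfterShutdownNow(graceMillis: Long): Boolean = {
    val executor = Executors.newSingleThreadExecutor()
    val task: Callable[Long] = () => countUntilInterrupted()
    val future = executor.submit(task)

    val timedOut =
      try { future.get(100, TimeUnit.MILLISECONDS); false }
      catch { case _: TimeoutException => true }

    executor.shutdownNow() // interrupts the worker thread (best effort)
    timedOut && executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)
  }
}
```

If the loop condition were simply `true`, awaitTermination would return false and the thread would keep burning CPU.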


This did work out for me:

import scala.concurrent.duration.*
import cats.effect.IO
import cats.effect.unsafe.implicits.global

object Example:
  def veryExpensive(n: Long): Long =
    if (n < 10000L) { Thread.sleep(10); veryExpensive(n + 1) } else n
    

  def check(n: Long) = IO.interruptibleMany(veryExpensive(n)).timeout(2.seconds).attempt

  def runOk = check(9999).unsafeRunSync()
  def runFail = check(0).unsafeRunSync()

scala> Example.runFail
val res2: Either[Throwable, Long] = Left(java.util.concurrent.TimeoutException: 2 seconds)

scala> Example.runOk
val res3: Either[Throwable, Long] = Right(10000)

As far as I understand, it is not guaranteed to interrupt, but it makes a best effort and keeps retrying the interrupt.

Update: if I remove Thread.sleep and make the number bigger, it indeed fails to interrupt. Purely CPU-bound tasks do seem to be harder to interrupt.

Count me among the unlucky ones. This has been a perennial problem for me because I need to run tests on student code. For years, I used the deprecated stop method in a very controlled way, but now that it’s going away, I can’t even do that. I check for interrupts within the testing code as often as possible, e.g.:

while someCondition do interruptibly { ... }

where interruptibly is defined as:

inline def interruptibly[A](inline code: A): A =
   if Thread.interrupted() then throw InterruptedException() else code

But if the infinite loop (or tail recursion) is in the student code, I’m stuck. I use other tricks (making students inherit from classes that check for interrupts in their constructor and their equals method, for instance), but nothing is foolproof. I wish someone would come up with a solution (instrumented bytecode dedicated to teaching?) and if other educators have great ideas, I’d love to hear from them.


@charpov Hopefully you see this. I was just struggling with the same problem and solved it using futures. One can wrap the tests in futures and then handle them, somewhat like this:

override def withFixture(test: NoArgTest) =

    implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

    val future = Future {
      super.withFixture(test)
    }

    try
      Await.ready(future, timeLimit)
      future.value.get.get
    catch
      case _: TimeoutException =>
        fail(s"Running this test took more than ${timeLimit.prettyString} - interrupting.")

That doesn’t really stop the Future; Futures are not cancellable.
You are just cancelling the wait so your main thread can continue, but the Future is still running.

Whether or not that matters in your specific case is a different question.
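A small sketch of this, using only the standard library (the object name `AwaitDoesNotCancel` and the timings are arbitrary choices for the example): the wait times out, yet the body of the Future still runs to completion afterwards.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitDoesNotCancel {
  // Returns (timedOut, finishedAnyway).
  def demo(): (Boolean, Boolean) = {
    val finished = new AtomicBoolean(false)

    val future = Future {
      Thread.sleep(300)  // stands in for the slow test body
      finished.set(true) // still executed after the caller gave up waiting
    }

    // Give up waiting after 50 ms; this only stops the wait, not the Future.
    val timedOut =
      try { Await.ready(future, 50.millis); false }
      catch { case _: TimeoutException => true }

    // Wait again, long enough for the Future to complete on its own.
    Await.ready(future, 5.seconds)
    (timedOut, finished.get())
  }
}
```

Here the first Await times out and the flag still ends up set, which is the behaviour described above.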


Yes, that’s the problem. I did define an Async tag using withFixture to run a test in a separate thread, but too many failed tests can leave many threads running, some of them actively using CPU (hence my interruptibly and similar attempts to make code more responsive to interrupts).


I had not realized that downside myself, although it is acceptable in my use case. Thanks for the explanation!