Functional style for infinite loop

Is there a way to implement this in a more functional way? As far as I understand, break is not the (pure) functional way to go.

class Task extends Runnable {
  override def run(): Unit = {
    val service = Service()

    breakable {
      while (true) {
        val status = service.take()
        println(status)
        if (status.reset() == false) break
      }
    }
  }
}

Or is even everything fine with this?

Disclaimer: This is a simplified sample, reduced to the parts of the elements which matter for this question.

What I came up myself is:

class Task extends Runnable {
  override def run(): Unit = {
    implicit val service: Service = Service()

    checkStatus
  }

  @scala.annotation.tailrec
  def checkStatus(implicit service: Service): Unit = {
    val status = service.take()
    println(status)
    if (status.reset()) {
      checkStatus
    }
  }
}

With the code being based on Java threads and having side effects as its only result, it’s pretty non-functional to start with. :slight_smile:

But, yes, a tail-recursive function, as in your own answer, certainly is a more functional style than while/break. (It is advisable to annotate your function with @tailrec, BTW.)

Another option might be some lazy data structure like LazyList - however, AFAICS its API only comes with an exclusive #takeWhile(), while you still want to get the side effects on the element satisfying the exit condition, so you’d have to implement your own #takeWhileInclusive() on top somehow.

2 Likes

The actual use case is to permanently check if a file has been modified. I do this by the means of Java NIO’s WatchService which provides a blocking take() method.

I already thought of using Akka but I believe this is a bit of an overhead in this case. After all, even a pure functional implementation needs to have side-effects at a certain point, because if it would not, there was no reason to exist in the first place, right?

There’s some libs wrapping this functionality, e.g. os-lib and better-files. (I have used neither of those so far.) You may want to take a peek, either to use them as is or to get some inspiration for the design of your binding.

Right. :slight_smile: However, there’s a wide range for encapsulating/deferring/hiding the impure part(s).

Java threads are at one extreme end - Runnable doesn’t return anything, it’s fully opaque and exclusively about side effects. On the other end you have something like IO, basically turning your program into a function RealWorld => RealWorld. Scala’s Future (and thus akka, being based on futures) is taking some kind of middle ground - it has a “return type” and nice combinators, but it’s not referentially transparent.

Personally I’d think that you’ll fare better even in the short run at least using futures instead of plain old threads - but YMMV, and there’s nothing inherently wrong about matching and mixing. I was just poking at the perceived discrepancy between plain threads, being as “non-functional” as it gets, and asking for purely functional improvements.

2 Likes

Look ma no while or break:

Iterator.continually(service.take())
  .takeWhile(_.reset())
  .foreach(println)

There is a small difference in semantics here in that I’m calling reset before println. If that reordering isn’t acceptable, you could:

Iterator.continually(service.take())
  .takeWhile{status => println(status); status.reset()}
  .foreach(_ => ())
6 Likes

I tried something else:

package org.example.watcher

import java.nio.file.{FileSystems, Path, StandardWatchEventKinds}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

object Watcher extends App {
  val home = Path.of(System.getProperty("user.home"))
  val watchService = FileSystems.getDefault.newWatchService()
  home.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY)

  implicit val executionContext: ExecutionContextExecutor = ExecutionContext.global
  Future(watchPath).onComplete(onComplete)

  def watchPath: Path = {
    val watchKey = watchService.take()
    val event = watchKey.pollEvents().asScala.head
    val path = event.context().asInstanceOf[Path]
    watchKey.reset()
    path
  }

  def onComplete(result: Try[Path]): Unit = result match {
    case Success(path) =>
      println(path)
      Future(watchPath).onComplete(onComplete)
    case Failure(exception) => throw exception
  }
}

But this does not work, after the first val watchKey = watchService.take() the application exists.

Additional question: Is the syntax highlighting broken?

Futures are executed “fire-and-forget” on daemon threads - they won’t affect your main thread’s exit. In order to observe the effects of a chain of futures, you need to, well, chain them, e.g. by making them depend on their predecessor via #flatMap() (often phrased as for-expressions), and wait for the resulting cumulative future “at the end of the world”.

A naive attempt, replacing your code starting with Future(watchPath).onComplete(onComplete):

  def takeKey(): Future[WatchKey] = Future { blocking { watchService.take() } }

  def resetKey(watchKey: WatchKey): Future[Boolean] = Future { watchKey.reset() }

  def eventPaths(watchKey: WatchKey): List[Path] = {
    watchKey
      .pollEvents()
      .asInstanceOf[java.util.List[WatchEvent[Path]]]
      .asScala
      .toList
      .map(_.context())
  }

  def printPaths(paths: List[Path]): Future[Unit] = Future { paths.foreach(println) }

  def watchStep(): Future[Boolean] =
    for {
      key <- takeKey()
      paths = eventPaths(key)
      _ <- printPaths(paths)
      cont <- resetKey(key)
    } yield cont

  def watch(): Future[Unit] =
    for {
      cont <- watchStep()
      next <- if(cont) watch() else Future.successful(())
    } yield next

  Await.ready(watch(), Duration.Inf)

(I don’t know the file watcher API at all, and I’m in a bit of a hurry right now, so I’m relying on other readers to point out and fix any blatant errors I’ve introduced in the above code.)

And yes, the syntax hightlighting is kind of broken. :slight_smile:

Sometimes it helps if you write scala after the triple backquotes (```scala) that start the code block. Otherwise the highlighter just does a “best” guess at which highlighting algorithm to choose. But even then YMMV… The scala hint seems to work better on other markdown renderers.

1 Like

Thank you for your help! I have to admit that I am not sure if I understand what you did there. I believe I have to go through quite a lot advanced concepts to understand that.

I would have written this:

  class Task extends Runnable {
    def run(): Unit = {
      val service: Service = Service()

      @scala.annotation.tailrec
      def checkStatusLoop(): Unit = {
        val status = service.take()
        println(status)
        if (status.reset())
          checkStatusLoop()
      }

      checkStatusLoop()
    }
  }

Any advantage in using an implicit argument?

It’s mainly the general concept of Futures plus #map()/#flatMap()/for-expressions (extending beyond futures and likely not intuitive when encountered for the first time). The basics are covered in the starter’s book, but it’s certainly advisable to go with a more in-depth resource.

The code you posted kind of works, but just kind of. The first problem is that your main thread will just exit after it has spawned the initial future. If at the end you add some code that keeps your main thread running infinitely, like

Await.ready(Future.never, Duration.Inf)

…you’ll actually see the watcher happily reacting to events. You don’t have an abstraction/handle for the chain of futures that comprise the loop, though - each #onComplete() handler will just fire up a new, unrelated future. In order to represent this sequential chain as an aggregate future instance of its own, you can combine the futures via #flatMap() instead. Example:

object FutureLoop extends App {

  import scala.concurrent._
  import scala.concurrent.duration._

  import ExecutionContext.Implicits.global

  def loopStep(start: Int): Future[Int] =
    Future {
      println(start)
      blocking { Thread.sleep(1000) }
      start + 1
    }

  def loopFlatMap(start: Int, max: Int): Future[Unit] =
    loopStep(start).flatMap { next=>
      if(next > max) Future.successful(()) else loopFlatMap(next, max)
    }

  Await.ready(loopFlatMap(1, 10), Duration.Inf)
}

This way, you’ll have a future instance that represents the complete loop - you can wait for it in your #main() and/or combine it with other futures.

Instead of using #flatMap() directly, you could equivalently express this using a for-expression.

  def loopFor(start: Int, max: Int): Future[Unit] =
    for {
      next <- loopStep(start)
      res <- if(next > max) Future.successful(()) else loopFor(next, max)
    } yield res

  Await.ready(loopFor(1, 10), Duration.Inf)

The Watcher example I’ve posted before uses exactly the same structure. The main difference is that I’ve further split up the “loop body” (i.e. #watchStep()) into substeps. I’d hoped for this to show how for-expressions can be used to express multiple nested #flatMap() invocations more conveniently like a linear sequence of calls.

3 Likes

Thank you for this explanations, I will go over them step-by-step and hopefully I will understand those concepts soon :slight_smile: