Execute a LazyList[IO] until a deadline is reached

I have a function that returns a LazyList[IO[_]]. Each IO wraps an HTTP call. I am trying to take items from the LazyList and execute them in parallel until a deadline is reached. Something like this:

val deadLine = timeToRunInSeconds.seconds.fromNow
foo(someParameter)  //Function that returns LazyList[IO[_]]
  .takeWhile(_ => deadLine.hasTimeLeft)
  .parSequence
  .as(ExitCode.Success)

But the problem with this is that it takes items without executing them. If my timeToRunInSeconds is 10, then I am guessing that this code will take items from the lazy list for 10 seconds (which will be a humongous number of items) and only then start executing them (parSequence). That was not my intention: I want it to actually execute (make the HTTP calls) for the duration of 10 seconds.

If you are already using IO it would be better to use fs2 rather than LazyList.

Sorry, I have not used fs2, so this might be a basic question. When you say that it would be better to use fs2, do you mean that the function foo should return an fs2.Stream[IO[_]]?
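For readers wondering what the fs2 route could look like without changing foo itself, here is a minimal sketch. It assumes foo can stay as it is and that the LazyList is unfolded lazily into a Stream[IO, _], which is then evaluated with bounded concurrency and cut off after a fixed time. The names DeadlineSketch and fromLazyList, the fake 100 ms "call", the concurrency of 3 and the 1 second cutoff are all made up for illustration; foo here is only a stand-in for the real function.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.{Pure, Stream}
import scala.concurrent.duration._

object DeadlineSketch extends IOApp {

  // Hypothetical stand-in for the question's foo: an endless LazyList of
  // effects that have not been run yet (each one sleeps instead of doing HTTP).
  def foo(): LazyList[IO[Int]] =
    LazyList.from(1).map(id => IO.sleep(100.millis).map(_ => id))

  // Turn a LazyList into a pure fs2 Stream, forcing one element per pull.
  def fromLazyList[A](items: LazyList[A]): Stream[Pure, A] =
    Stream.unfold(items)(rest => rest.headOption.map(head => (head, rest.tail)))

  def run(args: List[String]): IO[ExitCode] =
    fromLazyList(foo())
      .covary[IO]
      // Run at most 3 of the wrapped effects at the same time.
      .parEvalMapUnordered[IO, Int](3)(io => io)
      // Stop pulling and running new items once the time is up.
      .interruptAfter(1.second)
      .compile
      .count // only count finished calls instead of keeping their results
      .flatMap(n => IO(println(s"completed $n calls")))
      .map(_ => ExitCode.Success)
}
```

The point of the sketch is that items are only taken from the LazyList when a concurrency slot is free, so taking and executing happen together rather than one after the other.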

The amount of new requests you can keep starting in 10 seconds is just as unwieldy.

Maybe it’s more sensible to have some fixed limit of requests you want to have in flight, and use a semaphore: https://typelevel.org/cats-effect/concurrency/semaphore.html


Use a semaphore while still keeping the parSequence?
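A rough sketch of what that combination might look like, assuming Cats Effect 3 (in Cats Effect 2, which the linked page documents, the semaphore has withPermit instead of permit.use). The object name, fakeCall and runBounded are hypothetical helpers. Note that this only bounds how many calls are in flight: parSequence still needs a finite collection, so it does not by itself solve the deadline part for an endless LazyList.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._

object BoundedParSequence extends IOApp {

  // Hypothetical stand-in for one HTTP call.
  def fakeCall(id: Int): IO[Int] = IO.sleep(50.millis).map(_ => id)

  // Start all calls via parSequence, but let at most maxInFlight of them
  // hold a permit (and therefore actually run) at any moment.
  def runBounded(total: Int, maxInFlight: Int): IO[List[Int]] =
    Semaphore[IO](maxInFlight.toLong).flatMap { sem =>
      List.range(1, total + 1)
        .map(id => sem.permit.use(_ => fakeCall(id)))
        .parSequence
    }

  def run(args: List[String]): IO[ExitCode] =
    runBounded(total = 10, maxInFlight = 3)
      .flatMap(results => IO.println(s"finished ${results.size} calls"))
      .map(_ => ExitCode.Success)
}
```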

As usual with the cats[-effect] stack, it’s probably more like “all in” rather than just switching a container type. :wink:

Here’s a naive attempt, based on the interruption example from the fs2 guide and a “code completion search” discovery of #parEvalMapUnordered().

Disclaimer: I’m not yet entirely fluent with cats[-effect] and pretty new to fs2. I just took this thread as an occasion to look a bit deeper into fs2 beyond plain linear stream transformations. I haven’t understood all of the implications, this may all be glaringly wrong, and it almost certainly isn’t idiomatic code for the cats[-effect]/fs2 stack. Any corrections or suggestions for improvement are appreciated. (I’d also be really curious as to how one might approach cancelling “in progress” items upon cutoff starting from here.)

// Written against Cats Effect 2 / fs2 2.x: it relies on the implicit
// `timer` that IOApp provides there.
import cats.effect._
import fs2._

import scala.concurrent.duration._
import scala.language.postfixOps

object IOLimitMain extends IOApp {

  // Endless stream of work item ids: 1, 2, 3, ...
  def ids: Stream[IO, Int] =
    Stream.fromIterator[IO](Iterator.from(1))

  // Simulated work: sleep for a random time below one second and report it.
  def workItem(id: Int): IO[FiniteDuration] = {
    for {
      time <- IO { (Math.random() * 1000).toInt milliseconds }
      _ <- timer.sleep(time)
      _ <- IO { println(s"$id ($time)")}
    } yield time
  }

  // Process the inputs with at most 3 work items running concurrently.
  def worker(inputs: Stream[IO, Int]): Stream[IO, FiniteDuration] =
    inputs.parEvalMapUnordered(3)(workItem)

  override def run(args: List[String]): IO[ExitCode] = {
    // Interrupt the whole stream after 3 seconds.
    val program = worker(ids).interruptAfter(3 seconds)
    for {
      start <- timer.clock.realTime(MILLISECONDS)
      busyMs <- program.map(_.toMillis).compile.toList
      stop <- timer.clock.realTime(MILLISECONDS)
      summary = s"busy: ${busyMs.sum} ms, wall: ${stop - start} ms"
      _ <- IO { println(summary) }
    } yield ExitCode.Success
  }

}

[EDIT: fixed FiniteDuration to millis conversion]
[EDIT: #interruptAfter() instead of explicit Deferred switch]


I used parEvalMapUnordered and interruptAfter from your example and came up with this:

val httpCalls: Stream[IO, Request[Unit]] = ???
httpCalls
 .parEvalMapUnordered(10)(r => IO(r.execute))
 .interruptAfter(timeToRunInSeconds.seconds)
 .compile
 .toList
 .as(ExitCode.Success)

This seems to work. But my machine starts hanging when I increase timeToRunInSeconds. I did not expect it to hang, as the concurrency is 10 irrespective of how long the thing runs. Also, since this is a stream, the data is loaded as required. So I am not sure why increasing timeToRunInSeconds hangs my machine.
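One thing that may be worth ruling out (a guess, not something confirmed in this thread): .compile.toList keeps every result in memory until the stream ends, so a longer run builds a larger list even though only 10 calls are in flight at a time. If the results are not needed afterwards, .compile.drain discards them as they arrive. The sketch below assumes that is acceptable; DrainSketch, ids and execute are hypothetical stand-ins for the real request stream and r.execute.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.{Pure, Stream}
import scala.concurrent.duration._

object DrainSketch extends IOApp {

  // Hypothetical stand-in for the request stream: endless ids 1, 2, 3, ...
  val ids: Stream[Pure, Int] = Stream.iterate(1)(_ + 1)

  // Hypothetical stand-in for r.execute.
  def execute(id: Int): IO[Int] = IO.sleep(50.millis).map(_ => id)

  def run(args: List[String]): IO[ExitCode] =
    ids
      .covary[IO]
      .parEvalMapUnordered[IO, Int](10)(execute)
      .interruptAfter(2.seconds)
      .compile
      .drain // run for the effects only; no result list is accumulated
      .map(_ => ExitCode.Success)
}
```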

Again, a self-contained, compilable and runnable example that exhibits the problematic behavior, along with environment/version information, probably would encourage readers willing to investigate. (Plus, assembling such an example often helps me to understand/solve the issue at hand myself before even posting my question.)