In ammonite repl:
Snippet 1
import $ivy.`org.typelevel::cats-core:2.1.1`
import $ivy.`org.typelevel::cats-effect:2.1.1`
import $ivy.`co.fs2::fs2-core:2.4.0`
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
implicit val contextShift = IO.contextShift(global)
implicit val timer = IO.timer(global)
val stream = Stream.fromIterator[IO](Iterator.from(1))
stream.interruptAfter(3.seconds).compile.toList.unsafeRunSync.size
Snippet 2
import $ivy.`org.typelevel::cats-core:2.1.1`
import $ivy.`org.typelevel::cats-effect:2.1.1`
import $ivy.`co.fs2::fs2-core:2.4.0`
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
implicit val contextShift = IO.contextShift(global)
implicit val timer = IO.timer(global)
val stream = Stream.fromIterator[IO](Iterator.from(1))
stream.interruptAfter(3.seconds).compile.toList.map(_.size).unsafeRunSync
Snippet 1 executes and returns res2: Int = 257052
. But Snippet 2 never finishes. What is the difference between the two?
1 Like
It’d probably help others to reproduce the issue if you provided a complete compilable/runnable example along with the Scala and library versions and config settings used.
My first guess would be that you’re running into some REPL specific issue - see the fs2 FAQ.
2 Likes
Edited the post to add the full code
My guess would be that your guess is correct. When I run the second snippet in the 2.13.3 REPL which is class based and should avoid those deadlocks then it finishes without issues. I don’t know whether ammonite ever actually had those issues though (edit: it probably does).
You can do this to fix the error in Ammonite.
// file.sc
import $ivy.`org.typelevel::cats-core:2.1.1`
import $ivy.`org.typelevel::cats-effect:2.1.1`
import $ivy.`co.fs2::fs2-core:2.4.0`
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
object Main {
implicit val contextShift = IO.contextShift(global)
implicit val timer = IO.timer(global)
val stream = Stream.fromIterator[IO](Iterator.from(1))
val program = stream.interruptAfter(3.seconds).compile.toList.map(_.size)
def run(): Unit = {
println(program.unsafeRunSync())
}
}
@main
def main(): Unit = {
Main.run()
}
Which you can run like: amm ./file.sc