What is the difference between these two programs?

In the Ammonite REPL:

Snippet 1

import $ivy.`org.typelevel::cats-core:2.1.1`
import $ivy.`org.typelevel::cats-effect:2.1.1`
import $ivy.`co.fs2::fs2-core:2.4.0`
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

implicit val contextShift = IO.contextShift(global)
implicit val timer = IO.timer(global)

val stream = Stream.fromIterator[IO](Iterator.from(1)) 
stream.interruptAfter(3.seconds).compile.toList.unsafeRunSync.size

Snippet 2

import $ivy.`org.typelevel::cats-core:2.1.1`
import $ivy.`org.typelevel::cats-effect:2.1.1`
import $ivy.`co.fs2::fs2-core:2.4.0`
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

implicit val contextShift = IO.contextShift(global)
implicit val timer = IO.timer(global)

val stream = Stream.fromIterator[IO](Iterator.from(1)) 
stream.interruptAfter(3.seconds).compile.toList.map(_.size).unsafeRunSync

Snippet 1 runs and returns res2: Int = 257052, but Snippet 2 never finishes. Why do the two behave differently?

It’d probably help others to reproduce the issue if you provided a complete compilable/runnable example along with the Scala and library versions and config settings used.

My first guess would be that you’re running into a REPL-specific issue; see the fs2 FAQ.

I edited the post to add the full code.

I think your guess is correct. When I run the second snippet in the Scala 2.13.3 REPL, which is class-based and should avoid those deadlocks, it finishes without issues. I don’t know whether Ammonite ever actually had those issues, though (edit: it probably does).
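
For what it’s worth, here is a minimal sketch of the kind of deadlock I mean, using only the standard library instead of fs2. The names Wrapper and Demo are made up for illustration, and I’m assuming Scala 2.12 or 2.13, where a lambda compiles to a method on its enclosing object. Wrapper stands in for the object that an object-based REPL wraps each input in. I haven’t checked how Ammonite wraps its inputs, so treat this as an analogy rather than a description of its internals.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the wrapper object an object-based REPL generates.
object Wrapper {
  // `_.size` is compiled to a method on Wrapper, so the pool thread that runs it
  // must wait until Wrapper has finished initializing. Wrapper's initializer is
  // this very line, blocked inside Await.result, so neither side can proceed.
  // The timeout only exists so the sketch gives up instead of hanging forever.
  val size: Int =
    Await.result(Future.successful(List(1, 2, 3)).map(_.size), 3.seconds)
}

object Demo {
  def main(args: Array[String]): Unit =
    // Touching Wrapper triggers its initializer, which is where the block happens.
    try println(Wrapper.size)
    catch {
      case e: ExceptionInInitializerError =>
        println(s"Wrapper failed to initialize: ${e.getCause}")
    }
}
```

That would also line up with Snippet 1 working: there, no function defined in the REPL input has to run on another thread while the input is still being evaluated, whereas `map(_.size)` in Snippet 2 introduces exactly such a function.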

To avoid the hang in Ammonite, you can put the code in an object inside a script file:

// file.sc

import $ivy.`org.typelevel::cats-core:2.1.1`
import $ivy.`org.typelevel::cats-effect:2.1.1`
import $ivy.`co.fs2::fs2-core:2.4.0`
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

object Main {
  implicit val contextShift = IO.contextShift(global)
  implicit val timer = IO.timer(global)
  
  val stream = Stream.fromIterator[IO](Iterator.from(1))
  val program = stream.interruptAfter(3.seconds).compile.toList.map(_.size)
  
  def run(): Unit = {
    println(program.unsafeRunSync())
  }
}

@main
def main(): Unit = {
  Main.run()
}

You can then run it with: amm ./file.sc