Hi,
(I also created this Scastie live code; the line numbers in parentheses below refer to it.)
I am fairly new to FS2, but I understand that the subscriber and the publisher on a Topic must run concurrently (in parallel). I launch the REPL with this command:
scala-cli repl -S 3.8.0-RC2 --dep co.fs2::fs2-core:3.13.0-M7
The imports are:
import scala.concurrent.duration.*
import cats.effect.{ IO, Deferred }
import cats.syntax.all.*
import fs2.Stream
import fs2.concurrent.Topic
import cats.effect.unsafe.implicits.global
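To make the concurrency requirement concrete, here is a minimal sketch that is separate from the code under discussion. It assumes a plain `Topic[IO, Int]` and uses `subscribeAwaitUnbounded`, which is not used anywhere else in this post, to register the subscription before the publisher starts. The names `received`, `sub` and `pub` are only illustrative.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import fs2.concurrent.Topic

// Illustrative only: one publisher and one subscriber on a plain Topic[IO, Int].
val received: List[Int] =
  Stream.eval(Topic[IO, Int]).flatMap { topic =>
    // Acquiring the resource registers the subscription before anything is published.
    Stream.resource(topic.subscribeAwaitUnbounded).flatMap { sub =>
      // The publisher runs in the background; the `publish` pipe closes the topic when its input ends.
      val pub = Stream(1, 2, 3).covary[IO].through(topic.publish)
      // The subscriber runs in the foreground, concurrently with the publisher.
      sub.take(3).concurrently(pub)
    }
  }.compile.toList.unsafeRunSync()

println(received)
```

With `subscribeUnbounded` instead, the subscription is only registered once the subscriber stream starts being pulled, so anything published before that moment is not delivered to it.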
I create a wrapper Event class handling both infinite streams and streams with just one element:
implicit final class Event(private val value: Any):
  private def t = value.asInstanceOf[Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]]
  override def toString: String = value.toString
  object ∞ :
    object p: // publisher
      def infinite: Stream[IO, Event] =
        Stream.eval(Topic[IO, Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]]).flatMap { topic =>
          val pub = Stream.repeatEval(Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]).through(topic.publish)
          val sub = topic.subscribeUnbounded.evalMap { it => Deferred[IO, Unit].map(new Event(it) -> topic -> _) }.through(t.publish)
          t.subscribeUnbounded.filter(_._1._2 eq topic).evalMap { (it, d) => d.get.as(it._1) }.merge(pub.concurrently(sub))
        }
    object s: // subscriber
      def infinite: Stream[IO, Event] =
        t.subscribeUnbounded.evalMapFilter { (it, d) => d.complete(()).map(if _ then Some(it._1) else None) }
  object `1`:
    object p: // publisher
      def just_one(event: Event): Stream[IO, Nothing] =
        Stream.eval(Deferred[IO, Unit].map(event -> null -> _)).through(t.publish)
    object s: // subscriber
      def just_one: Stream[IO, Event] =
        t.subscribeUnbounded.evalMapFilter { (it, d) => d.complete(()).map(if _ then Some(it._1) else None) }.head
The design is that an Event can be either an ordinary value or an FS2 Topic. I want to allow multiple publishers and multiple subscribers on the same topic: the Deferred[IO, Unit] component ensures that exactly one subscriber gets each Event, because only the subscriber whose complete call succeeds keeps it (26, 36).
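The claiming mechanism can be sketched in isolation. This is a minimal illustration, not part of the Scastie code: three hypothetical subscribers race to complete the same `Deferred[IO, Unit]`, and `complete` returns `true` only for the one that gets there first. The names `claims` and `results` are made up for the example.

```scala
import cats.effect.{ IO, Deferred }
import cats.syntax.all.*
import cats.effect.unsafe.implicits.global

// Three hypothetical subscribers try to claim the same event in parallel.
val claims: IO[List[Boolean]] =
  Deferred[IO, Unit].flatMap { d =>
    // `complete` yields true for the first caller and false for every later one.
    List.fill(3)(d.complete(())).parSequence
  }

val results: List[Boolean] = claims.unsafeRunSync()

// Exactly one of the three attempts wins, whichever fiber runs first.
println(results.count(identity))
```

This is the same test that `evalMapFilter` performs in `s.infinite` and `s.just_one`: a subscriber that sees `false` drops the event.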
When topics (rather than ordinary values) are published, the AnyRef component (_1._2) is used to keep (21) only the events related to a fresh topic whose items are themselves Topics. Let:
val gen = new scala.util.Random
In the following excerpt, I create two infinite streams in parallel using the same topic from the outer for-comprehension:
for // outer
  topic <- Stream.eval(Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]).map(new Event(_))
  _ <- List(
    for // 1st inner
      t <- topic.∞.p.infinite
      _ <- Stream.eval(IO.println(t))
      e <- t.`1`.s.just_one
      _ <- Stream.eval(IO.println(e))
    yield
      ()
    ,
    for // 2nd inner
      t <- topic.∞.s.infinite
      _ <- t.`1`.p.just_one(Event(gen.nextInt))
    yield
      ()
  ).parJoinUnbounded
yield
  ()
The first one actually “publishes” an infinite stream of Topic Events (45), while the second one is just a subscriber (53).
The second inner for-comprehension then uses each Topic Event to publish just one random integer, while the first inner for-comprehension (and this is where I think the trouble is) takes the head of what is actually a one-element stream and then prints that event.
If I run the code for at most 1 second:
scala> res0.compile.drain.timeout(1000.milliseconds).unsafeRunSync()
the result is erratic: sometimes I see one number printed, sometimes several, and sometimes none. Increasing or decreasing the timeout makes no difference.
However, if I comment out two lines (47-48) in the first inner for-comprehension and thus avoid subscribing to the topic t:
for
  topic <- Stream.eval(Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]).map(new Event(_))
  _ <- List(
    for
      t <- topic.∞.p.infinite
      _ <- Stream.eval(IO.println(t))
      //e <- t.`1`.s.just_one
      //_ <- Stream.eval(IO.println(e))
    yield
      ()
    ,
    for
      t <- topic.∞.s.infinite
      _ <- t.`1`.p.just_one(Event(gen.nextInt))
    yield
      ()
  ).parJoinUnbounded
yield
  ()
then I can see that I really get an infinite stream from both inner for-comprehensions. So why is this line (47):
e <- t.`1`.s.just_one
troublesome? If I use .lastOr:
e <- t.`1`.s.just_one.lastOr(new Event(Int.MinValue))
it does not help either: the output again stops after a while.
Thanks
I was able to reproduce the same erratic behaviour with less fuss in this Scastie snippet, without infinite streams.