FS2 with Topics and publishers/subscribers: uncertain about erratic behaviour

Hi,

(I also created this Scastie live snippet; the line numbers I refer to below are relative to it.)

I am fairly new to FS2, but I understand that the subscriber and the publisher on a Topic must run concurrently (in parallel). I launch the REPL with this command:

scala-cli repl -S 3.8.0-RC2 --dep co.fs2::fs2-core:3.13.0-M7

The imports are:

import scala.concurrent.duration.*
import cats.effect.{ IO, Deferred }
import cats.syntax.all.*
import fs2.Stream
import fs2.concurrent.Topic
import cats.effect.unsafe.implicits.global

I create a wrapper Event class handling both infinite streams and streams with just one element:

implicit final class Event(private val value: Any):
  private def t = value.asInstanceOf[Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]]
  override def toString: String = value.toString

  object ∞ :

    object p: // publisher
      def infinite: Stream[IO, Event] =
        Stream.eval(Topic[IO, Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]]).flatMap { topic =>
          val pub = Stream.repeatEval(Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]).through(topic.publish)
          val sub = topic.subscribeUnbounded.evalMap { it => Deferred[IO, Unit].map(new Event(it) -> topic -> _) }.through(t.publish)
          t.subscribeUnbounded.filter(_._1._2 eq topic).evalMap { (it, d) => d.get.as(it._1) }.merge(pub.concurrently(sub))
        }

    object s: // subscriber
      def infinite: Stream[IO, Event] =
        t.subscribeUnbounded.evalMapFilter { (it, d) => d.complete(()).map(if _ then Some(it._1) else None) }

  object `1`:
    
    object p: // publisher
      def just_one(event: Event): Stream[IO, Nothing] =
        Stream.eval(Deferred[IO, Unit].map(event -> null -> _)).through(t.publish)

    object s: // subscriber
      def just_one: Stream[IO, Event] =
        t.subscribeUnbounded.evalMapFilter { (it, d) => d.complete(()).map(if _ then Some(it._1) else None) }.head

The design is that an Event can be either an ordinary value or an FS2 Topic. I want to allow multiple publishers and multiple subscribers on the same topic: the Deferred[IO, Unit] component ensures that exactly one subscriber receives any given Event (lines 26, 36).
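This works because Deferred#complete succeeds, returning true, only for its first caller; here is a quick REPL check of that semantics:

for
  d <- Deferred[IO, Unit]
  first  <- d.complete(())  // true: the first completion wins
  second <- d.complete(())  // false: already completed
yield (first, second)       // (true, false)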

In the case of publishing topics (i.e., not ordinary values), the AnyRef component (_1._2) is used to filter (line 21) only those events related to a fresh topic whose conveyed items are… Topics. Let:

val gen = new scala.util.Random

In the following excerpt, I create two infinite streams in parallel using the same topic from the outer for-comprehension:

for // outer
  topic <- Stream.eval(Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]).map(new Event(_))
  _ <- List(
         for // 1st inner
           t <- topic.∞.p.infinite
           _ <- Stream.eval(IO.println(t))
           e <- t.`1`.s.just_one
           _ <- Stream.eval(IO.println(e))
         yield
           ()
       ,
         for // 2nd inner
           t <- topic.∞.s.infinite
           _ <- t.`1`.p.just_one(Event(gen.nextInt))
         yield
           ()
       ).parJoinUnbounded
yield
  ()

The first one actually “publishes” an infinite stream of Topic Events (line 45), while the second one is just a subscriber (line 53).

The second inner for-comprehension then uses each Topic Event to publish just one random integer, while the first inner for-comprehension (and this is where I think the trouble is) takes the head of what is in fact a one-element stream and then prints that event out.

If I run the code for at most 1 second:

scala> res0.compile.drain.timeout(1000.milliseconds).unsafeRunSync()

the result is erratic: sometimes I see one number printed, sometimes several, and sometimes none at all. Setting a longer or shorter timeout makes no difference.

However, if I comment out two lines (47-48) in the first inner for-comprehension, thus avoiding the subscription on the topic t:

for
  topic <- Stream.eval(Topic[IO, ((Event, AnyRef), Deferred[IO, Unit])]).map(new Event(_))
  _ <- List(
         for
           t <- topic.∞.p.infinite
           _ <- Stream.eval(IO.println(t))
           //e <- t.`1`.s.just_one
           //_ <- Stream.eval(IO.println(e))
         yield
           ()
         ,
         for
           t <- topic.∞.s.infinite
           _ <- t.`1`.p.just_one(Event(gen.nextInt))
         yield
           ()
       ).parJoinUnbounded
yield
  ()

then I can see that I really do get an infinite stream from both inner for-comprehensions. So why is this line (47):

e <- t.`1`.s.just_one

troublesome? If I use .lastOr:

e <- t.`1`.s.just_one.lastOr(new Event(Int.MinValue))

it does not seem to help: the output stops after a while again.

Thanks

I was able to reproduce the same erratic behaviour with less fuss in this Scastie snippet, without infinite streams.

To be honest I don’t quite understand what you’re trying to do. But I think the core of the issue is that if you do List(t.publish1(x), t.subscribeUnbounded.head).parJoinUnbounded, it’s possible that x gets published to the topic before the other fiber manages to complete its subscription. This causes x to vanish into the void and the subscriber to wait forever for an element that will never come.

Once a subscription to the topic exists, new elements will be buffered for that subscription if they come in faster than the subscriber can handle them, but the subscriber will not receive elements that arrived before the subscription started. I think subscribeAwait exists to avoid this type of race condition.
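To illustrate, here is a minimal sketch of both sides of the race (my own reduction, not your exact code):

// Racy: publish1 may run before the subscription is registered, in
// which case 42 vanishes into the void and .head blocks forever.
val racy: IO[Option[Int]] =
  Topic[IO, Int].flatMap { t =>
    Stream(
      Stream.exec(t.publish1(42).void),
      t.subscribeUnbounded.head
    ).parJoinUnbounded.compile.last
  }

// Safe: subscribeAwaitUnbounded is a Resource whose acquisition
// completes only once the subscription is registered, so an element
// published afterwards is guaranteed to be delivered.
val safe: IO[Option[Int]] =
  Topic[IO, Int].flatMap { t =>
    Stream.resource(t.subscribeAwaitUnbounded)
      .flatMap(sub => sub.head.concurrently(Stream.exec(t.publish1(42).void)))
      .compile.last
  }

racy yields Some(42) only when the subscriber happens to win the race; safe always does.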

I see. So, copying from the fs2 repository’s Topic.scala, I add a start Deferred: it seems to work with one subscriber, but not once I add a second:

import cats.effect.Resource

for
  _ <- Stream.unit.repeat
  topic <- Stream.eval(Topic[IO, Int])
  start <- Stream.eval(Deferred[IO, Unit])
  _ <- List(
         for
           _ <- Stream.eval(start.get)
           _ <- Stream.eval(IO { gen.nextInt(10) }).through(topic.publish)
         yield
           ()
       ,
         for
           n <- Stream.resource(topic.subscribeAwaitUnbounded <* Resource.eval(start.complete(()))).flatten
           _ <- Stream.eval(IO.println(n))
         yield
           ()
       ,
         for
           n <- Stream.resource(topic.subscribeAwaitUnbounded <* Resource.eval(start.complete(()))).flatten
           _ <- Stream.eval(IO.println(n-10))
         yield
           ()
       ).parJoinUnbounded
yield
  ()

The idea I’m ready to accept is that publishing can start as soon as any subscription exists. But is that what I actually got here?
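If what I actually need is that publishing may start only after all subscriptions exist, then I would need one count per subscriber rather than a single Deferred. Here is a sketch of what I mean, using cats.effect.std.CountDownLatch (not among my original imports), where each subscriber releases the latch once its subscription is registered:

import cats.effect.Resource
import cats.effect.std.CountDownLatch

for
  topic <- Stream.eval(Topic[IO, Int])
  latch <- Stream.eval(CountDownLatch[IO](2)) // one count per subscriber
  _ <- List(
         for // publisher: blocked until both counts are released
           _ <- Stream.eval(latch.await)
           _ <- Stream.eval(IO { gen.nextInt(10) }).through(topic.publish)
         yield
           ()
       ,
         for
           n <- Stream.resource(topic.subscribeAwaitUnbounded <* Resource.eval(latch.release)).flatten
           _ <- Stream.eval(IO.println(n))
         yield
           ()
       ,
         for
           n <- Stream.resource(topic.subscribeAwaitUnbounded <* Resource.eval(latch.release)).flatten
           _ <- Stream.eval(IO.println(n-10))
         yield
           ()
       ).parJoinUnbounded
yield
  ()

(The two subscribers still never terminate on their own, which is the separate problem I come back to below.)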

Another way that works is to use Stream.concurrently manually:

for
  _ <- Stream.unit.repeat
  topic <- Stream.eval(Topic[IO, Int])
  _ <- (
       for
         _ <- Stream.eval(IO { gen.nextInt(10) }).through(topic.publish)
       yield
         ()
       ).concurrently(
         (
         for
           n <- topic.subscribeUnbounded
           _ <- Stream.eval(IO.println(n))
         yield
           ()
         ).merge(
         for
           n <- topic.subscribeUnbounded
           _ <- Stream.eval(IO.println(n-10))
         yield
           ()
         )
       )
yield
  ()

but this is not acceptable to me, because I cannot track these cases programmatically. There seems to be an essential difference between Stream.concurrently and parJoinUnbounded.
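A minimal experiment that seems to confirm the difference:

// concurrently interrupts the background stream as soon as the
// foreground stream terminates, so this finishes immediately:
Stream.emit(1).covary[IO].concurrently(Stream.never[IO]).compile.drain.unsafeRunSync()

// parJoinUnbounded terminates only when every joined stream has
// terminated, so this hangs forever:
Stream(Stream.emit(1).covary[IO], Stream.never[IO]).parJoinUnbounded.compile.drain.unsafeRunSync()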

Okay, I think I understand that the “join” in parJoinUnbounded means “joining”/awaiting each argument stream, so if one subscriber never receives an element, its stream never terminates, and hence nothing gets past parJoinUnbounded. So, I guess that, as in the documentation, adding a Quit event (like the number 0 below) can be used to interruptWhen the subscribers:

import cats.effect.Resource
import fs2.concurrent.SignallingRef

for
  _ <- Stream.unit.repeat
  topic <- Stream.eval(Topic[IO, Int])
  start <- Stream.eval(Deferred[IO, Unit])
  stop <- Stream.eval(SignallingRef[IO, Boolean](false))
  _ <- List(
         for
           _ <- Stream.eval(start.get)
           _ <- (Stream.eval(IO { 1+gen.nextInt(10) }) ++ Stream.emit(0)).through(topic.publish)
         yield
           ()
       ,
         for
           n <- Stream.resource(topic.subscribeAwaitUnbounded <* Resource.eval(start.complete(())))
                  .flatten
                  .evalTap {
                    case 0 => stop.set(true)
                    case _ => IO.unit
                  }
                  .interruptWhen(stop)
           _ <- Stream.eval(IO.println(n))
         yield
           ()
       ,
         for
           n <- Stream.resource(topic.subscribeAwaitUnbounded <* Resource.eval(start.complete(())))
                  .flatten
                  .evalTap {
                    case 0 => stop.set(true)
                    case _ => IO.unit
                  }
                  .interruptWhen(stop)
           _ <- Stream.eval(IO.println(n-10))
         yield
           ()
       ).parJoinUnbounded
yield
  ()