Type mismatch given Source.empty yielding a type parameter of Nothing

I’m getting a type mismatch error that I’ve been staring at for a few hours. Sorry for the noise, but I’m wondering if someone can help. Here’s my error:

[error] /Users/huntc/Projects/hacking/alpakka/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/scaladsl/Mqtt.scala:40:30: type mismatch;
[error]  found   : akka.stream.scaladsl.Flow[akka.util.ByteString,scala.util.Either[akka.stream.alpakka.mqtt.streaming.MqttCodec.DecodeError,akka.stream.alpakka.mqtt.streaming.Event[_ <: A]],akka.NotUsed]
[error]  required: akka.stream.Graph[akka.stream.FlowShape[akka.util.ByteString,Either[akka.stream.alpakka.mqtt.streaming.MqttCodec.DecodeError,akka.stream.alpakka.mqtt.streaming.Event[A]]],?]
[error]       session.eventFlow.merge(

I think it is because I’m using a Source.empty in my code, and A becomes Nothing. Here’s my code in full:

  def clientSessionFlow[A](
      session: MqttClientSession
  )(
      implicit mat: Materializer
  ): BidiFlow[Command[A], ByteString, ByteString, Either[MqttCodec.DecodeError, Event[A]], NotUsed] = {
    import mat.executionContext
    val commandFlowCompleted = Promise[Done]
    BidiFlow.fromFlows(
      session.commandFlow.watchTermination() {
        case (e, done) =>
          done.foreach(commandFlowCompleted.success)
          e
      },
      session.eventFlow.merge(
        Source
          .fromFuture(commandFlowCompleted.future)
          .flatMapConcat(_ => Source.empty[Either[MqttCodec.DecodeError, Event[A]]]),
        eagerComplete = true
      )
    )
  }

Given that I’m using a Source.empty, I actually don’t care about A becoming Nothing. I’m thinking there might be a place for the @uncheckedVariance annotation here, but I haven’t found it yet.

Thanks for any help.

I don’t know enough Akka Streams offhand to answer with confidence, so this is purely a guess. But the error message suggests to me that you’re short a level. Consider – what it wants is:

akka.stream.Graph[akka.stream.FlowShape[akka.util.ByteString,Either[akka.stream.alpakka.mqtt.streaming.MqttCodec.DecodeError,akka.stream.alpakka.mqtt.streaming.Event[A]]],?]

and what it’s finding is:

akka.stream.scaladsl.Flow[akka.util.ByteString,scala.util.Either[akka.stream.alpakka.mqtt.streaming.MqttCodec.DecodeError,akka.stream.alpakka.mqtt.streaming.Event[_ <: A]],akka.NotUsed]

So it appears that it wants a Graph[Flow], but what it has is a Flow. Is there a Graph wrapper you’re supposed to be putting in here?

No, I think that bit is OK. I don’t fully understand it either, but I think akka.stream.Graph[akka.stream.FlowShape[…], …] and akka.stream.scaladsl.Flow amount to the same thing here, since a Flow is itself a Graph with a FlowShape.

Ah, in this instance I also needed to carry the A type parameter through to session.commandFlow and session.eventFlow. I’m good now. Sorry for any noise.
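
For anyone landing here with the same error, here is a minimal sketch of why carrying A through helps. It uses plain Lists instead of Akka Streams, and Event, MergeSketch, merge, untyped, widened and carried are all hypothetical stand-ins rather than the Alpakka API. It also assumes the session's event side was typed without reference to A, which the thread does not show.

```scala
// Hypothetical stand-ins for illustration only; these are not the Alpakka types.
final case class Event[A](carry: Option[A]) // invariant in A, like Event in the error message

object MergeSketch {
  // Same shape as the element types of a merge: the result element type U
  // must be a supertype of the left-hand element type Out.
  def merge[Out, U >: Out](left: List[Out], right: List[U]): List[U] = left ++ right

  // Assumption: events coming from a session that knows nothing about the caller's A.
  val untyped: List[Event[Nothing]] = List(Event[Nothing](None))

  // Type arguments are written out for clarity. Because Event is invariant, the
  // least upper bound of Event[Nothing] and Event[A] is Event[_ <: A], not Event[A].
  // That existential is what shows up as "found" in the error above.
  def widened[A](extra: List[Event[A]]): List[Event[_ <: A]] =
    merge[Event[Nothing], Event[_ <: A]](untyped, extra)

  // Once A is carried through to the session side, both inputs are already
  // List[Event[A]], so the merged result is List[Event[A]] as required.
  def carried[A](events: List[Event[A]], extra: List[Event[A]]): List[Event[A]] =
    merge(events, extra)
}
```

Declaring the result of widened as List[Event[A]] would be rejected, because Event[_ <: A] does not conform to Event[A]. That also suggests the Source.empty in the original code was not the culprit, since it was already typed with Event[A].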

How do you call clientSessionFlow(…)?

Here are some references to the code in reply to your question. Thanks for taking an interest. All is now well here.