Alpakka and MQTT

Hey folks,

I’m trying to figure out how to make the Alpakka MQTT connector reconnect automatically after the connection to the broker is lost.

Here’s my code so far:

/** Helpers for subscribing to MQTT topics through the Alpakka MQTT connector. */
object MQTT {

  // Local imports keep this object self-contained; both come from libraries
  // the snippet already relies on (Akka Streams and the Scala standard library).
  import akka.stream.scaladsl.RestartSource
  import scala.concurrent.duration._

  /** Base connection settings (broker URI, default client id, in-memory persistence). */
  val connectionSettings: MqttConnectionSettings = MqttConnectionSettings(
    "tcp://localhost:1883",
    "test-scala-client",
    new MemoryPersistence()
  )

  /**
   * Builds a single, non-restarting MQTT source for the given channel.
   *
   * The subscribed topic is `<channel.topic>/<channel.subtopic>` with
   * at-least-once QoS.
   *
   * NOTE(review): every source built here connects with the same client id
   * ("foobar-foo"). MQTT brokers normally allow one connection per client id,
   * so subscribing to several channels at once may make the broker drop the
   * earlier connection -- confirm against the broker in use.
   *
   * @param channel channel whose topic and subtopic form the subscription
   * @return a source of incoming MQTT messages
   */
  private def source(channel: Channel): Source[MqttMessage, Future[Done]] = {
    val topic = s"${channel.topic}/${channel.subtopic}"
    val mqttSourceSettings = MqttSourceSettings(
      connectionSettings.withClientId("foobar-foo"),
      Map(topic -> MqttQoS.AtLeastOnce)
    )
    MqttSource(mqttSourceSettings, bufferSize = 8)
  }

  /**
   * Subscribes to the channel and prints every received payload as UTF-8 text.
   *
   * The MQTT source is wrapped in `RestartSource.withBackoff`, so whenever the
   * inner source fails or completes (presumably what happens when the broker
   * connection is lost -- TODO confirm for the Alpakka version in use) a fresh
   * source, and with it a fresh connection and subscription, is created after
   * an exponentially growing delay between 1 and 30 seconds.
   *
   * NOTE(review): messages published while disconnected are likely missed
   * unless the broker session is persistent (clean session disabled) --
   * verify if that matters for this use case.
   *
   * @param channel channel to subscribe to
   * @param mt      materializer used to run the stream
   */
  def subscribe(channel: Channel)(implicit mt: ActorMaterializer): Unit = {
    RestartSource
      .withBackoff(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
        source(channel)
      }
      .map(_.payload.utf8String)
      .runForeach(println)
    // The stream runs in the background; its completion future is intentionally not returned.
    ()
  }
}