Hey folks,
I’m trying to figure out how to make the Alpakka MQTT Connector to reconnect in case of disconnection.
That’s my code so far:
object MQTT {
val connectionSettings = MqttConnectionSettings(
"tcp://localhost:1883",
"test-scala-client",
new MemoryPersistence()
)
private def source(channel: Channel): Source[MqttMessage, Future[Done]] = {
val mqttSourceSettings = MqttSourceSettings(
connectionSettings.withClientId("foobar-foo"),
Map(channel.topic + "/" + channel.subtopic -> MqttQoS.AtLeastOnce)
)
MqttSource(mqttSourceSettings, bufferSize = 8)
}
def subscribe(channel: Channel)(implicit mt: ActorMaterializer): Unit = {
source(channel).map(msg => msg.payload.utf8String).map(msg => println(msg)).toMat(Sink.ignore)(Keep.none).run()
}
}