Hi All,
I’m writing here because I’ve spent days trying to understand how message delivery works in Akka.
In a plain scenario there is not much to see, but things get complicated when I simulate an error.
Basically, I’m simulating a conversation between three actors (A, B, C) in an Akka TestKit test:
A ---> MessageA2B ---> B ---> MessageB2C ---> C
When MessageB2C is successfully delivered to C, the acknowledgement is sent back to the origin:
C --> MessageB2C_Ack --> B --> MessageA2B_Ack --> A
The only peculiarity of this conversation is the message MessageB2C: B insists on sending MessageB2C, even many times, until C answers with its acknowledgement.
As said, this simple conversation is implemented with the Akka TestKit in Scala, but the test fails in one particular situation.
When ActorB retries sending MessageB2C more than once, it is then unable to receive the answer from ActorC. Even though Patterns.ask gets an AskTimeoutException, the exception is contained, so B should remain in a consistent state. But the reply from C to B goes to deadLetters.
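To make the dead-letter part concrete: as far as I understand, every Patterns.ask call gets its own temporary reply-to reference, and that reference stops accepting replies once its timeout has fired. Below is a minimal sketch of that idea using only the Scala standard library. It is a simplified model, not Akka's actual implementation, and `PendingAsk` is a hypothetical name made up for this illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object LateReplyModel {
  // Hypothetical stand-in for a single ask: a promise that can be completed only once.
  final class PendingAsk {
    private val promise = Promise[String]()
    val future: Future[String] = promise.future

    // Models the ask timeout firing before any reply has arrived.
    def timeOut(): Boolean =
      promise.tryFailure(new TimeoutException("ask timed out"))

    // A reply is accepted only while the ask is still pending.
    // Returning false models the reply ending up in dead letters.
    def reply(msg: String): Boolean =
      promise.trySuccess(msg)
  }

  def main(args: Array[String]): Unit = {
    val firstAsk = new PendingAsk
    firstAsk.timeOut()                               // the timeout fires first
    val accepted = firstAsk.reply("MessageB2C_Ack")  // the reply arrives too late
    println(s"late reply accepted: $accepted")
    println(s"ask result: ${Try(Await.result(firstAsk.future, 1.second))}")
  }
}
```

If this model matches what Akka does, a reply sent after the ask has timed out has nowhere to go, which would fit the deadLetters entry I see.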
I’ve also noticed that the test completes successfully, with C delivering the message to B, if I remove the line actorA.expectNoMessage…
Here is the code:
test("expectNoMessage-case: actorB retries MessageB2C") {
  val actorA = TestProbe()
  val actorC = TestProbe()
  val actorB = system.actorOf(ActorB.props(Props(classOf[TestRefWrappingActor], actorC)), "step1-case2-primary")

  actorA.send(actorB, MessageA2B())
  actorA.expectNoMessage(100.milliseconds)
  actorC.expectMsg(MessageB2C())

  // Retries from above
  actorC.expectMsg(200.milliseconds, MessageB2C())

  // Never reach this point with 100 ms frequency
  actorC.expectMsg(200.milliseconds, MessageB2C())
  actorA.expectNoMessage(100.milliseconds)
  actorC.reply(MessageB2C_Ack())

  // Never reach this point with MessageB2C 50 ms frequency
  actorA.expectMsg(MessageA2B_Ack())
}
This is the ActorB code:
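import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.pattern.{AskTimeoutException, Patterns}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

class ActorB(actorCProps: Props) extends Actor {
  import ActorB._
  import context.dispatcher

  val log = Logging(context.system, this)
  val actorC = context.actorOf(actorCProps)

  // Asks actorRef with a 50 ms timeout and asks again on every timeout,
  // until maxAttempts is exceeded.
  def retry(actorRef: ActorRef, message: Any, maxAttempts: Int, attempt: Int): Future[Any] = {
    log.info("ActorB - sent message MessageB2C to ActorC " + actorC)
    val future = Patterns.ask(actorRef, message, 50.millisecond) recover {
      case e: AskTimeoutException =>
        if (attempt <= maxAttempts) retry(actorRef, message, maxAttempts, attempt + 1)
        else None
    }
    future
  }

  def receive = {
    case r: MessageA2B => {
      // Capture the sender now; it is not safe to read inside the future callback.
      val client = context.sender()
      implicit val timeout = Timeout(100.milliseconds)
      implicit val scheduler = context.system.scheduler
      val p = MessageB2C()
      retry(actorC, p, maxAttempts = 10, attempt = 1) onSuccess({
        case p: MessageB2C_Ack => {
          client ! MessageA2B_Ack()
        }
      })
    }
  }
}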
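One thing that may be worth checking in the retry method above is the difference between recover and recoverWith on a Scala Future. The sketch below uses only the standard library: `fakeAsk` is a hypothetical stand-in for Patterns.ask that times out twice and then answers, and both retry helpers are made-up names for this illustration, not the code from my project.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RetrySketch {
  // Hypothetical stand-in for Patterns.ask: fails with a timeout on the first
  // two attempts, then answers with an acknowledgement.
  def fakeAsk(attempt: Int): Future[Any] =
    if (attempt < 3) Future.failed(new TimeoutException(s"attempt $attempt timed out"))
    else Future.successful("MessageB2C_Ack")

  // recover: whatever the handler returns becomes the value of the future,
  // so returning the retry leaves a Future nested inside the outer Future.
  def retryWithRecover(attempt: Int, maxAttempts: Int): Future[Any] =
    fakeAsk(attempt).recover {
      case _: TimeoutException if attempt < maxAttempts =>
        retryWithRecover(attempt + 1, maxAttempts)
    }

  // recoverWith: the handler returns a Future, which is flattened into the result.
  def retryWithRecoverWith(attempt: Int, maxAttempts: Int): Future[Any] =
    fakeAsk(attempt).recoverWith {
      case _: TimeoutException if attempt < maxAttempts =>
        retryWithRecoverWith(attempt + 1, maxAttempts)
    }

  def main(args: Array[String]): Unit = {
    val nested = Await.result(retryWithRecover(1, 10), 1.second)
    val flat = Await.result(retryWithRecoverWith(1, 10), 1.second)
    println(s"recover gives a nested Future: ${nested.isInstanceOf[Future[_]]}")
    println(s"recoverWith gives: $flat")
  }
}
```

With these stubs, the recover variant completes with another Future as its value, while the recoverWith variant completes with the acknowledgement itself. If the same thing happens in ActorB, the `case p: MessageB2C_Ack` pattern could only match when the very first attempt is answered, which might be related to what I am seeing.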
Any help is really appreciated.
BTW, if you’re interested, I’ve also committed this code as a sample project on GitHub.