Reconnecting to RabbitMQ with AMQP in Elixir

2021-10-26 Software-Craftmanship AMQP Elixir Functional-Programming Micro-Services Tips & Tricks Web-Services

At Community we are building a platform that will allow you to have meaningful conversations with large audiences … at scale!!!

The architectural backbone is a shared-nothing micro-services architecture that uses an event bus for state propagation.

To avoid any kind of 2-phase-commit shenanigans, we publish events at-least-once (not exactly-once). That means our services must be able to (and are able to) deal with duplicated events gracefully, by ignoring them.
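Ignoring duplicates usually comes down to recognising an event you have already handled. Below is a minimal sketch. It assumes every event carries a unique :id, which is a hypothetical field because the post does not specify the event shape. It also keeps the seen ids in memory, whereas a real service would persist them alongside its own state.

```elixir
defmodule DedupConsumer do
  # Hypothetical event shape: every event carries a unique :id.
  # `seen` holds the ids already handled; a redelivered event is ignored.
  def handle_event(%{id: id} = event, seen) do
    if MapSet.member?(seen, id) do
      {:ignored, seen}
    else
      # Apply the event to the service's own state here (omitted).
      {{:applied, event}, MapSet.put(seen, id)}
    end
  end
end

# Event 1 is delivered twice, as at-least-once publishing allows.
events = [%{id: 1, type: :joined}, %{id: 2, type: :left}, %{id: 1, type: :joined}]
{results, _seen} = Enum.map_reduce(events, MapSet.new(), &DedupConsumer.handle_event/2)
```

After the run, `results` holds two `{:applied, event}` entries followed by `:ignored` for the redelivered event.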

It also means that we have to retry publishing an event on the event bus if the publish fails. One reason a publish can fail (especially when you run long event-replay jobs) is that the AMQP infrastructure (connection, channel, …) dies.

The tech-stack is: Elixir, AMQP and RabbitMQ.

The interesting thing is this: if you use AMQP out of the box, create a connection and a channel, and start publishing events (e.g. from a GenServer), you will not get notified when the infrastructure has a problem (e.g. dies).

Instead, you will get a :noproc ("no process") exit the next time you try to use the infrastructure (e.g. publish an event).

So … what is going on here?

After investigating, the answer turned out to be simple. The Elixir/Erlang AMQP implementation creates one Erlang process for the connection and another process for the channel on that connection. When there is a problem, these two processes die, AND that is the right way to implement something like this in Elixir/Erlang.

So far so good …

The problem is that, out of the box, these processes are not linked to the processes that created them. That makes detecting that something went wrong (and retrying whatever failed) kind-a interesting.
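The effect is easy to reproduce without RabbitMQ. In this sketch an Agent stands in for the AMQP channel process; this is an assumption for illustration and not the AMQP library's API. Because the Agent is started with Agent.start/1, it is not linked to the caller. Killing it tells nobody, and the problem only shows up as a :noproc exit on the next call.

```elixir
# Hypothetical stand-in: an unlinked Agent plays the role of the AMQP channel process.
{:ok, channel} = Agent.start(fn -> [] end)

# Hypothetical stand-in for a publish on that channel.
publish = fn chan, event -> Agent.update(chan, fn events -> [event | events] end) end

:ok = publish.(channel, :event_1)

# Simulate the infrastructure dying and wait until the process is really gone.
ref = Process.monitor(channel)
Process.exit(channel, :kill)

receive do
  {:DOWN, ^ref, :process, ^channel, _reason} -> :ok
end

# Nobody was told. The failure only surfaces on the next publish, as a :noproc exit.
result =
  try do
    publish.(channel, :event_2)
  catch
    :exit, {:noproc, _call} -> :noproc
  end

IO.inspect(result)
# prints :noproc
```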

In the end, we came up with three ways to deal with this …

  1. Use try/catch around every statement that uses a connection or a channel (catching the exit)
  2. Use Process.flag(:trap_exit, true) AND link the connection process to the process that created the connection, then handle the exit message
  3. Take a step back and use a supervisor to restart the connection AND retry whatever needs retrying

The good news is there is a repo that has all of the code in it that shows how this works!!!

Let’s talk about these three options …

Using try/catch

Obviously easy to implement and easy to understand. Retrying whatever you did is also easy, because you know what operation you tried/failed.

But … you need to wrap every AMQP call in a try/catch. Not so nice.
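Here is a sketch of the try/catch approach, again with an unlinked Agent as a hypothetical stand-in for the channel. The publish is wrapped, the exit is caught, and a fresh channel is opened. The same operation is then retried a bounded number of times; the limit of 3 is an arbitrary choice.

```elixir
defmodule TryCatchPublisher do
  # Hypothetical stand-in for opening a connection + channel (unlinked, like the default).
  def open_channel do
    {:ok, chan} = Agent.start(fn -> [] end)
    chan
  end

  # Hypothetical stand-in for publishing one event on the channel.
  def publish(chan, event), do: Agent.update(chan, &[event | &1])

  # The call is wrapped: on an exit, reopen and retry the very same operation.
  # Returns the channel that worked, so the caller can keep using it.
  def safe_publish(chan, event, retries \\ 3) do
    try do
      :ok = publish(chan, event)
      {:ok, chan}
    catch
      # Once the retries are used up, no clause matches and the exit propagates.
      :exit, _reason when retries > 0 ->
        safe_publish(open_channel(), event, retries - 1)
    end
  end
end

# Usage: the channel dies, the next publish notices, reopens and retries.
chan = TryCatchPublisher.open_channel()
ref = Process.monitor(chan)
Process.exit(chan, :kill)

receive do
  {:DOWN, ^ref, :process, ^chan, _reason} -> :ok
end

{:ok, new_chan} = TryCatchPublisher.safe_publish(chan, :event_1)
```

Retrying is straightforward here because the failed operation is right there in the catch clause. The price is that every AMQP call needs the same wrapping.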

Using trap_exit and linking

With this approach you can (just) listen for the exit message to arrive in your mailbox and handle it in handle_info. On one hand this is much nicer, but … retrying whatever failed becomes harder. You probably need to put some info into the state, so that you know what to retry when you handle the exit.

We also realized that just pattern matching on :exit/:infrastructure_died is not good enough, because there is a small race condition. The next publish can happen before you have trapped the exit from the connection/channel GenServer, and in that case you get the dreaded :exit/:noproc again (and need to handle that too).
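Here is a sketch of the trap_exit variant, including a guard for the race just described. The channel is again a hypothetical Agent stand-in, but it is now started with start_link/1. Because the publisher traps exits, the channel's death reaches it as an {:EXIT, pid, reason} message. The state remembers the last event so there is something to retry. Re-sending it may produce a duplicate, which at-least-once publishing allows.

```elixir
defmodule TrapExitPublisher do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def publish(pid, event), do: GenServer.call(pid, {:publish, event})
  def channel(pid), do: GenServer.call(pid, :channel)

  @impl true
  def init(:ok) do
    # Exit signals from linked processes now arrive as {:EXIT, pid, reason} messages.
    Process.flag(:trap_exit, true)
    {:ok, %{channel: open_channel(), last_event: nil}}
  end

  @impl true
  def handle_call({:publish, event}, _from, state), do: {:reply, :ok, do_publish(state, event)}
  def handle_call(:channel, _from, state), do: {:reply, state.channel, state}

  # The current channel died: reopen it and retry what the state says was sent last.
  @impl true
  def handle_info({:EXIT, chan, _reason}, %{channel: chan} = state) do
    state = %{state | channel: open_channel()}
    state = if state.last_event, do: do_publish(state, state.last_event), else: state
    {:noreply, state}
  end

  # EXIT from a channel that was already replaced (see the race below): nothing to do.
  def handle_info({:EXIT, _old_chan, _reason}, state), do: {:noreply, state}

  defp do_publish(state, event) do
    :ok = Agent.update(state.channel, &[event | &1])
    %{state | last_event: event}
  catch
    # The race: the channel is already dead, but its EXIT message has not been
    # handled yet, so the publish itself exits (typically with :noproc).
    :exit, _reason -> do_publish(%{state | channel: open_channel()}, event)
  end

  # Hypothetical stand-in for opening an AMQP connection + channel, linked to us.
  defp open_channel do
    {:ok, chan} = Agent.start_link(fn -> [] end)
    chan
  end
end

{:ok, pub} = TrapExitPublisher.start_link()
:ok = TrapExitPublisher.publish(pub, :event_1)
```

Both failure paths end up in the same place: a fresh channel and a retried publish. The logic for that is spread over handle_info and a try/catch, which is the complexity the next option avoids.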

Using a supervisor

That’s obviously the nicest solution. You create a supervisor and a worker and give the work to do (e.g. publish this batch of events) to the worker. The supervisor supervises the worker. If the worker has any kind of problem getting the work done, the supervisor restarts it, and the restarted worker retries the work (the batch).

This also covers the case described above, where the publisher can exit with at least two abnormal reasons (:infrastructure_died and :noproc).
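Below is a minimal sketch of the supervisor approach. It assumes a worker that receives its batch as a start argument and uses the :transient restart strategy, so it is restarted only after an abnormal exit. The channel is again a hypothetical Agent stand-in, linked to the worker, and a second Agent plays the broker. The first attempt simulates the infrastructure dying. The worker does not handle that at all; it simply crashes, whatever the exit reason, and the supervisor restarts it with the same batch.

```elixir
defmodule BatchWorker do
  # :transient -> the supervisor restarts the worker only if it exits abnormally.
  use GenServer, restart: :transient

  def start_link({batch, broker}), do: GenServer.start_link(__MODULE__, {batch, broker})

  @impl true
  def init(args), do: {:ok, args, {:continue, :publish}}

  @impl true
  def handle_continue(:publish, {batch, broker} = state) do
    attempt =
      Agent.get_and_update(broker, fn s -> {s.attempts + 1, %{s | attempts: s.attempts + 1}} end)

    # Hypothetical stand-in for opening an AMQP channel, linked to this worker.
    {:ok, channel} = Agent.start_link(fn -> nil end)

    # Simulation only: the infrastructure dies during the first attempt.
    if attempt == 1, do: Process.exit(channel, :kill)

    Enum.each(batch, fn event ->
      # A dead channel makes this call exit, and the link kills the worker anyway.
      :ok = Agent.update(channel, fn _ -> event end)
      Agent.update(broker, fn s -> %{s | published: s.published ++ [event]} end)
    end)

    Agent.stop(channel)
    {:stop, :normal, state}
  end
end

{:ok, broker} = Agent.start_link(fn -> %{attempts: 0, published: []} end)

{:ok, _sup} =
  Supervisor.start_link([{BatchWorker, {[:e1, :e2, :e3], broker}}], strategy: :one_for_one)
```

The worker contains no trapping and no retry code; the retry is simply the restart. Expect a crash report in the logs for the first attempt.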

Initial implementations of this had very elaborate trapping, linking and message passing. The main lesson learned was: Do not try to do OTP's work for it. Let OTP do the work for you. In our case that means/meant: Let the worker fail! Let the supervisor restart it. And do not use any of the “internal” messages (e.g. :exit/:normal) to tell the batcher (the process that hands out the batches) to send the next batch. Instead, use a custom message (e.g. :next).
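Here is a sketch of a batcher that hands out batches through a DynamicSupervisor. It moves on only when the worker sends the custom :next message, never on the worker's exit. Batcher, Publisher, and the sink pid that collects the published events (standing in for RabbitMQ) are hypothetical names for this illustration. Failure simulation is left out to keep the focus on the :next message.

```elixir
defmodule Publisher do
  # Restarted by the supervisor only on abnormal exits; the batcher never watches it.
  use GenServer, restart: :transient

  def start_link({batch, batcher, sink}), do: GenServer.start_link(__MODULE__, {batch, batcher, sink})

  @impl true
  def init(args), do: {:ok, args, {:continue, :publish}}

  @impl true
  def handle_continue(:publish, {batch, batcher, sink} = state) do
    # Hypothetical stand-in for the actual AMQP publish of each event.
    Enum.each(batch, &send(sink, {:published, &1}))
    # Custom message instead of relying on this process exiting with :normal.
    send(batcher, :next)
    {:stop, :normal, state}
  end
end

defmodule Batcher do
  use GenServer

  def start_link({batches, sup, sink}), do: GenServer.start_link(__MODULE__, {batches, sup, sink})

  @impl true
  def init({batches, sup, sink}) do
    # Kick off the first batch.
    send(self(), :next)
    {:ok, %{batches: batches, sup: sup, sink: sink}}
  end

  # Only :next advances; a crashed and restarted worker does not move the batcher on.
  @impl true
  def handle_info(:next, %{batches: []} = state) do
    send(state.sink, :all_batches_done)
    {:noreply, state}
  end

  def handle_info(:next, %{batches: [batch | rest]} = state) do
    {:ok, _pid} = DynamicSupervisor.start_child(state.sup, {Publisher, {batch, self(), state.sink}})
    {:noreply, %{state | batches: rest}}
  end
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, _batcher} = Batcher.start_link({[[1, 2], [3], [4, 5]], sup, self()})
```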

Again: if you want to find out the details of how this works, please take a look at the repo. Also: feel free to send me a PR if you have a better way to do this.

Happy hacking …