Exactly-once intuition

Distributed algorithms are difficult. If you find yourself struggling to understand one of them, we assure you: you are not alone. We have spent the last couple of years researching ways to ensure exactly-once message processing in systems that exchange messages in an asynchronous and durable way (a.k.a. message queues), and you know what? We still struggle and make silly mistakes. The reason is that even a very simple distributed algorithm generates a vast number of possible execution paths.

To overcome these difficulties we use two different and complementary methods. One of them is TLA+, a formal specification language with tools for model-checking distributed algorithms. We will write more about TLA+ and how we use it in a future post. This time we want to focus on the other method: our intuition. Over the past months we have developed a set of heuristics that are very helpful in sketching the structure of an algorithm. They are useful in the initial phases of research, before a full TLA+ model is developed. We’d like to introduce some of them.

The transaction and the side effects

The outcome of processing a message consists of two parts. There is a transactional part and a side effects part. The transaction consists of application state change and of marking the incoming message as processed. The side effects include things like creating objects in non-transactional data stores (e.g. uploading a blob) and sending messages.

Until the transaction is committed, nothing happened

For an algorithm to behave correctly, it has to guarantee that until a transaction is committed, no effects of the processing are visible to external observers. Violating that rule results in the ghost message or ghost data phenomena. Ghost messages are messages that carry information about a state change that has never occurred.

Prepare - Commit - Publish

So we don’t want to create any public information before the state change is committed. But we also don’t want to end up in a situation where we committed the transaction, failed right after that, and lost the information we were about to announce to the world. For this reason any correct algorithm has to make sure the side effects are made durable but not visible (prepared) before the transaction is committed. Then, after the commit, the side effects are published.
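The three phases can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `InMemoryStore`, `process`, and `publish` are hypothetical names, a plain in-memory object stands in for both the transactional store and the transport, and the crash is simulated with a flag.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for a transactional store plus a transport."""
    state: dict = field(default_factory=dict)
    processed: set = field(default_factory=set)
    prepared: dict = field(default_factory=dict)   # durable, not yet visible
    published: list = field(default_factory=list)  # visible to external observers


def publish(store, message_id):
    # Publish: make the prepared side effects visible, then drop the prepared copy.
    for outgoing in store.prepared.get(message_id, []):
        store.published.append(outgoing)
    store.prepared.pop(message_id, None)


def process(store, message_id, new_state, outgoing, fail_after_commit=False):
    if message_id in store.processed:
        # Duplicate or retry: the transaction is already committed,
        # so only the publish phase may still be outstanding.
        publish(store, message_id)
        return

    # Prepare: record the side effects durably, but keep them invisible.
    store.prepared[message_id] = list(outgoing)

    # Commit: the state change and the "processed" marker belong to one
    # transaction (simulated here by two in-memory updates).
    store.state.update(new_state)
    store.processed.add(message_id)

    if fail_after_commit:
        raise RuntimeError("simulated crash after commit")

    publish(store, message_id)
```

If the process fails right after the commit, nothing has been published yet, but the prepared side effects survive. A retry of the same message finds it already marked as processed and only completes the publish phase.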

Side effects stored in each processing attempt are isolated

This one is better explained using an example. Let’s say our processing consists of generating a PDF, storing it in a blob store, and sending out a message that contains an SHA hash of that document. The generation of the PDF document is not 100% deterministic due to a timestamp that is rendered in the top right corner. If we allow the PDF to be stored in a deterministic location, the same for every processing attempt, we may end up in a situation where the message that is sent out contains a hash of a document that has been overwritten by another processing attempt. This may happen if the incoming message has been duplicated: one copy was processed correctly, while the other copy managed to complete the prepare phase but failed to commit and was subsequently discarded as a duplicate.

A correct algorithm needs to ensure the isolation of the prepare phase, e.g. by forcing the use of GUIDs for document names. In our PDF example each processing attempt would generate its own PDF document, but only the attempt that succeeded in committing would publish its outgoing messages, announcing to the world the true location of the PDF. Unfortunately this approach generates garbage in the form of durable prepared side effects left behind by attempts that failed to commit. This brings us to the last of the heuristics.
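A minimal sketch of the PDF example, assuming a dictionary as the blob store and SHA-256 as the hash (the text only says "an SHA hash"). The `prepare_pdf` helper and the byte strings are made up for illustration.

```python
import hashlib
import uuid


def prepare_pdf(blob_store, content):
    """Store the document under a name that is unique to this attempt."""
    blob_name = f"{uuid.uuid4()}.pdf"
    blob_store[blob_name] = content
    # The outgoing message refers to this attempt's blob and its hash.
    return {"blob": blob_name, "sha256": hashlib.sha256(content).hexdigest()}


blob_store = {}

# Two attempts to process copies of the same incoming message. The rendered
# timestamp makes the generated documents differ.
attempt_a = prepare_pdf(blob_store, b"report rendered at 10:00:01")
attempt_b = prepare_pdf(blob_store, b"report rendered at 10:00:02")

# Only attempt_a commits, so only its message is published. The blob it
# points to cannot have been overwritten by attempt_b.
published = attempt_a
document = blob_store[published["blob"]]
```

Because each attempt writes to its own location, the hash in the published message always matches the document it points to. The blob written by `attempt_b` is the garbage the next heuristic deals with.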

Register - Cleanup

Although we can’t avoid generating garbage, a well-behaved algorithm ensures that the garbage is eventually cleaned up. This can be achieved by adding two more phases, register and cleanup. The register phase is invoked before a given side effect is prepared. It stores, in the transactional store, information about the intent to prepare that side effect. After the publish phase is done, the cleanup phase inspects all registered side effect intents and cleans up the ones that have not been committed and published.
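The two extra phases might look roughly like this. The sketch assumes that dictionaries stand in for the transactional store and the blob store, and that every write to `tx_store` is durable on its own. All function names and the `intents`/`committed` layout are hypothetical.

```python
import uuid


def register(tx_store, message_id):
    """Record the intent in the transactional store before preparing."""
    blob_name = str(uuid.uuid4())
    tx_store["intents"].setdefault(message_id, set()).add(blob_name)
    return blob_name


def prepare(blob_store, blob_name, content):
    """Create the side effect in the non-transactional store."""
    blob_store[blob_name] = content


def commit(tx_store, message_id, blob_name):
    """Mark the message as processed and remember which side effect won."""
    tx_store["committed"][message_id] = blob_name


def cleanup(tx_store, blob_store, message_id):
    """Delete every registered side effect that was not committed."""
    winner = tx_store["committed"].get(message_id)
    if winner is None:
        return  # nothing committed yet, so there is nothing to compare against
    for blob_name in tx_store["intents"].pop(message_id, set()):
        if blob_name != winner:
            blob_store.pop(blob_name, None)
```

Since the intent is registered before the blob is written, a crash at any later point still leaves a record in the transactional store that tells the cleanup phase what to delete.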

Concurrency control ensures serialization of processing

Some simple algorithms, like the NServiceBus Outbox, use very simple state machines that do not require concurrency checking. In this particular (pun intended!) case, the outbox record also contains the side effects information. It can exist in only two states: created and dispatched. The transition from created to dispatched does not generate any new information, so it does not require concurrency control to prevent lost writes.

In more complex algorithms, however, the state transitions do generate new information and have to be protected. One example is an algorithm that ensures that each document processes each message exactly once. In that scenario a document has to contain a list of already processed messages. Updates to the document that add a message ID to that list have to be protected by concurrency control to prevent two processes from successfully processing two copies of one message at the same time.
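One common way to protect such updates is optimistic concurrency based on a version number. The sketch below uses that technique as an assumption, since the text does not say which form of concurrency control is used. `DocumentStore`, `ConcurrencyError`, and `handle` are hypothetical names, and the store is a plain in-memory dictionary.

```python
class ConcurrencyError(Exception):
    """Raised when a document changed between load and save."""


class DocumentStore:
    """Hypothetical in-memory store with version-based optimistic concurrency."""

    def __init__(self):
        self._docs = {}  # doc_id -> (version, document)

    def load(self, doc_id):
        version, doc = self._docs.get(doc_id, (0, {"processed": [], "value": 0}))
        # Return a copy so that callers cannot mutate the stored state directly.
        return version, {"processed": list(doc["processed"]), "value": doc["value"]}

    def save(self, doc_id, expected_version, doc):
        current_version, _ = self._docs.get(doc_id, (0, None))
        if current_version != expected_version:
            raise ConcurrencyError(doc_id)
        self._docs[doc_id] = (current_version + 1, doc)


def handle(store, doc_id, message_id, amount):
    """Apply the message once; return False if it was already processed."""
    version, doc = store.load(doc_id)
    if message_id in doc["processed"]:
        return False
    doc["value"] += amount
    doc["processed"].append(message_id)
    # Fails if another process saved the document after we loaded it.
    store.save(doc_id, version, doc)
    return True
```

If two processes load the document at the same version and both try to add the same message ID, only the first save succeeds. The second one fails with `ConcurrencyError`, and when it retries it finds the message ID already on the list and treats the message as a duplicate.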

Summary

Intuition is very helpful when designing exactly-once processing algorithms. Unfortunately, it takes time to develop that intuition, and even after spending a significant amount of time in this space, we still sometimes make silly mistakes. This is why we don’t depend solely on our intuition when implementing these algorithms. We also use TLA+ to model-check them.