Consistent Hash Exchange

Messages sent to a consistent hash exchange are distributed among the bound queues based on the hash of the routing key or a header value. Messages with the same computed hash always end up in the same queue, so all messages that share a property, e.g. the same booking or client ID, land in the same queue. As long as no bindings change, this preserves the causal order of events for that specific ID.

What is Consistent Hash Exchange?

The consistent hash exchange is an exchange type available in LavinMQ. It distributes messages among queues bound to it, while still ensuring that messages of a certain character are sent to the same queue, assuming no bindings have changed.

Message distribution depends on the computed hash of the routing key or header value. Queues are bound with an integer-based weight rather than a routing key or header values. This weight is used as part of the algorithm that determines the delivery of the message.

With consistent hash exchange, the broker itself is used as the unit of parallelism instead of the consumers.

When should I use Consistent Hash Exchange?

The consistent hash exchange can be used to load-balance messages between queues when causal message ordering is still needed. Distributing messages manually can be hard, since publishers typically don’t know how many queues and bindings exist.

Description

LavinMQ delivers the messages in a queue in sequential order, but to avoid long queues, data is often spread over multiple queues or handled by several consumers. However, data that is evenly distributed over multiple queues might lose its order.

With Consistent Hash Exchange, the hash of the routing key or the header values on the messages determines the destination queue. All queues bound to the exchange are potential destinations.

Consistent hashing

Consistent hashing is a technique in which a hashing algorithm determines where a piece of data is placed. The hash alone determines exactly where that data should be routed. The range of possible hash values is usually pictured as a “ring”, or “hash ring”.

A hash function maps one piece of data, typically describing some kind of object, to another piece of data, typically an integer. The result is known as a hash code, or simply a hash. As an example, suppose a hash function is designed to hash strings with an output range of 0..100:

  • “ClientID:40” is mapped to 33
  • “ClientID:45” is mapped to 59
  • Any other possible input is mapped to some number within the range 0..100

There are many more possible inputs than outputs, so many different strings map to any given number. A good hash function therefore spreads the input values as evenly as possible over the range of outputs.
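As a minimal sketch of the idea above, the snippet below maps strings to a small integer range. The function name bucket_hash and the choice of SHA-256 are assumptions made for illustration only; this text does not say which hash function LavinMQ uses, and the numbers printed will differ from the example values above.

```python
import hashlib


def bucket_hash(key: str, size: int = 100) -> int:
    """Map a string to an integer in the range 0..size-1 (illustrative only)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    # Use the first 8 bytes as an unsigned integer, then fold into the range.
    return int.from_bytes(digest[:8], "big") % size


for key in ("ClientID:40", "ClientID:45"):
    # The same key always produces the same number.
    print(key, "->", bucket_hash(key))
```

Because the function is deterministic, every message carrying the same key yields the same hash, which is the property the exchange relies on.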

Distribution of messages

The range of hash values is divided into spans, or buckets, each owned by one bound queue, which makes it easy to add or remove bound queues on the exchange. The figure below shows how messages with given ClientIDs as routing keys are distributed among buckets, depending on the number of queues and their binding weights.

Figure: Consistent Hash Exchange. The image illustrates messages routed by the Consistent Hash Exchange by their hashed routing key.

Setting up a consistent hash exchange is as easy as setting up an exchange:

  1. Declare a number of queues
    # Assumes `channel` is an open channel from a Python AMQP client such as pika.
    q1, q2 = "q1", "q2"
    channel.queue_declare(queue=q1, durable=True)
    channel.queue_declare(queue=q2, durable=True)
  2. Define the exchange
    channel.exchange_declare(exchange="ce", exchange_type="x-consistent-hash", durable=True)
  3. Bind the queues to the exchange. The routing key on the binding must be a numeric value; it sets the weight of the queue.
    channel.queue_bind(exchange="ce", queue=q1, routing_key="1")
    channel.queue_bind(exchange="ce", queue=q2, routing_key="2")
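The steps above can be combined with a publish step, sketched below. The helper name setup_and_publish, the queue names, and the message bodies are hypothetical; the function only assumes that channel offers the exchange_declare, queue_declare, queue_bind and basic_publish methods of a pika-style channel.

```python
def setup_and_publish(channel, exchange="ce", weights=None, client_ids=()):
    """Declare and bind queues with weights, then publish one message per client ID.

    `channel` is assumed to behave like a pika BlockingChannel.
    """
    # Queue name -> weight; the weight is passed as the binding's routing key.
    weights = weights or {"q1": "1", "q2": "2"}

    channel.exchange_declare(
        exchange=exchange, exchange_type="x-consistent-hash", durable=True
    )
    for queue, weight in weights.items():
        channel.queue_declare(queue=queue, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue, routing_key=weight)

    for client_id in client_ids:
        # The routing key of the message is what gets hashed by the exchange.
        channel.basic_publish(
            exchange=exchange,
            routing_key=f"ClientID:{client_id}",
            body=f"event for client {client_id}".encode("utf-8"),
        )
```

With pika, a suitable channel could be obtained from pika.BlockingConnection(pika.URLParameters(url)).channel(), where url is the AMQP URL of your broker. Every message published with the routing key ClientID:40 would then land in the same queue for as long as the bindings stay unchanged.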

Dead messages and resends

Messages that cannot be consumed for some reason are only requeued to the queue they were routed to, according to settings such as TTL and x-delivery-count. If consumers fail, the messages stay in the queue until the consumer or consumers of that queue are ready to handle messages again.

Removing or adding queues with consistent hash exchange for LavinMQ

It is possible to add or remove queues bound to the consistent hash exchange at runtime. The consistent hash exchange is built to distribute messages as evenly as possible over the queues while keeping messages with the same routing key or header value in the same queue.

Example: Imagine 100 entries distributed over 4 queues. In this example, all calculated hash values between 0-24 end up in the first queue. All calculated hash values between 25 and 49 end up in the second queue and so on.

The example below has a routing key of 1 on each binding, meaning that each queue has one bucket:

0 ≤ Hash value < 25 goes to queue 1
25 ≤ Hash value < 50 goes to queue 2
50 ≤ Hash value < 75 goes to queue 3
75 ≤ Hash value < 100 goes to queue 4

The numeric routing key on the binding specifies how many buckets each queue gets. A value of 2 gives each queue two buckets, so two different hash spans are linked to the same queue. A value of 3 gives each queue three buckets, with three linked spans.

Below is an example with a routing key on the binding of value 2.

0 ≤ Hash value < 12.5 goes to queue 1
12.5 ≤ Hash value < 25 goes to queue 2
25 ≤ Hash value < 37.5 goes to queue 3
37.5 ≤ Hash value < 50 goes to queue 4
50 ≤ Hash value < 62.5 goes to queue 1
62.5 ≤ Hash value < 75 goes to queue 2
75 ≤ Hash value < 87.5 goes to queue 3
87.5 ≤ Hash value < 100 goes to queue 4
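The two layouts above can be reproduced with a few lines of arithmetic. This is a sketch of the simplified model used in these examples (equal-width buckets assigned to the queues in rotation), not a description of how LavinMQ places buckets internally; the function name queue_for is hypothetical.

```python
def queue_for(hash_value: float, queue_count: int, weight: int,
              span: float = 100.0) -> int:
    """Return the 1-based queue number for a hash in the even-bucket model."""
    if not 0 <= hash_value < span:
        raise ValueError("hash_value must be in [0, span)")
    bucket_count = queue_count * weight      # e.g. 4 queues * weight 2 = 8 buckets
    bucket_width = span / bucket_count       # e.g. 100 / 8 = 12.5
    bucket_index = int(hash_value // bucket_width)
    # Buckets are handed out to the queues in rotation: 1, 2, 3, 4, 1, 2, ...
    return bucket_index % queue_count + 1


print(queue_for(60, queue_count=4, weight=1))  # 3
print(queue_for(60, queue_count=4, weight=2))  # 1
```

The same hash value 60 falls in the span of queue 3 when each queue has one bucket, and in the second bucket of queue 1 when each queue has two.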

Consistent Hash Exchange - Adding queues

Adding a queue will affect the message distribution over some linked queues, since the span changes. The same scenario applies to removing queues.

Once a new queue is bound, it has to be given a place within the full span, which shifts the boundaries of existing buckets. This might cause messages for a given routing key to end up in a different queue than before.

In the example below, a new queue (queue 5) is added and takes over the span from 85 to 100, which previously belonged to queue 4:

0 ≤ Hash value < 25 goes to queue 1
25 ≤ Hash value < 50 goes to queue 2
50 ≤ Hash value < 75 goes to queue 3
75 ≤ Hash value < 85 goes to queue 4
85 ≤ Hash value < 100 goes to queue 5
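To see which hash values are affected, the span table before the change can be compared with the one after it. The sketch below hard-codes the two tables from this example; the names BEFORE, AFTER and lookup are chosen for illustration.

```python
from bisect import bisect_right

# Each entry is (exclusive upper bound of the span, owning queue).
BEFORE = [(25, "queue 1"), (50, "queue 2"), (75, "queue 3"), (100, "queue 4")]
AFTER = [(25, "queue 1"), (50, "queue 2"), (75, "queue 3"),
         (85, "queue 4"), (100, "queue 5")]


def lookup(spans, hash_value):
    """Return the queue whose span contains hash_value."""
    bounds = [upper for upper, _ in spans]
    return spans[bisect_right(bounds, hash_value)][1]


# Integer hash values whose destination changed when queue 5 was added.
moved = [h for h in range(100) if lookup(BEFORE, h) != lookup(AFTER, h)]
print(len(moved), min(moved), max(moved))  # 15 85 99
```

Only the hash values from 85 up to 100 change destination; everything that was routed to queues 1 through 3 stays where it was.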

How much the existing queues are affected can be controlled through the number set as the routing key on the binding. A lower number favors message ordering, since fewer messages change queue, but can cause a more uneven distribution. A higher number makes the distribution between queues more even, but when queues are added to or removed from the exchange, a message with a given ID is more likely to be routed to a different queue than earlier messages with the same ID, since the spans change.

Adding one more queue when the routing key on each binding equals 2 looks similar to the example below; the new queue is added in two places:

0 ≤ Hash value < 12.5 goes to queue 1
12.5 ≤ Hash value < 25 goes to queue 2
25 ≤ Hash value < 37.5 goes to queue 3
37.5 ≤ Hash value < 43.75 goes to queue 4
43.75 ≤ Hash value < 50 goes to queue 5
50 ≤ Hash value < 62.5 goes to queue 1
62.5 ≤ Hash value < 75 goes to queue 2
75 ≤ Hash value < 87.5 goes to queue 3
87.5 ≤ Hash value < 93.75 goes to queue 4
93.75 ≤ Hash value < 100 goes to queue 5
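The effect of adding a queue can also be explored with a generic hash ring. The sketch below is a textbook-style ring in which each queue gets as many points as its weight; it is an assumption for illustration and not LavinMQ's actual implementation. The helper names and the use of SHA-256 are hypothetical.

```python
import hashlib
from bisect import bisect_left


def _point(label: str) -> int:
    """Hash a label to a position on the ring."""
    digest = hashlib.sha256(label.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def build_ring(weights: dict) -> list:
    """Place `weight` points on the ring for every queue, sorted by position."""
    return sorted(
        (_point(f"{queue}#{i}"), queue)
        for queue, weight in weights.items()
        for i in range(weight)
    )


def route(ring: list, routing_key: str) -> str:
    """Pick the first point at or after the key's hash, wrapping around."""
    positions = [position for position, _ in ring]
    index = bisect_left(positions, _point(routing_key)) % len(ring)
    return ring[index][1]


keys = [f"ClientID:{n}" for n in range(1000)]
before = build_ring({"q1": 2, "q2": 2, "q3": 2, "q4": 2})
after = build_ring({"q1": 2, "q2": 2, "q3": 2, "q4": 2, "q5": 2})

moved = [k for k in keys if route(before, k) != route(after, k)]
print(f"{len(moved)} of {len(keys)} keys changed queue")
```

In this model, every key that changes queue moves to the newly added queue, while keys that stay keep their ordering guarantees. Rerunning the sketch with other weights gives a feel for the trade-off between even distribution and the number of keys that move.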

One queue per customer use case

When you add more consumers to a queue to process messages faster, you never know which consumer gets which message. Messages related to one customer might end up at a different consumer every time. By routing all messages for a specific customer to one queue instead, you can be sure that the consumer on that queue always gets all the messages for that customer.

Important takeaways

  • The consistent hash exchange can be used when you want to load-balance messages between queues, where you still need causal message ordering.
  • Note that the consistent hash exchange does not necessarily distribute messages evenly among the given queues.
  • Messages routed to a queue via the consistent hash exchange are not sent to other queues or consumers in case of a consumer failure but stay in the given queue until the consumer is ready to handle them, or until dropped or dead-lettered.
  • The consistent hash exchange is not a tool for scaling. Scaling is done on the queue by adding more consumers; adding more queues to an exchange doesn’t make it faster.
