This repo is deprecated. See

RabbitMQ Stream Plugin

This plugin adds the concept of stream exchanges. The idea is that when you define a policy that turns an exchange into a stream, the plugin creates one queue per node in the cluster (think sharding, for some definition of sharding). Messages published to the exchange are delivered to those queues either by consistent hashing or by a random algorithm (the plugin builds on the consistent-hash-exchange and random-exchange plugins).
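The routing idea can be sketched as follows. This is a minimal Python illustration, not the plugin's actual Erlang implementation; the queue names are hypothetical and the modulo hash merely stands in for the consistent-hash-exchange's real algorithm:

```python
import hashlib
import random

# Hypothetical per-node queues the plugin would have created.
queues = [
    "stream: shard.logs_stream - rabbit1@hostname",
    "stream: shard.logs_stream - rabbit2@hostname",
    "stream: shard.logs_stream - rabbit3@hostname",
]

def route_by_hash(routing_key: str) -> str:
    """Pick a queue by hashing the routing key (a simplified stand-in
    for the consistent-hash-exchange behaviour)."""
    digest = hashlib.sha256(routing_key.encode()).hexdigest()
    return queues[int(digest, 16) % len(queues)]

def route_at_random() -> str:
    """Pick any queue (the random-exchange behaviour)."""
    return random.choice(queues)

# The same routing key always lands on the same queue.
assert route_by_hash("user.42") == route_by_hash("user.42")
```

Either way, producers publish to a single exchange while consumption is spread across one queue per node.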


Why do we need this? A RabbitMQ queue lives on the node where it was first declared, so even if you create a cluster of RabbitMQ brokers, at some point all message traffic for that queue will go to the node where it lives. What this plugin gives you is a single place to send your messages, plus load balancing across many nodes, by creating queues on the other nodes in the cluster.

The advantage of this setup is that the queues your consumers get messages from are local to the node they are connected to, while producers don't need to care about what's behind the exchange.

All the plumbing to automatically maintain the stream queues is done by the plugin. If you add more nodes to the cluster, then the plugin will automatically create queues in those nodes.

If you remove nodes from the cluster, RabbitMQ will take care of removing their queues from the list of bound queues. Message loss can happen if a node goes away while your message is in flight to the stream exchange. If you can't afford to lose a message, use publisher confirms to prevent message loss.

How to get the queue name to consume from?

With all these queues automatically added to the broker, it can get tricky to know which queue your consumer should get messages from. The plugin adds a couple of HTTP endpoints to help with that; see below.


Message order is maintained per stream queue, but not globally. If you need global ordering then stick with mirrored queues.

Building the plugin

Get the RabbitMQ Public Umbrella ready as explained in the RabbitMQ Plugin Development Guide.

Move to the umbrella folder and then run the following commands:

# the plugin allows routing via the random-exchange plugin
git clone
git clone
git clone
cd rabbitmq-stream

Testing the plugin

Enable the following plugins as explained in the RabbitMQ Plugin Development Guide:

[rabbitmq_management, amqp_client, rabbitmq_consistent_hash_exchange, random_exchange].

Then run make run-in-broker.

In a separate terminal window, run the following commands to start a second RabbitMQ node.

First setup the enabled plugins for the other node:

echo '[amqp_client,rabbitmq_consistent_hash_exchange,rabbitmq_stream, rabbitmq_management_agent, random_exchange].' > other_plugins

Then start the other node and cluster it:

make start-other-node OTHER_NODE=rabbit2 OTHER_PORT=5673
make cluster-other-node MAIN_NODE=rabbit-test@hostname OTHER_NODE=rabbit2

You could repeat the previous two steps to start a couple more nodes. Don't forget to change the OTHER_NODE and OTHER_PORT values.

So far we have a RabbitMQ cluster. Now it's time to add a policy that tells RabbitMQ to turn certain exchanges into streams.

../rabbitmq-server/scripts/rabbitmqctl -n rabbit-test@hostname set_policy my_stream "^shard\." '{"stream": "my_stream"}'

That policy will create a stream called my_stream for every exchange whose name starts with shard..
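The pattern given to set_policy is an ordinary regular expression. To illustrate what "^shard\." matches (using Python's re module as a stand-in for the broker's regex engine):

```python
import re

# The policy's match pattern: names that *start with* "shard."
pattern = re.compile(r"^shard\.")

assert pattern.search("shard.logs_stream")      # becomes a stream
assert not pattern.search("logs.shard.other")   # anchored: must start with "shard."
assert not pattern.search("shardXlogs")         # the dot is escaped, so it is literal
```

Note the backslash: without it, "shard." would match "shardX" too, since an unescaped dot matches any character.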

Then if you declare an exchange called, for example, shard.logs_stream, the plugin will create one queue per node in the cluster. So for a cluster with the nodes [rabbit1, rabbit2, rabbit3], we get the following queues:

stream: shard.logs_stream - rabbit1@hostname
stream: shard.logs_stream - rabbit2@hostname
stream: shard.logs_stream - rabbit3@hostname

Each queue will be local to the node included in its name. Stream queue names carry the stream: prefix.
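Given that naming convention, a consumer that knows its node can derive the name of its local queue. A small sketch, with the format taken from the listing above:

```python
def stream_queue_name(exchange: str, node: str) -> str:
    """Build the queue name the plugin uses: 'stream: <exchange> - <node>'."""
    return "stream: %s - %s" % (exchange, node)

assert stream_queue_name("shard.logs_stream", "rabbit1@hostname") == \
    "stream: shard.logs_stream - rabbit1@hostname"
```

In practice you shouldn't need to build names by hand; the HTTP endpoints described next return them for you.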

Obtaining the queue to consume from

The plugin adds a couple of new HTTP API endpoints to the management plugin.

  • GET /api/stream-queues/: returns all the stream queues present in the broker.
  • GET /api/stream-queues/vhost: returns all the stream queues for vhost.
  • GET /api/stream-queues/vhost/exchange: returns all the stream queues for the exchange of vhost.

The last API call accepts the optional parameter node which can be any of:

  • local: returns the stream queue that is local to the node where the HTTP call is made.
  • random: returns a random stream queue from the queues belonging to the exchange's stream.
  • If the parameter is not present, all the stream queues for the exchange of vhost are returned.
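Any HTTP client can call these endpoints; the sketch below only builds the URLs. Remember that vhost and exchange names must be URL-encoded (the default vhost / becomes %2F). The host and port are assumptions for a local management listener:

```python
from urllib.parse import quote

BASE = "http://localhost:15672"  # assumed management API address

def stream_queues_url(vhost=None, exchange=None, node=None):
    """Build a /api/stream-queues URL for the given scope."""
    url = BASE + "/api/stream-queues"
    if vhost is not None:
        url += "/" + quote(vhost, safe="")
        if exchange is not None:
            url += "/" + quote(exchange, safe="")
            if node is not None:
                url += "?node=" + node  # 'local' or 'random'
    return url

assert stream_queues_url("/", "shard.logs_stream", "local") == \
    "http://localhost:15672/api/stream-queues/%2F/shard.logs_stream?node=local"
```

For example, a consumer connected to a given node would GET the node=local URL and then consume from the queue name in the response.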

Running the examples

To test the plugin you can run the examples/stream.php example.

First install composer as explained here.

Then cd into the examples folder and run: composer.phar install.

Then run php stream.php.

That script will:

  • Declare a policy for all exchanges whose names match "^shard\."
  • Create an exchange called shard.logs_stream of type x-consistent-hash
  • Retrieve all stream queues on the broker
  • Retrieve all stream queues on the broker for exchange shard.logs_stream
  • Retrieve the local stream queue for exchange shard.logs_stream
  • Retrieve a random stream queue for exchange shard.logs_stream

TODO

  • Add parameters so the user can define an amqpuri to use when declaring queues.
  • Add parameters to set the binding key for queues.