TykRMQ - Making it easier to work with RabbitMQ message queues

TykRMQ is a helper pattern that makes it easier to work with the RabbitMQ message broker. If you've ever had to work with Pika and write in the pass-through style, you quickly end up with a lot of code that doesn't do much except set up your connection. Getting to the really productive bits takes a little more time, and that's where TykRMQ helps.

History

TykRMQ started out as the core library for an enterprise service bus we were writing at Jively. The ESB eventually got scrapped, and the core library is now used in the upcoming release of the new Loadzen.com back end. We're also using TykRMQ to power our multi-room sound system, so it's had a bit of real-world use, though it is still very early days and the API is still in flux.

Getting started with TykRMQ

Currently the easiest thing to do is to copy the TykRMQ directory into your application and access it directly from there. You could also install it into your Python path; we're working on getting a setup script up and running.
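
If you'd rather keep the copied directory outside your own package, one option is to add its parent folder to sys.path at start-up. This is only a minimal sketch: the "vendor" folder name is a made-up example, not something TykRMQ requires.

```python
import os
import sys

# Hypothetical location: a "vendor" folder that contains the copied tykRMQ directory.
VENDOR_DIR = os.path.join(os.getcwd(), "vendor")

# Put it at the front of the module search path, once.
if VENDOR_DIR not in sys.path:
    sys.path.insert(0, VENDOR_DIR)

# From here on, "import tykRMQ" resolves if VENDOR_DIR/tykRMQ exists.
```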

Structure

TykRMQ splits your queue interactions into 'Components'. Each component is a self-contained application that has:

  • Queue configuration
  • Input queues
  • Output queues
  • A master component that ties them together and enables them to use one another

All the callbacks, setup and teardown are taken care of for you.

Queue configuration

The queue configuration is a listing of all the queues your component will use. This is where you define each queue's name, its routing key and its exchange type.

Input and Output Queues

TykRMQ uses the concept of QueueActions to define the set of behaviours on inbound and outbound queues. These classes let you hook into key events in a queue's lifecycle to make stuff happen. You merely subclass the action and override the methods that matter most to you to get up and running, which keeps things short and sweet and hides all the overhead of maintaining queue connectivity.
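
The override pattern can be sketched without a broker. The classes below are simplified stand-ins written for this illustration, not TykRMQ's real implementation: only the on_message_callback name and its argument list are taken from the walkthrough in this README, and deliver() is a hypothetical helper that plays the role of the broker.

```python
class StandInInputAction(object):
    """Simplified stand-in for a queue action base class (not TykRMQ's code)."""

    def __init__(self, queue_config):
        self.queue_config = queue_config

    def on_message_callback(self, unused_channel, basic_deliver, properties, body):
        # Default hook: ignore the message. Subclasses override this.
        pass

    def deliver(self, body):
        # Hypothetical helper that simulates the broker handing over a message.
        self.on_message_callback(None, None, None, body)


class GreetingCollector(StandInInputAction):
    """Overrides only the one hook it cares about."""

    def __init__(self, queue_config):
        super(GreetingCollector, self).__init__(queue_config)
        self.greetings = []

    def on_message_callback(self, unused_channel, basic_deliver, properties, body):
        self.greetings.append("Hello %s!" % body.get("name"))


collector = GreetingCollector(queue_config={"queue": "hello_listener"})
collector.deliver({"name": "TykRMQ"})
print(collector.greetings)
```

The base class owns the plumbing and the subclass only fills in behaviour, which is the split the paragraph above describes.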

Components

The component class initialises all the queues you have defined and creates a backbone that lets your queues access each other's functions. Most importantly, it gives you hooks for when queues are ready (connected), and the ability to access the publishing method of one queue from a completely separate one, keeping your code clean even though the interactions are complex.
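
To make the "ready" hook idea concrete, here is a rough sketch using a stand-in component. The output_queues_ready name is the hook used later in this README; everything else (StandInComponent, mark_connected, the "Audit" queue) is hypothetical and only there to show the shape of the pattern, not how TykRMQ tracks connections internally.

```python
class StandInComponent(object):
    """Simplified stand-in for a component (not TykRMQ's code)."""

    def __init__(self, output_queue_names):
        self._pending = set(output_queue_names)
        self.events = []

    def mark_connected(self, name):
        # Hypothetical: called once per queue when its connection is up.
        self._pending.discard(name)
        if not self._pending:
            self.output_queues_ready()

    def output_queues_ready(self):
        # Hook: does nothing by default.
        pass


class AnnouncingComponent(StandInComponent):
    def output_queues_ready(self):
        # Runs only after every output queue has reported in.
        self.events.append("all output queues connected")


component = AnnouncingComponent(["Publisher", "Audit"])
component.mark_connected("Publisher")  # still waiting on "Audit"
component.mark_connected("Audit")      # last one in, so the hook fires
print(component.events)
```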

It's possible to create dynamically allocated queues, dynamically named ones and all kinds of interactions by overriding this class in specific ways - we'll cover these in other tutorials, just believe us that it's possible!

Creating a component

To create a new component, create a new module in your application (let's call it "HelloWorld") and create four files:

  • queue_config.py
  • input_queues.py
  • output_queues.py
  • hello_world_component.py

Step 1 - defining your queue configuration (queue_config.py):

from tykRMQ.MQCore.Components.ComponentConfig import MQQueueConfiguration
from tykRMQ.MQCore.Components.Constants import EXCHANGE_TYPES

#----- OUTPUT -----

# This is an obscure example, we are publishing to the same exchange that we are listening on - in a real scenario
# your input and output queues would come from other components.

publishing_queue = MQQueueConfiguration(exchange='hello-world-exchange',
                                        exchange_type=EXCHANGE_TYPES.DIRECT,
                                        routing_key='say_hello_to_everyone.*',
                                        durable=False,
                                        queue='hello_publisher')

#----- INPUT -----
listening_queue = MQQueueConfiguration(exchange='hello-world-exchange',
                                       exchange_type=EXCHANGE_TYPES.DIRECT,
                                       routing_key='say_hello_to_everyone.*',
                                       durable=False,
                                       queue='hello_listener')

Here we're defining an input and an output queue configuration. The settings are pretty self-explanatory if you've ever had to work with Pika.
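
One thing worth knowing about the routing key above: in AMQP, a direct exchange delivers on an exact match of the routing key, while a topic exchange treats '*' as exactly one dot-separated word and '#' as zero or more words. The sketch below illustrates those matching rules in plain Python. It is not code from TykRMQ or Pika, and the function names are made up for the example.

```python
def direct_matches(binding_key, routing_key):
    """Direct exchange: the keys must be identical."""
    return binding_key == routing_key


def topic_matches(pattern, routing_key):
    """Topic exchange: '*' is one word, '#' is zero or more words."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may absorb any number of words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            return match(rest, k[1:])
        return False
    return match(pattern.split('.'), routing_key.split('.'))


key = 'say_hello_to_everyone.*'
print(direct_matches(key, 'say_hello_to_everyone.*'))    # True
print(direct_matches(key, 'say_hello_to_everyone.now'))  # False
print(topic_matches(key, 'say_hello_to_everyone.now'))   # True
print(topic_matches(key, 'say_hello_to_everyone'))       # False
```

So with EXCHANGE_TYPES.DIRECT the '.*' suffix is just part of a literal key, which still works here because the publisher and the listener use the same string.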

Step 2 - Set up your output queue dummies

It's possible to hook into your output queues at various points, but most likely you will just publish to a dumb pipe. In your output_queues.py file, add this:

from tykRMQ.MQCore.Components.QueueAction import QueueOutputAction


class DummyOutputAction(QueueOutputAction):
    """
    Proxy class to write to a queue.
    """
    pass

Step 3 - Set up your input queue

The input queue has one job to do: when it gets a message, it should print it. Let's see how easy it is to do that with TykRMQ. In your input_queues.py file, do the following:

from tykRMQ.MQCore.Components.QueueAction import QueueInputAction
import logging


class HelloWorldInputQueue(QueueInputAction):
    """
    Input queue that does something when a message is received
    """
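    def on_message_callback(self, unused_channel, basic_deliver, properties, body):
        # body is expected to be dict-like here, since we call .get() on it
        if body:
            # The parenthesised form runs under both Python 2 and Python 3
            print("Hello %s!" % body.get("name"))
        else:
            logging.error('Something wrong with message body, ignoring')
            logging.error(str(body))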

Step 4 - Glue it all together!

Now it's time to make all these things speak to one another. This is done by creating a component and registering the input and output queues with it. In your hello_world_component.py file, add this:

from tykRMQ.MQCore.Components.Component import BaseComponent
from tykRMQ.MQCore.Components.ComponentConfig import MQConfiguration
from input_queues import HelloWorldInputQueue
from output_queues import DummyOutputAction
from queue_config import listening_queue, publishing_queue
import logging

logging.basicConfig(level=logging.INFO)

host_config = MQConfiguration()  # Use the default host configuration


def demo_sender(self):
    # We will send a message as soon as the output queues are ready
    self.output_queues['Publisher'].publish_message({"name": "TykRMQ"})

# Define the component - simply tell it what inputs and outputs are interacting
thisComponent = BaseComponent(
    host_config,
    input_queues={
        'Listener': HelloWorldInputQueue(listening_queue),
    },
    output_queues={
        'Publisher': DummyOutputAction(publishing_queue)
    })

# Hook the demo sender function up to the component
# Ideally you'd subclass and override - this is dirty
thisComponent.output_queues_ready = demo_sender

if __name__ == "__main__":
    # And go!
    thisComponent.run()

There's a lot going on here, but in essence we create our component by passing in the various pipes we have created, and we name them so we can reference them from within the queue actions we have defined. This is possible because a parent component injects itself into the queue actions as soon as they are registered. With self.parent_component.output_queues['name'] you can access any of the queues, so an input queue could pipe modified input from another component right through to multiple outputs.
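
That last point, one input fanning out to several outputs through the parent, might look roughly like the sketch below. All three classes are stand-ins written for this illustration rather than TykRMQ's own: only parent_component, output_queues, publish_message and on_message_callback are names taken from this README, and the "Audit" queue is invented for the example.

```python
class StandInOutput(object):
    """Stand-in output action that just records what it was asked to publish."""

    def __init__(self):
        self.parent_component = None
        self.sent = []

    def publish_message(self, message):
        self.sent.append(message)


class ShoutingInput(object):
    """Stand-in input action that modifies a message and forwards it."""

    def __init__(self):
        self.parent_component = None

    def on_message_callback(self, unused_channel, basic_deliver, properties, body):
        modified = {"name": body["name"].upper()}
        # Reach every registered output through the injected parent.
        for output in self.parent_component.output_queues.values():
            output.publish_message(modified)


class StandInComponent(object):
    def __init__(self, input_queues, output_queues):
        self.input_queues = input_queues
        self.output_queues = output_queues
        # Inject the component into every registered queue action.
        for action in list(input_queues.values()) + list(output_queues.values()):
            action.parent_component = self


component = StandInComponent(
    input_queues={"Listener": ShoutingInput()},
    output_queues={"Publisher": StandInOutput(), "Audit": StandInOutput()},
)
component.input_queues["Listener"].on_message_callback(
    None, None, None, {"name": "TykRMQ"})
print(component.output_queues["Publisher"].sent)
```

The input action never holds a direct reference to an output, so queues can be added or renamed on the component without touching the action classes.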

More to come

This is a high-level overview of what TykRMQ does. We're working on more detailed documentation, so please bear with us.