1. Michael Granger
  2. symphony

symphony / Usage

Initial Setup

Symphony uses the RabbitMQ server for speed, redundancy, and high availability. Installing RabbitMQ is outside the scope of this document; please see the RabbitMQ installation instructions.

Once it is installed and running, let's move on!

Nomenclature

Here's a quick review of RabbitMQ vocabulary. It's helpful to have some fundamental understanding of AMQP concepts when using Symphony, as a lot of advanced messaging options are available only via server-side configuration.

virtual host:: A logical container for exchanges and queues. You can have as many of these as you like in a RabbitMQ cluster. They are used to partition objects for security purposes.

exchange:: An exchange receives incoming messages, and routes them to queues via rules defined in bindings.

queue:: A queue holds the messages that bindings have routed to it until consumers receive them.

binding:: A rule that links/routes messages from an exchange to a queue.

routing key:: A period-separated hierarchical string that specifies a category for a message. This is matched against existing bindings for incoming messages.

dead-letter-exchange:: An exchange that receives messages that have failed, for example because they were rejected or because they expired. Configuring a dead letter exchange is critical for robust error reporting.
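
To make the routing key and binding vocabulary concrete, here is a minimal sketch of AMQP topic matching in plain Ruby, where '*' stands for exactly one word and '#' for zero or more words. The method names topic_match? and match_words are made up for this illustration. RabbitMQ performs this matching on the server, so nothing like this exists in Symphony itself.

```ruby
# Illustration only: the broker does this matching, not Symphony.
# '*' matches exactly one word, '#' matches zero or more words.
def topic_match?( pattern, routing_key )
    match_words( pattern.split('.'), routing_key.split('.') )
end

def match_words( pattern, words )
    return words.empty? if pattern.empty?

    head, *rest = pattern
    case head
    when '#'
        # Let '#' swallow 0, 1, 2, ... words and see if the rest still fits.
        (0..words.length).any? { |count| match_words( rest, words.drop(count) ) }
    when '*'
        !words.empty? && match_words( rest, words.drop(1) )
    else
        words.first == head && match_words( rest, words.drop(1) )
    end
end

puts topic_match?( '#',        'users.create' )      # true
puts topic_match?( '#.create', 'users.create' )      # true
puts topic_match?( 'users.*',  'users.delete.soft' ) # false
```

A binding with the pattern 'users.*' therefore routes 'users.delete' to its queue, but not 'users.delete.soft'.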

Starting Out

Symphony uses Configurability to load its connection settings for the RabbitMQ server from a YAML file.

Here is an example configuration file for Symphony. It connects to a virtual host called '/test', logs all messages to STDERR at a level of 'debug', and uses an exchange called 'symphony' (the default):

amqp:
    broker_uri: amqp://USERNAME:PASSWORD@example.com:5672/%2Ftest
    exchange: symphony

logging:
    symphony: debug STDERR (color)
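
Note that the virtual host in the broker_uri is percent-encoded: '%2F' is the leading '/' of '/test'. As a rough illustration using only Ruby's standard library (this is not how Symphony or Bunny parse the URI internally), the pieces can be pulled apart like this:

```ruby
require 'yaml'
require 'uri'

# The 'amqp' section of the example configuration, inlined for the sketch.
config = YAML.safe_load( <<~CONFIG )
    amqp:
        broker_uri: amqp://USERNAME:PASSWORD@example.com:5672/%2Ftest
        exchange: symphony
CONFIG

uri = URI.parse( config['amqp']['broker_uri'] )

# Drop the '/' that separates host and path, then decode what remains.
vhost = URI.decode_www_form_component( uri.path.delete_prefix('/') )

puts uri.host                     # example.com
puts uri.port                     # 5672
puts vhost                        # /test
puts config['amqp']['exchange']   # symphony
```

A virtual host named 'test' (no leading slash) would appear in the URI as plain '/test' instead.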

Symphony won't create the exchange for you; it is expected to already exist under the specified virtual host. (There are a lot of server-side exchange configuration options, and we don't make any assumptions on your behalf.) The only requirement is that it is a 'topic' exchange. If it is of a different kind ('fanout', 'direct', etc.), it won't work! It's also recommended to mark it 'durable', so you won't need to re-create it after any RabbitMQ service restarts.

Building a Task

Tasks inherit from the Symphony::Task class. You can start an individual task directly via the 'symphony-task' binary, or as a pool of managed processes via the 'symphony' daemon, or by simply calling #run on the class itself.

The Simplest Task

#!/usr/bin/env ruby

require 'symphony'

class Test < Symphony::Task

    # Process all events
    subscribe_to '#'

    def work( payload, metadata )
        puts "I received an event: %p with a payload of %p" % [
            metadata[ :delivery_info ][ :routing_key ],
            payload
        ]
        return true
    end
end

Symphony.load_config( 'etc/config.yml' )
Test.run

The only requirement is the 'subscribe_to' clause, which indicates what topics the queue will receive from the exchange. Subscribing to '#' means this task will receive anything and everything that is published. In practice, you'll likely want to be more discerning. You can specify as many comma-separated topics as you like for a given task, and you can use AMQP wildcard characters in them.

# Subscribe to hypothetical creation events for all types,
# and deletion events for users.
#
subscribe_to '#.create', 'users.delete'

In this fashion, you can decide to have many separate (and optionally distributed) tasks that only respond to a small subset of possible events, or more monolithic tasks that can respond to a variety of event topics.

Because AMQP manages the messages that the bound consumers receive, starting up multiple copies of this Test task (across any number of machines) will automatically cause published events to be received in a round-robin fashion without any additional effort. All running task workers will receive roughly equal numbers of matching events.
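
The round-robin behavior can be pictured with a small simulation. This is only a toy model in plain Ruby: the distribute method and the worker names are invented for the example, and a real broker's distribution also depends on prefetch and acknowledgement timing.

```ruby
# Toy stand-in for the broker: hand each message to the next worker in turn.
def distribute( messages, workers )
    deliveries = Hash.new { |hash, worker| hash[worker] = [] }
    messages.each_with_index do |message, index|
        deliveries[ workers[index % workers.length] ] << message
    end
    deliveries
end

deliveries = distribute( (1..10).to_a, %w[worker-a worker-b worker-c] )
counts     = deliveries.transform_values( &:length )

p counts   # {"worker-a"=>4, "worker-b"=>3, "worker-c"=>3}
```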

An Aside on Queues and Binding Behavior

A task will automatically create an auto-delete queue for itself with a sane (but configurable) name, along with subscription bindings, ONLY if the queue didn't previously exist. The auto-delete flag ensures that when the last worker disconnects, the queue is automatically removed from the AMQP broker, setting everything back to the state it was before a Symphony worker ever connected.

Following the same logic as for the exchange, if a matching queue already exists for a task name (an automatically generated name, or one manually specified with the 'queue_name' option), then a binding is NOT created. Symphony is 'hands off' for anything server side that already exists, so you can enforce specific behaviors, and know that Symphony won't fiddle with them.

This can be confusing if you've created a queue manually for a task, and didn't also manually create a binding for the topic key. There are some advanced routing cases where you'll want to set up queues yourself, rather than let Symphony automatically create and remove them. We'll talk more on that later.

Return Values

By default, a Task is configured to tell AMQP when it has completed its job. It does this by returning +true+ from the #work method. When AMQP sees the job was completed, the message is considered "delivered", and its lifetime is at an end. If the task instead returns an explicit +false+, the message is retried, potentially on a different task worker.

If something within the task raises an exception, the default behavior is to permanently abort the task. If you need different behavior, you'll need to catch the exception. As the task author, you're in the best position to know if a failure state requires an immediate retry, or a permanent rejection. If you'll be allowing message retries, you might also want to consider publishing messages with a maximum TTL, to avoid any situations that could cause jobs to loop infinitely. (Default Max-TTL is an example of something that is settable when creating queues on the RabbitMQ server manually.)
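
As a sketch of what catching the exception can look like, the following plain Ruby class mimics the shape of a #work method. ImportTask, TransientError, and the payload keys are all hypothetical; a real task would inherit from Symphony::Task and be run by Symphony.

```ruby
# Hypothetical error class for failures that are worth retrying.
class TransientError < StandardError; end

# Hypothetical stand-in; a real task would inherit from Symphony::Task.
class ImportTask

    def work( payload, metadata )
        raise TransientError, "upstream unavailable" if payload[:upstream_down]
        raise ArgumentError, "missing id" unless payload[:id]

        true    # success: the message is considered delivered
    rescue TransientError
        false   # explicit false: ask for the message to be retried
    end
    # Anything else (ArgumentError here) propagates out of #work,
    # which the text above describes as a permanent failure.

end

task = ImportTask.new
p task.work( { id: 1 }, {} )                 # true
p task.work( { upstream_down: true }, {} )   # false
```

Only the error the author knows to be temporary is turned into a retry; everything else still fails loudly.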

Task Options

Message acknowledgement

If you don't require retry or reject behaviors on task error states, you can set 'acknowledge' to +false+. With this setting, the AMQP server considers a message as "delivered" the moment a consumer receives it. This can be useful for additional speed when processing non-important events.

acknowledge false   # (default: true)

Worker Model

By default, a task signals that it is ready for more messages as soon as it finishes processing one. This isn't always the optimal environment for long-running processes. The work you want to accomplish might require heavy computing resources, and you might want to do things like relinquish memory in between messages, or disconnect from network resources (databases, etc.).

Setting your work_model to 'oneshot' causes the task to exit immediately after performing its work. Clearly, this only makes sense if you're running managed processes under the Symphony daemon, so a fresh process can take its place. You can do all of your expensive work within the #work method, leaving the main "waiting for messages" loop as low-resource as possible.

work_model :oneshot  # (default:  :longlived)

Message Prefetch

Prefetch is another tuning option for speed on extremely busy exchanges. If a task worker sees there are additional pending messages, it can concurrently retrieve and store them locally in memory while performing current work. This can reduce consumer/server network traffic, but the trade-off is that all prefetched message payloads are stored in memory until they are processed. It's a good idea to keep this low if your payloads are large.

prefetch 100   # (default: 10)

Message prefetch is ignored (automatically set to 1) if the task's work_model is set to oneshot.

Timeout

The timeout option specifies the maximum length of time (in seconds) a task can be within its #work method, and what action to take if it exceeds that timeframe. Please note, this is different from the Max-TTL setting for AMQP, which dictates the maximum timeframe a message can exist in a queue.

AMQP can't know if a task is stuck in a Ruby loop or some otherwise unfortunate state, so a timeout ensures workers won't ever get permanently stuck. There is no default timeout. If one is set, the default action is to treat the timeout like an exception, which is a permanent rejection. You can choose to retry instead:

# perform work for maximum 20 seconds, then stop and try
# again on another worker
#
timeout 20.0, :action => :retry
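
The idea behind the option can be sketched with Ruby's standard Timeout module. The work_with_timeout method and the :ack, :retry, and :reject symbols are invented for this illustration and are not part of Symphony's API; the sketch only shows a time limit being mapped to a configurable action.

```ruby
require 'timeout'

# Hypothetical wrapper: run the block with a time limit and report what
# should happen to the message afterwards.
def work_with_timeout( seconds, action: :reject )
    Timeout.timeout( seconds ) { yield }
    :ack
rescue Timeout::Error
    action
end

p work_with_timeout( 1.0 ) { :quick_work }                  # :ack
p work_with_timeout( 0.1, action: :retry ) { sleep 1 }      # :retry
p work_with_timeout( 0.1 ) { sleep 1 }                      # :reject
```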

Queue Name

By default, Symphony will try to create a queue named after the class name of the task. You can override this per-task.

class Test < Symphony::Task

    # Process all events
    subscribe_to '#'

    # I don't want to connect to a 'test' queue, let's make it interesting:
    queue_name 'shazzzaaaam'

    def work( payload, metadata )
    ...

Plugins

Plugins change or enhance the behavior of a Symphony::Task.

Metrics

The metrics plugin provides periodic information about the performance of a task worker to the log. It shows processed message averages and resource consumption summaries at the "info" log level, and also changes the process name to display a total count and jobs per second rate.

require 'symphony/metrics'

class Test < Symphony::Task
    prepend Symphony::Metrics
    # ...
end

Routing

The routing plugin removes the requirement to perform all processing in the #work method, and instead links individual units of work to separate #on declarations. This makes tasks that are designed to receive multiple message topics much easier to maintain and test.

require 'symphony/routing'

class Test < Symphony::Task
    include Symphony::Routing

    on 'users.create', 'workstation.create' do |payload, metadata|
        puts "A user or workstation wants to be created!"
        # ...
        return true
    end

    on 'users.delete' do |payload, metadata|
        puts "A user wants to be deleted!"
        return true
    end
end

The #on method accepts the same syntax as the 'subscribe_to' option. In fact, if you're using the routing plugin, it removes the requirement of 'subscribe_to' altogether, making it entirely redundant. #on blocks that match multiple times for a message will be executed in top down order. It will only return true (signalling success) if all blocks return true.
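
The "top down, all must succeed" rule can be sketched in plain Ruby. MiniRouter below is a hypothetical toy, not Symphony's implementation: its pattern translation is simplified (its '#' does not match zero words), it runs every matching handler before checking the results, and it does not special-case messages that match nothing.

```ruby
# Hypothetical toy router, for illustration only.
class MiniRouter

    def initialize
        @routes = []
    end

    # Register one handler block under one or more topic patterns.
    def on( *patterns, &handler )
        patterns.each { |pattern| @routes << [ pattern_to_regexp(pattern), handler ] }
    end

    # Run matching handlers in declaration order; succeed only if all do.
    def dispatch( routing_key, payload )
        matching = @routes.select { |regexp, _handler| regexp.match?( routing_key ) }
        results  = matching.map { |_regexp, handler| handler.call( payload ) }
        results.all? { |result| result == true }
    end

    private

    # Simplified translation: '*' is one word, '#' is any run of characters.
    def pattern_to_regexp( pattern )
        source = pattern.split( '.' ).map do |word|
            case word
            when '*' then '[^.]+'
            when '#' then '.*'
            else Regexp.escape( word )
            end
        end.join( '\.' )
        Regexp.new( "\\A#{source}\\z" )
    end

end

log    = []
router = MiniRouter.new
router.on( 'users.create', 'workstation.create' ) { |_payload| log << :create; true }
router.on( 'users.*' ) { |payload| log << :audit; payload.key?( :name ) }

first  = router.dispatch( 'users.create', { name: 'alice' } )  # both handlers succeed
second = router.dispatch( 'users.create', {} )                 # the second handler fails
p first, second, log
```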

Publishing messages

Because AMQP is language agnostic and a message can have many unique flags attached to it, this is another area where Symphony remains neutral, and imposes nothing.

If you want to publish or republish from within Symphony, you can get a reference to the exchange object Symphony is binding to via the Symphony::Queue class:

exchange = Symphony::Queue.amqp_exchange

This is a Bunny object that you can interact with directly. See the Bunny documentation for options.
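
As a rough sketch, a message body and a set of publish options could be assembled as follows. The build_message helper, the JSON encoding, and the example topic are assumptions made for this illustration; :routing_key, :content_type, :persistent, and :expiration are options accepted by Bunny's Exchange#publish. The final publish call is shown only as a comment, since it needs a configured Symphony environment and a running broker.

```ruby
require 'json'

# Hypothetical helper: returns the encoded body and the publish options.
def build_message( routing_key, payload, ttl_ms: nil, persistent: false )
    options = {
        routing_key:  routing_key,
        content_type: 'application/json',
        persistent:   persistent
    }
    # Per-message expiration is given in milliseconds, as a string.
    options[:expiration] = ttl_ms.to_s if ttl_ms

    [ JSON.generate( payload ), options ]
end

body, options = build_message( 'users.create', { name: 'alice' },
                               ttl_ms: 60_000, persistent: true )
p body
p options

# With Symphony configured and RabbitMQ reachable (not run here):
#   exchange = Symphony::Queue.amqp_exchange
#   exchange.publish( body, options )
```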

How Do I...

Avoid Infinite Retry and/or Failure Loops?

When publishing, include a message expiration. (Again, this has a different goal than the 'timeout' option for the Symphony::Task class, which can dislodge a potentially stuck worker.)

With an expiration, a message that keeps being retried will eventually reach its maximum lifetime, and AMQP will stop delivering it to consumers. If there is a dead letter queue configured, it will place the message there for later inspection.

Report on Errors?

Because messaging is designed to be asynchronous, you won't receive instantaneous results when pushing an event into the void. RabbitMQ has the concept of a 'dead letter queue', which receives events that have failed due to expiration or rejection. They contain the original route information and the original payload.

RabbitMQ also permits setting 'policies', which apply default settings to any created queue under a virtual host. We've been creating an automatic policy that links any queue whose name doesn't start with '_' to a dead letter exchange called '_failures', so all errors filter there without any further configuration. New queues have the same policy applied automatically.

% rabbitmqctl list_policies
Listing policies ...
/     DLX     queues  ^[^_].* {"dead-letter-exchange":"_failures"}    0
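
To see which queues the pattern in that policy covers, it can be tried against a few names in Ruby. The queue names below are made up for the example. RabbitMQ evaluates the pattern with its own regular expression engine, but a pattern this simple behaves the same way in Ruby.

```ruby
# Pattern copied from the policy listing above.
DLX_POLICY_PATTERN = /^[^_].*/

# Hypothetical queue names.
queue_names = %w[ test users.mailer _failures _audit ]

covered, exempt = queue_names.partition { |name| DLX_POLICY_PATTERN.match?( name ) }

p covered   # ["test", "users.mailer"]
p exempt    # ["_failures", "_audit"]
```

Queues that start with an underscore are left out of the policy, which keeps failures from the failure queue itself from being dead-lettered again.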

There is an example task (symphony/lib/tasks/failure_logger.rb) that you're welcome to use as a foundation for your error reporting. What you do with the errors is up to you -- the failure_logger example assumes it is binding to a dead letter queue, and it simply logs to screen.
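
RabbitMQ records the original route information of a dead-lettered message in an 'x-death' header, an array of entries with keys such as 'reason', 'queue', and 'routing-keys'. The sketch below summarizes the first entry. The failure_summary method is hypothetical, and the metadata layout (headers under :properties and :headers, as a plain Hash) is an assumption for this example rather than something taken from the failure_logger task.

```ruby
# Hypothetical helper: describe why and where a message was dead-lettered.
def failure_summary( metadata )
    deaths = metadata.dig( :properties, :headers, 'x-death' )
    return nil if deaths.nil? || deaths.empty?

    info = deaths.first
    "%s from queue %s (routing keys: %s)" % [
        info['reason'],
        info['queue'],
        Array( info['routing-keys'] ).join( ', ' )
    ]
end

# Sample metadata shaped like the assumption described above.
metadata = {
    properties: {
        headers: {
            'x-death' => [
                { 'reason' => 'expired', 'queue' => 'test', 'routing-keys' => ['users.create'] }
            ]
        }
    }
}

puts failure_summary( metadata )   # expired from queue test (routing keys: users.create)
```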

Ensure Resiliency for Very Important Messages?

Like good error reporting, this requires server-side configuration to get rolling.

If messages are published when no queues are bound and available to receive them, AMQP drops the message immediately. (If the publisher sets the 'mandatory' flag, they'll receive an error and can act accordingly.)

Instead of setting 'mandatory', you may want to have AMQP accept the message, and save it for a task worker to consume at a later time. To do this, you just need to manually create the queue and the bindings to it from the exchange. (You'll probably want these marked as 'durable' as well, to survive RabbitMQ service restarts.)

With a binding in place, messages will be retained on the RabbitMQ server until a consumer drains them. If the messages are published with the 'persistent' flag, they'll also survive RabbitMQ server restarts.

With this setup, Symphony won't create or modify any queues. It will just attach, start receiving events, and get to work.
