Commits

Mahlon Smith committed 694dfa7

Clean up the README a little, add a USAGE document (that may evolve
into a manual later), and allow the #on method in the routing plugin to
accept an array of binding topics.

  • Participants
  • Parent commits 8b74abf

Comments (0)

Files changed (3)

 
 == Description
 
-GroundControl is a subscription-based asynchronous job system. It allows you to define jobs that watch for lightweight events from a distributed-messaging system like an AMQP broker or a ZeroMQ socket, and do work based on their payload.
+GroundControl is a subscription-based asynchronous job system. It
+allows you to define jobs that watch for lightweight events from a
+distributed-messaging AMQP broker, and do work based on their payload.
 
-It's composed of four major parts:
+It includes several executables under bin/:
 
-Queue[rdoc-ref:GroundControl::Queue]::
-Task[rdoc-ref:GroundControl::Task]::
-Worker[rdoc-ref:GroundControl::Worker]::
-Daemon []
+groundcontrol::
+  A daemon which manages startup and shutdown of one or more Workers
+  running Tasks as they are published from a queue.
 
-It also includes several executables under bin/:
+groundcontrol-task::
+  A wrapper that runs a single task, useful for testing, or if you don't
+  require the process management that the groundcontrol daemon provides.
 
-gcworkerd::
-  A daemon which manages startup and shutdown of one or more Workers
-  running Jobs from a Queue.
 
-A few more executables are planned:
+== Synopsis
 
-gcscheduled::
-  A daemon that provides scheduled injection of jobs.
-groundcontrol::
-  A command shell which can manage, inject, schedule, and monitor job
-  execution across multiple Queues.
+	class WorkerTask < GroundControl::Task
+		# Process all events
+		subscribe_to '#'
+
+		def work( payload, metadata )
+			puts "I got a payload! %p" % [ payload ]
+			return true
+		end
+	end
+
+
+For a more detailed description of usage, please refer to the USAGE document.
 
 
 == Installation
+
+= Initial Setup
+
+GroundControl uses the RabbitMQ[http://www.rabbitmq.com/] server
+for speed, redundancy, and high availability.  Installing
+RabbitMQ is outside the scope of this document, please see their
+installation[http://www.rabbitmq.com/download.html] instructions.
+
+Once installed and running, lets move on!
+
+== Nomenclature
+
+Here's a quick review on RabbitMQ vocabulary.  It's helpful to have some
+fundamental understanding of AMQP concepts when using GroundControl, as
+a lot of advanced messaging options are available only via server-side
+configuration.
+
+virtual host::
+  A logical container for exchanges and queues.  You can have as many of
+  these as you like for a RabbitMQ cluster, they are used to partition
+  objects for security purposes.
+
+exchange::
+  An exchange receives incoming messages, and routes them to queues via
+  rules defined in bindings.
+
+queue::
+  Consumers receive messages from a queue, if their binding rules match.
+
+binding::
+  A rule that links/routes messages from an exchange to a queue.
+
+routing key::
+  A period separated hierarchal string that specifies a category for
+  a message.  This is matched against existing bindings for incoming
+  messages.
+
+dead-letter-exchange::
+  An exchange that receives messages that fail for any reason.  Configuring
+  a dead letter exchange is critical for robust error reporting.
+
+
+= Starting Out
+
+GroundControl uses
+Configurability[https://rubygems.org/gems/configurability] to determine
+connection criteria to the RabbitMQ server, which is stored in a
+YAML[http://www.yaml.org/] file.
+
+An example configuration file for GroundControl looks as such, if you
+were connecting to a virtual host called '/test', wanted to log all
+messages to STDERR at a level of 'debug', and had an exchange called
+'groundcontrol' (the default):
+
+	amqp:
+		broker_uri: amqp://USERNAME:PASSWORD@example.com:5672/%2Ftest
+		exchange: groundcontrol
+
+	logging:
+		groundcontrol: debug STDERR (color)
+
+
+GroundControl won't create the exchange for you, it is expected to
+already exist under the specified virtual host.  (There are a lot of
+server-side exchange configuration options, and we don't make any
+assumptions on your behalf.)  The only requirement is that it is a
+'topic' exchange.  If it is of a different kind ('fanout', 'direct',
+etc, it won't work!)  It's also recommended to mark it 'durable', so you
+won't need to re-create it after any RabbitMQ service restarts.
+
+
+= Building a Task
+
+Tasks inherit from the GroundControl::Task class.  You can start an
+individual task directly via the 'groundcontrol-task' binary, or as a
+pool of managed processes via the 'groundcontrol' daemon, or by simply
+calling #run on the class itself.
+
+== The Simplest Task
+
+	#!/usr/bin/env ruby
+	
+	require 'groundcontrol'
+	
+	class Test < GroundControl::Task
+		
+		# Process all events
+		subscribe_to '#'
+		
+		def work( payload, metadata )
+			puts "I received an event: %p with a payload of %p" % [
+				metadata[ :delivery_info ][ :routing_key ],
+				payload
+			]
+			return true
+		end
+	end
+	
+	GroundControl.load_config( 'etc/config.yml' )
+	Test.run
+
+
+The only requirement is the 'subscribe_to' clause, which indicates what
+topics the queue will receive from the exchange.  Subscribing to '#'
+means this task will receive anything and everything that is published.
+In practice, you'll likely want to be more discerning.  You can specify
+as many comma separated topics as you like for a given task, as well as
+use AMQP wildcard characters.
+
+	# Subscribe to hypothetical creation events for all types,
+	# and deletion events for users.
+	#
+	subscribe_to '#.create, 'users.delete'
+
+In this fashion, you can decide to have many separate (and optionally
+distributed) tasks that only respond to a small subset of possible
+events, or more monolithic tasks that can respond to a variety of event
+topics.
+
+Because AMQP manages the messages that the bound consumers receive,
+starting up multiple copies of this Test task (across any number of
+machines) will automatically cause published events to be received in
+a round-robin fashion without any additional effort.  All running task
+workers will receive roughly equal numbers of matching events.
+
+
+== An Aside on Queues and Binding Behavior
+
+A task will automatically create an auto-delete queue for itself with a
+sane (but configurable) name, along with subscription bindings, ONLY if
+the queue doesn't previously exist.  The auto-delete flag ensures that
+when the last worker disconnects, the queue is automatically removed
+from the AMQP broker, setting everything back to the state before a
+GroundControl worker connected.
+
+Along the same logic as the initial exchange creation, if a matching
+queue exists already for a task name (an automatically generated or
+manually specified name with the 'queue_name' option), then a binding is
+NOT created.  GroundControl is 'hands off' for anything server side that
+already exists, so you can enforce specific behaviors, and know that
+GroundControl won't fiddle with them.
+
+This can be confusing if you've created a queue manually for a task,
+and didn't also manually create a binding for the topic key.  There are
+some advanced routing cases where you'll want to set up queues yourself,
+rather than let GroundControl automatically create and remove them,
+we'll talk more on that later.
+
+
+== Return Values
+
+By default, a Task is configured to tell AMQP when it has completed its
+job.  It does this by returning +true+ from the #work method.  When AMQP
+sees the job was completed, the message is considered "delivered", and
+it's lifetime is at an end.  If instead, the task returns an explicit
++false+, the message is retried, potentially on a different task worker.
+
+If something within the task raises an exception, the default behavior
+is to permanently abort the task.  If you need different behavior,
+you'll need to catch the exception.  As the task author, you're in the
+best position to know if a failure state requires an immediate retry,
+or a permanent rejection.  If you'll be allowing message retries, you
+might also want to consider publishing messages with a maximum TTL, to
+avoid any situations that could cause jobs to loop infinitely.  (Default
+Max-TTL is an example of something that is settable when creating queues
+on the RabbitMQ server manually.)
+
+
+= Task Options
+
+== Message acknowledgement
+
+If you don't require retry or reject behaviors on task error states, you
+can set 'acknowledge' to +false+.  With this setting, the AMQP server
+considers a message as "delivered" the moment a consumer receives it.
+This can be useful for additional speed when processing non-important
+events.
+
+	acknowledge false   # (default: true)
+
+
+== Worker Model
+
+By default, a task signals that is ready for more messages as soon as
+it finishes processing one.  This isn't always the optimal environment
+for long running processes.  The work you want to accomplish might
+require heavy computing resources, and you might want to do things
+like relinquish memory in between messages, or disconnect from network
+resources (databases, etc.)
+
+Settings your work_model to 'oneshot' causes the task to exit
+immediately after performing its work.  Clearly, this only makes sense
+if you're running managed processes under the GroundControl daemon, so a
+fresh process can take its place.  You can do all of your expensive work
+within the #work method, leaving the main "waiting for messages" loop as
+low-resource as possible.
+
+	work_model :oneshot  # (default:  :longlived)
+
+
+== Message Prefetch
+
+Prefetch is another tuning for speed on extremely busy exchanges. If
+a task worker sees there are additional pending messages, it can
+concurrently retrieve and store them locally in memory while performing
+current work.  This can reduce consumer/server network traffic, but the
+trade off is that all prefetched message payloads are stored in memory
+until they are processed.  It's a good idea to keep this low if your
+payloads are large.
+
+	prefetch 100   # (default: 10)
+
+Message prefetch is ignored (automatically set to 1) if the task's
+work_model is set to oneshot.
+
+
+== Timeout
+
+The timeout option specifies the maximum length of time (in seconds)
+a task can be within its #work method, and what action to take if it
+exceeds that timeframe.  Please note, this is different from the Max-TTL
+setting for AMQP, which dictates the maximum timeframe a message can
+exist in a queue.
+
+AMQP can't know if a task is in ruby loop or otherwise unfortunate
+state, so this can ensure workers won't ever get permanently stuck.
+There is no default timeout.  If one is set, the default action is to
+act as an exception, which is a permanent rejection.  You can choose to
+retry instead:
+
+	# perform work for maximum 20 seconds, then stop and try
+	# again on another worker
+	#
+	timeout 20.0, :action => :retry
+
+== Queue Name
+
+By default, GroundControl will try and create a queue based on the Class
+name of the task.  You can override this per-task.
+
+	class Test < GroundControl::Task
+		
+		# Process all events
+		subscribe_to '#'
+		
+		# I don't want to connect to a 'test' queue, lets make it interesting:
+		queue_name 'shazzzaaaam'
+			
+		def work( payload, metadata )
+		...
+
+
+= Plugins
+
+Plugins change or enhance the behavior of a GroundControl::Task.  They
+are enabled with the 'plugin' option, with the name of the plugin (or
+comma separated plugins) as a symbol argument.  GroundControl currently
+ships with two.
+
+
+== Metrics
+
+The metrics plugin provides periodic information about the performance
+of a task worker to the log.  It shows processed message averages and
+resource consumption summaries at the "info" log level, and also changes
+the process name to display a total count and jobs per second rate.
+
+	plugin :metrics
+
+
+== Routing
+
+The routing plugin removes the requirement to perform all processing
+in the #work method, and instead links individual units of work to
+separate #on declarations.  This makes tasks that are designed to
+receive multiple message topics much easier to maintain and test.
+
+	class Test < GroundControl::Task
+		
+		# Use the routing plugin
+		plugin :routing
+		
+		on 'users.create', 'workstation.create' do |payload, metadata|
+			puts "A user or workstation wants to be created!"
+			# ...
+			return true
+		end
+		
+		on 'users.delete' do |payload, metadata|
+			puts "A user wants to be deleted!"
+			return true
+		end
+	end
+
+The #on method accepts the same syntax as the 'subscribe_to' option.  In
+fact, if you're using the routing plugin, it removes the requirement of
+'subscribe_to' altogether, making it entirely redundant. #on blocks that
+match multiple times for a message will be executed in top down order.
+It will only return true (signalling success) if all blocks return true.
+
+
+= Publishing messages
+
+Because AMQP is language agnostic and a message can have many unique
+flags attached to it, this is another area where GroundControl remains
+neutral, and imposes nothing.
+
+If you want to publish or republish from within GroundControl, you can
+get a reference to the exchange object GroundControl is binding to via
+the GroundControl::Queue class:
+
+	exchange = GroundControl::Queue.amqp_exchange
+
+This is a Bunny object that you can interact with directly.  See the
+Bunny documentation[http://rubybunny.info/articles/exchanges.html] for
+options.
+
+
+= How Do I... ?
+
+== Avoid Infinite Retry and/or Failure Loops?
+
+When publishing, include a message expiration.  (Again, this has a
+different goal than the 'timeout' option for the GroundControl::Task
+class, which can potentially dislodge a potentially stuck worker.)
+
+With an expiration, a task that retries will eventually meet it's
+maximum lifetime, and AMQP will stop delivering it to consumers.  If
+there is a dead letter queue configured, it will place the message there
+for later inspection.
+
+
+== Report on Errors?
+
+Because messaging is designed to be asynchronous, you won't receive
+instantaneous results when pushing an event into the void.  RabbitMQ has
+the concept of a 'dead letter queue', which receives events that have
+failed due to expiration or rejection.  They contain the original route
+information, and the original payload.
+
+RabbitMQ also permits setting 'policies', which apply default settings
+to any created queue under a virtual host.  We've been creating an
+automatic policy that links any queue that doesn't start with '_' to a
+dead letter queue called '_failures', so all errors filter there without
+any further configuration.  New queues have the same policy applied
+automatically.
+
+	% rabbitmqctl list_policies
+	Listing policies ...
+	/     DLX     queues  ^[^_].* {"dead-letter-exchange":"_failures"}    0
+
+There is an example task (groundcontrol/lib/tasks/failure_logger.rb)
+that you're welcome to use as a foundation for your error reporting.
+What you do with the errors is up to you -- the failure_logger example
+assumes it is binding to a dead letter queue, and it simply logs to
+screen.
+
+
+== Ensure Resiliency for Very Important Messages?
+
+Like good error reporting, this requires server-side configuration to
+get rolling.
+
+If messages are published when no queues are bound and available to
+receive them, AMQP drops the message immediately.  (If the publisher
+sets the 'mandatory' flag, they'll receive an error and can act
+accordingly.)
+
+Instead of setting 'mandatory', you may want to have AMQP accept the
+message, and save it for task worker to consume at a later time.  To do
+this, you just need to manually create the queue, and the bindings to
+it from the exchange (you'll probably want these marked as 'durable' as
+well, to survive RabbitMQ service restarts.)
+
+With a binding in place, messages will be retained on the RabbitMQ
+server until a consumer drains them.  If the messages are published with
+the 'persistent' flag, they'll also survive RabbitMQ server restarts.
+
+With this setup, GroundControl won't create or modify any queues.  It
+will just attach, start receiving events, and get to work.
+

lib/groundcontrol/routing.rb

 
 		### Register an event pattern and a block to execute when an event
 		### matching that pattern is received.
-		def on( pattern, &block )
-			methodobj = self.make_handler_method( pattern, &block )
-			self.routing_keys << pattern
+		def on( *route_patterns, &block )
+			route_patterns.each do |pattern|
+				methodobj = self.make_handler_method( pattern, &block )
+				self.routing_keys << pattern
 
-			pattern_re = self.make_routing_pattern( pattern )
-			self.routes[ pattern_re ] << methodobj
+				pattern_re = self.make_routing_pattern( pattern )
+				self.routes[ pattern_re ] << methodobj
+			end
 		end