Commits

Michael Granger committed 9857865

Start catching specs up with the code.

Comments (0)

Files changed (7)

 
 	spec.require_ruby_version( '>=2.0.0' )
 	spec.hg_sign_tags = true if spec.respond_to?( :hg_sign_tags= )
+
+	self.rdoc_locations << "deveiate:/usr/local/www/public/code/#{remote_rdoc_dir}"
 end
 
 ENV['VERSION'] ||= hoespec.spec.version.to_s

lib/groundcontrol.rb

 	       Configurability
 
 	# Library version constant
-	VERSION = '0.3.0'
+	VERSION = '0.4.0'
 
 	# Version-control revision constant
 	REVISION = %q$Revision$

lib/groundcontrol/queue.rb

 		heartbeat:   'server',
 	}
 
+	# The default number of messages to prefetch
+	DEFAULT_PREFETCH = 10
+
 
 	# Loggability API -- set up groundcontrol's logger
 	log_to :groundcontrol
 	end
 
 
+	### Return a queue configured for the specified +task_class+.
+	def self::for_task( task_class )
+		args = [
+			task_class.queue_name,
+			task_class.acknowledge,
+			task_class.consumer_tag,
+			task_class.routing_keys,
+			task_class.prefetch
+		]
+		return new( *args )
+	end
+
+
+
 	### Create a new Queue with the specified configuration.
 	def initialize( name, acknowledge, consumer_tag, routing_keys, prefetch )
 		@name          = name
 	attr_reader :prefetch
 
 	# The Bunny::Consumer that is dispatching messages for the queue.
-	attr_reader :consumer
+	attr_accessor :consumer
 
 	##
 	# The flag for shutting the queue down.
 
 		self.shutting_down = only_one
 		amqp_queue = self.create_amqp_queue( only_one ? 1 : self.prefetch )
-		@consumer = self.create_consumer( amqp_queue, work_callback )
+		self.consumer = self.create_consumer( amqp_queue, work_callback )
 
-		amqp_queue.subscribe_with( @consumer, block: true )
+		amqp_queue.subscribe_with( self.consumer, block: true )
 		amqp_queue.channel.close
 		session.close
 	end
 
 
 	### Create the Bunny::Consumer that will dispatch messages from the broker.
-	def create_consumer( amqp_queue, work_callback )
+	def create_consumer( amqp_queue, &work_callback )
 		ackmode = self.acknowledge
 		tag     = self.consumer_tag
 
-		consumer = Bunny::Consumer.new( amqp_queue.channel, amqp_queue, tag, !ackmode )
+		# Last argument is *no_ack*, so need to invert the logic
+		cons = Bunny::Consumer.new( amqp_queue.channel, amqp_queue, tag, !ackmode )
 
-		consumer.on_delivery do |delivery_info, properties, payload|
-			rval = self.handle_message( delivery_info, properties, payload, work_callback )
+		cons.on_delivery do |delivery_info, properties, payload|
+			rval = self.handle_message( delivery_info, properties, payload, &work_callback )
 			self.log.debug "Done with message %s. Session is %s" %
 					[ delivery_info.delivery_tag, self.class.amqp_session.closed? ? "closed" : "open" ]
-			consumer.cancel if self.shutting_down?
+			cons.cancel if self.shutting_down?
 		end
 
-		consumer.on_cancellation do 
+		cons.on_cancellation do
 			self.log.warn "Consumer cancelled."
 			self.shutdown
 		end
 
+		return cons
 	end
 
 
 	### Create the AMQP queue from the task class and bind it to the configured exchange.
-	def create_amqp_queue( prefetch_count=10 )
+	def create_amqp_queue( prefetch_count=DEFAULT_PREFETCH )
 		exchange = self.class.amqp_exchange
 		channel = self.class.amqp_channel
 
-		channel.prefetch( prefetch_count )
-
 		begin
 			queue = channel.queue( self.name, passive: true )
+			channel.prefetch( prefetch_count )
 			self.log.info "Using pre-existing queue: %s" % [ self.name ]
 			return queue
 		rescue Bunny::NotFound => err
 			self.log.info "%s; using an auto-delete queue instead." % [ err.message ]
 			channel = self.class.reset_amqp_channel
+			channel.prefetch( prefetch_count )
 
 			queue = channel.queue( self.name, auto_delete: true )
 			self.routing_keys.each do |key|
 
 
 	### Handle each subscribed message.
-	def handle_message( delivery_info, properties, payload, work_callback )
+	def handle_message( delivery_info, properties, payload, &work_callback )
 		metadata = {
 			delivery_info: delivery_info,
 			properties: properties,

lib/groundcontrol/task.rb

 		subclass.instance_variable_set( :@acknowledge, true )
 		subclass.instance_variable_set( :@work_model, :longlived )
 		subclass.instance_variable_set( :@prefetch, 10 )
-		subclass.instance_variable_set( :@queue_name, subclass.default_queue_name )
 		subclass.instance_variable_set( :@timeout_action, :reject )
 	end
 
 	### Fetch the GroundControl::Queue for this task, creating it if necessary.
 	def self::queue
 		unless @queue
-			queue_args = [
-				self.queue_name,
-				self.acknowledge,
-				self.consumer_tag,
-				self.routing_keys,
-				self.prefetch
-			]
-			@queue = GroundControl::Queue.new( *queue_args )
+			@queue = GroundControl::Queue.for_task( self )
 		end
 		return @queue
 	end
 			@queue_name = new_name
 		end
 
+		@queue_name ||= self.default_queue_name
 		return @queue_name
 	end
 

lib/groundcontrol/tasks/pinger.rb

 require 'groundcontrol/task' unless defined?( GroundControl::Task )
 
 
-session = Bunny.new( 'amqp://localhost:5672', vhost: '/acme/jobs' )
-session.start
-channel = session.create_channel
-exchange = channel.topic( 'events' )
-
-payload = Yajl.dump({ hostname: 'www.acme.com', port: 'www' })
-exchange.publish( payload, routing_key: 'monitor.availability.port' )
-
-
 ### A proof-of-concept task to determine ssh availability of a host.
 class GroundControl::Task::Pinger < GroundControl::Task
 

spec/groundcontrol/queue_spec.rb

 
 	context "instance" do
 
+		let( :queue ) { described_class.for_task(testing_task_class) }
+
+		let( :testing_task_class ) { Class.new(GroundControl::Task) }
+		let( :session ) { double("Bunny session", :start => true ) }
+
 		before( :each ) do
-			@testing_task_class = Class.new( GroundControl::Task )
-			@bunny = double( "Bunny session" )
-			@channel = double( "Bunny channel" )
-			@exchange = double( "GroundControl exchange" )
-
-			allow( Bunny ).to receive( :new ).
-				with( described_class.broker_uri, described_class.amqp_session_options ).
-				and_return( @bunny )
-			allow( @bunny ).to receive( :start )
-			allow( @bunny ).to receive( :create_channel ).and_return( @channel )
-			allow( @channel ).to receive( :topic ).
-				with( described_class.exchange, passive: true ).
-				and_return( @exchange )
+			allow( Bunny ).to receive( :new ).and_return( session )
+			described_class.amqp[:exchange] = double( "AMQP exchange" )
+			described_class.amqp[:channel] = double( "AMQP channel" )
 		end
 
 
 		it "creates an auto-deleted queue for the task if one doesn't already exist" do
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
+			expect( described_class.amqp_channel ).to receive( :queue ).
+				with( queue.name, passive: true ).
+				and_raise( Bunny::NotFound.new("no such queue", described_class.amqp_channel, true) )
+			expect( described_class.amqp_channel ).to receive( :open? ).
+				and_return( false )
 
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
+			# Channel is reset after queue creation fails
+			new_channel = double( "New AMQP channel" )
+			amqp_queue = double( "AMQP queue" )
+			allow( described_class.amqp_session ).to receive( :create_channel ).
+				and_return( new_channel )
+			expect( new_channel ).to receive( :prefetch ).
+				with( GroundControl::Queue::DEFAULT_PREFETCH )
+			expect( new_channel ).to receive( :queue ).
+				with( queue.name, auto_delete: true ).
+				and_return( amqp_queue )
 
-			expected_exception = Bunny::NotFound.new( "oopsie! no queue!", @channel, :frame )
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_raise( expected_exception )
-			expect( @channel ).to receive( :open? ).and_return( false )
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, auto_delete: true ).
-				and_return( amqp_queue )
-			expect( amqp_queue ).to receive( :bind ).
-				with( @exchange, routing_key: 'tests.unit' )
-
-			expect( queue.create_queue ).to be( amqp_queue )
+			expect( queue.create_amqp_queue ).to be( amqp_queue )
 		end
 
 
 		it "re-uses the existing queue on the broker if it already exists" do
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
+			amqp_queue = double( "AMQP queue" )
+			expect( described_class.amqp_channel ).to receive( :queue ).
+				with( queue.name, passive: true ).
+				and_return( amqp_queue )
+			expect( described_class.amqp_channel ).to receive( :prefetch ).
+				with( GroundControl::Queue::DEFAULT_PREFETCH )
 
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
-
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_return( amqp_queue )
-
-			expect( queue.create_queue ).to be( amqp_queue )
+			expect( queue.create_amqp_queue ).to be( amqp_queue )
 		end
 
 
-		it "subscribes with ACKs enabled if the task it belongs to has acknowledgements set" do
-			@testing_task_class.acknowledge( true )
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
+		it "creates a consumer with ACKs enabled if it has acknowledgements enabled" do
+			amqp_channel = double( "AMQP channel" )
+			amqp_queue = double( "AMQP queue", channel: amqp_channel )
+			consumer = double( "Bunny consumer" )
 
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
+			# Ackmode argument is actually 'no_ack'
+			expect( Bunny::Consumer ).to receive( :new ).
+				with( amqp_channel, amqp_queue, queue.consumer_tag, false ).
+				and_return( consumer )
+			expect( consumer ).to receive( :on_delivery )
+			expect( consumer ).to receive( :on_cancellation )
 
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_return( amqp_queue )
-			expect( amqp_queue ).to receive( :subscribe ).
-				with( ack: true, block: true, consumer_tag: @testing_task_class.consumer_tag )
-
-			queue.each_message { }
+			expect( queue.create_consumer(amqp_queue) ).to be( consumer )
 		end
 
 
-		it "subscribes with ACKs disabled if the task it belongs to has acknowledgements unset" do
-			@testing_task_class.acknowledge( false )
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
+		it "creates a consumer with ACKs disabled if it has acknowledgements disabled" do
+			amqp_channel = double( "AMQP channel" )
+			amqp_queue = double( "AMQP queue", channel: amqp_channel )
+			consumer = double( "Bunny consumer" )
 
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
+			# Ackmode argument is actually 'no_ack'
+			queue.instance_variable_set( :@acknowledge, false )
+			expect( Bunny::Consumer ).to receive( :new ).
+				with( amqp_channel, amqp_queue, queue.consumer_tag, true ).
+				and_return( consumer )
+			expect( consumer ).to receive( :on_delivery )
+			expect( consumer ).to receive( :on_cancellation )
 
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_return( amqp_queue )
-			expect( amqp_queue ).to receive( :subscribe ).
-				with( ack: false, block: true, consumer_tag: @testing_task_class.consumer_tag )
-
-			queue.each_message { }
+			expect( queue.create_consumer(amqp_queue) ).to be( consumer )
 		end
 
 
-		it "yields the payload and metadata to the block passed to #each_message" do
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
+		it "yields the payload and metadata to work method"
+
+
+		it "it NACKs the message if acknowledgements are set and the task raises"
+
+
+		it "it NACKs the message if acknowledgements are set and the task returns a false value" do
+			channel = double( "amqp channel" )
+			queue.consumer = double( "bunny consumer", channel: channel )
 			delivery_info = double( "delivery info", delivery_tag: 128 )
 
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
+			expect( channel ).to receive( :reject ).with( delivery_info.delivery_tag, true )
 
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_return( amqp_queue )
-			expect( amqp_queue ).to receive( :subscribe ).
-				and_yield( delivery_info, {content_type: 'text/plain'}, :payload )
-			expect( @channel ).to receive( :acknowledge ).with( delivery_info.delivery_tag )
-
-			message_params = []
-			queue.each_message do |payload, metadata|
-				message_params << payload << metadata
-				true
-			end
-			expect( message_params[0] ).to eq( :payload )
-			expect( message_params[1] ).to eq({
-				content_type: 'text/plain',
-				properties: {content_type: 'text/plain'},
-				delivery_info: delivery_info
-			})
-		end
-
-
-		it "it NACKs the message if acknowledgements are set and the task raises" do
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
-			delivery_info = double( "delivery info", delivery_tag: 128 )
-
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
-
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_return( amqp_queue )
-			expect( amqp_queue ).to receive( :subscribe ).
-				and_yield( delivery_info, {content_type: 'text/plain'}, :payload )
-			expect( @channel ).to receive( :reject ).with( delivery_info.delivery_tag, true )
-
-			queue.each_message do |*|
-				raise "Ooops! I dropped it!"
-			end
-		end
-
-
-		it "it NACKs the message if acknowledgements are set and the task returns a false value" do
-			@testing_task_class.subscribe_to 'tests.unit'
-			queue = described_class.new( @testing_task_class )
-			delivery_info = double( "delivery info", delivery_tag: 128 )
-
-			amqp_queue = double( "amqp queue" )
-			allow( @exchange ).to receive( :name ).and_return( "exchange" )
-
-			expect( @channel ).to receive( :queue ).
-				with( @testing_task_class.queue_name, passive: true ).
-				and_return( amqp_queue )
-			expect( amqp_queue ).to receive( :subscribe ).
-				and_yield( delivery_info, {content_type: 'text/plain'}, :payload )
-			expect( @channel ).to receive( :reject ).with( delivery_info.delivery_tag, true )
-
-			queue.each_message do |*|
+			queue.handle_message( delivery_info, {content_type: 'text/plain'}, :payload ) do |*|
 				false
 			end
 		end

spec/groundcontrol/task_spec.rb

 		GroundControl::Queue.reset
 	end
 
+	after( :each ) do
+		# reset signal handlers
+		GroundControl::Task::SIGNALS.each do |sig|
+			Signal.trap( sig, :DFL )
+		end
+	end
 
-	it "closes the AMQP session when it receives a TERM signal" do
+
+	it "cancels the AMQP consumer when it receives a TERM signal" do
 		amqp_session = double( "amqp session" )
+		consumer = double( "bunny consumer" )
+
 		allow( Bunny ).to receive( :new ).and_return( amqp_session )
 
-		task = described_class.new( described_class.queue )
+		queue = described_class.queue
+		queue.consumer = consumer
+		task = described_class.new( queue )
 
-		expect( amqp_session ).to receive( :close )
+		expect( queue.consumer ).to receive( :cancel )
+
+		task.handle_signal( :TERM )
+	end
+
+
+	it "closes the AMQP session when it receives a second TERM signal" do
+		amqp_session = double( "amqp session" )
+		channel = double( "AMQP channel" )
+		consumer = double( "bunny consumer", channel: channel )
+
+		allow( Bunny ).to receive( :new ).and_return( amqp_session )
+
+		queue = described_class.queue
+		queue.consumer = consumer
+		task = described_class.new( queue )
+		task.shutting_down = true
+
+		expect( queue.consumer.channel ).to receive( :close )
+
 		task.handle_signal( :TERM )
 	end