Commits

Michael Granger committed e05a422

Checkpoint of work on coverage

Comments (0)

Files changed (4)

+
+[ ] Convert routing / metrics to 'plugins' syntax (inside/outside for prepend/include)
+[ ] Process management w/ Daemon
+[ ] Update pinger/ssh/sshscript
+

lib/groundcontrol/queue.rb

 
 		self.shutting_down = only_one
 		amqp_queue = self.create_amqp_queue( only_one ? 1 : self.prefetch )
-		self.consumer = self.create_consumer( amqp_queue, work_callback )
+		self.consumer = self.create_consumer( amqp_queue, &work_callback )
 
+		self.log.debug "Subscribing to queue with consumer: %p" % [ self.consumer ]
 		amqp_queue.subscribe_with( self.consumer, block: true )
 		amqp_queue.channel.close
 		session.close
 		tag     = self.consumer_tag
 
 		# Last argument is *no_ack*, so need to invert the logic
+		self.log.debug "Creating Bunny::Consumer for %p with tag: %s" % [ amqp_queue, tag ]
 		cons = Bunny::Consumer.new( amqp_queue.channel, amqp_queue, tag, !ackmode )
 
 		cons.on_delivery do |delivery_info, properties, payload|

spec/groundcontrol/queue_spec.rb

 describe GroundControl::Queue do
 
 
-	before( :all ) do
+	before( :each ) do
 		described_class.configure( broker_uri: 'amqp://example.com/%2Ftesty' )
-	end
-
-	before( :each ) do
 		described_class.reset
 	end
 
 	end
 
 
+	it "can use the Bunny-style configuration Hash" do
+		described_class.configure( host: 'spimethorpe.com', port: 23456 )
+		expect( described_class.amqp_session_options ).to include({
+			host: 'spimethorpe.com',
+			port: 23456,
+			heartbeat: :server,
+			logger:    Loggability[ GroundControl ],
+		})
+	end
+
+
+	it "assumes Bunny-style configuration Hash if no broker uri is configured" do
+		described_class.configure( host: 'spimethorpe.com', port: 23456 )
+		described_class.broker_uri = nil
+
+		expect( Bunny ).to receive( :new ).
+			with( described_class.amqp_session_options )
+
+		described_class.amqp_session
+	end
+
+
 	context "bunny interaction" do
 
 
 		end
 
 
-		it "creates a consumer with ACKs enabled if it has acknowledgements enabled" do
+		it "subscribes to the message queue with a configured consumer to wait for messages" do
+			amqp_queue = double( "AMQP queue", channel: described_class.amqp_channel )
+			consumer = double( "Bunny consumer" )
+
+			expect( described_class.amqp_channel ).to receive( :queue ).
+				with( testing_task_class.queue_name, passive: true ).
+				and_return( amqp_queue )
+			expect( described_class.amqp_channel ).to receive( :prefetch ).
+				with( GroundControl::Queue::DEFAULT_PREFETCH )
+
+			expect( Bunny::Consumer ).to receive( :new ).
+				with( described_class.amqp_channel, amqp_queue, queue.consumer_tag, false ).
+				and_return( consumer )
+
+			expect( consumer ).to receive( :on_delivery )
+			expect( consumer ).to receive( :on_cancellation )
+
+			expect( amqp_queue ).to receive( :subscribe_with ).with( consumer, block: true )
+			expect( described_class.amqp_channel ).to receive( :close )
+			expect( session ).to receive( :close )
+
+			queue.wait_for_message {}
+		end
+
+
+		it "raises if wait_for_message is called without a block"
+		it "sets up the queue and consumer to only run once if waiting in one-shot mode"
+
+		it "creates a consumer with acknowledgements enabled if it has acknowledgements enabled" do
 			amqp_channel = double( "AMQP channel" )
 			amqp_queue = double( "AMQP queue", channel: amqp_channel )
 			consumer = double( "Bunny consumer" )
 		end
 
 
-		it "creates a consumer with ACKs disabled if it has acknowledgements disabled" do
+		it "creates a consumer with acknowledgements disabled if it has acknowledgements disabled" do
 			amqp_channel = double( "AMQP channel" )
 			amqp_queue = double( "AMQP queue", channel: amqp_channel )
 			consumer = double( "Bunny consumer" )
 		end
 
 
-		it "yields the payload and metadata to work method"
+		it "it acknowledges the message if acknowledgements are set and the task returns a true value" do
+			channel = double( "amqp channel" )
+			queue.consumer = double( "bunny consumer", channel: channel )
+			delivery_info = double( "delivery info", delivery_tag: 128 )
 
+			expect( channel ).to receive( :acknowledge ).with( delivery_info.delivery_tag )
 
-		it "it NACKs the message if acknowledgements are set and the task raises"
+			queue.handle_message( delivery_info, {content_type: 'text/plain'}, :payload ) do |*|
+				true
+			end
+		end
 
 
-		it "it NACKs the message if acknowledgements are set and the task returns a false value" do
+		it "it rejects the message if acknowledgements are set and the task returns a false value" do
 			channel = double( "amqp channel" )
 			queue.consumer = double( "bunny consumer", channel: channel )
 			delivery_info = double( "delivery info", delivery_tag: 128 )
 		end
 
 
+		it "it permanently rejects the message if acknowledgements are set and the task raises" do
+			channel = double( "amqp channel" )
+			queue.consumer = double( "bunny consumer", channel: channel )
+			delivery_info = double( "delivery info", delivery_tag: 128 )
+
+			expect( channel ).to receive( :reject ).with( delivery_info.delivery_tag, false )
+
+			queue.handle_message( delivery_info, {content_type: 'text/plain'}, :payload ) do |*|
+				raise "Uh-oh!"
+			end
+		end
+
 
 	end
 

spec/groundcontrol/task_spec.rb

 	end
 
 
+	it "cancels the AMQP consumer when it receives an INT signal" do
+		amqp_session = double( "amqp session" )
+		consumer = double( "bunny consumer" )
+
+		allow( Bunny ).to receive( :new ).and_return( amqp_session )
+
+		queue = described_class.queue
+		queue.consumer = consumer
+		task = described_class.new( queue )
+
+		expect( queue.consumer ).to receive( :cancel )
+
+		task.handle_signal( :INT )
+	end
+
+
 	it "closes the AMQP session when it receives a second TERM signal" do
 		amqp_session = double( "amqp session" )
 		channel = double( "AMQP channel" )