1. Michael Granger
  2. symphony

Commits

Michael Granger  committed 221b258

Allow a task to declare its queue as persistant.

  • Participants
  • Parent commits 03efe7b
  • Branches default

Comments (0)

Files changed (4)

File Manifest.txt

View file
 lib/symphony/tasks/auditor.rb
 lib/symphony/tasks/failure_logger.rb
 lib/symphony/tasks/simulator.rb
-lib/symphony/tasks/ssh.rb
-lib/symphony/tasks/sshscript.rb
 spec/helpers.rb
 spec/symphony/daemon_spec.rb
 spec/symphony/intervalexpression_spec.rb

File lib/symphony/daemon.rb

View file
 	CONFIG_DEFAULTS = {
 		throttle_max:    16,
 		throttle_factor: 1,
-		tasks: []
+		tasks:           []
 	}
 
 	# Signals we understand

File lib/symphony/queue.rb

View file
 			task_class.acknowledge,
 			task_class.consumer_tag,
 			task_class.routing_keys,
-			task_class.prefetch
+			task_class.prefetch,
+			task_class.persistent
 		]
 		return new( *args )
 	end
 
 
 	### Create a new Queue with the specified configuration.
-	def initialize( name, acknowledge, consumer_tag, routing_keys, prefetch )
+	def initialize( name, acknowledge, consumer_tag, routing_keys, prefetch, persistent )
 		@name          = name
 		@acknowledge   = acknowledge
 		@consumer_tag  = consumer_tag
 		@routing_keys  = routing_keys
 		@prefetch      = prefetch
+		@persistent    = persistent
 
 		@amqp_queue    = nil
 		@shutting_down = false
 	# The maximum number of un-acked messages to prefetch
 	attr_reader :prefetch
 
+	# Whether or not to create a persistent queue
+	attr_reader :persistent
+
+	# The underlying Bunny::Queue this object manages
+	attr_reader :amqp_queue
+
 	# The Bunny::Consumer that is dispatching messages for the queue.
 	attr_accessor :consumer
 
 			channel = self.class.reset_amqp_channel
 			channel.prefetch( prefetch_count )
 
-			queue = channel.queue( self.name, auto_delete: true )
+			queue = channel.queue( self.name, auto_delete: !self.persistent )
 			self.routing_keys.each do |key|
 				self.log.info "  binding queue %s to the %s exchange with topic key: %s" %
 					[ self.name, exchange.name, key ]

File lib/symphony/task.rb

View file
 		subclass.instance_variable_set( :@work_model, :longlived )
 		subclass.instance_variable_set( :@prefetch, 10 )
 		subclass.instance_variable_set( :@timeout_action, :reject )
+		subclass.instance_variable_set( :@persistent, false )
 	end
 
 
 	end
 
 
+	### Create the queue the task consumes from as a persistent queue, so it
+	### will continue to receive events even if the task is no longer consuming them.
+	### This only effects queues which are not already declared, so if the
+	### bindings for the queue change you'll need to delete the existing queue
+	### before starting up to have them take effect.
+	def self::persistent( new_setting=nil )
+		if new_setting
+			@persistent = new_setting
+		end
+		return @persistent
+	end
+
+
 	#
 	# Instance Methods
 	#