Commits

Anonymous committed 3673b0e Merge

Branch merge from 221b25852ba6.

Comments (0)

Files changed (7)

 lib/symphony/tasks/auditor.rb
 lib/symphony/tasks/failure_logger.rb
 lib/symphony/tasks/simulator.rb
-lib/symphony/tasks/ssh.rb
-lib/symphony/tasks/sshscript.rb
 spec/helpers.rb
 spec/symphony/daemon_spec.rb
 spec/symphony/mixins_spec.rb
 	self.rdoc_locations << "deveiate:/usr/local/www/public/code/#{remote_rdoc_dir}"
 end
 
+# Fix some Hoe retardedness
+hoespec.spec.files.delete( '.gemtest' )
 ENV['VERSION'] ||= hoespec.spec.version.to_s
 
 # Run the tests before checking in
 
 
 task :gemspec => GEMSPEC
-file GEMSPEC => __FILE__ do |task|
+file GEMSPEC => hoespec.spec.files do |task|
 	spec = $hoespec.spec
-	spec.files.delete( '.gemtest' )
 	spec.version = "#{spec.version}.pre#{Time.now.strftime("%Y%m%d%H%M%S")}"
 	File.open( task.name, 'w' ) do |fh|
 		fh.write( spec.to_ruby )
 
 task :default => :gemspec
 
+CLOBBER.include( GEMSPEC.to_s )
+

lib/symphony/daemon.rb

 	CONFIG_DEFAULTS = {
 		throttle_max:    16,
 		throttle_factor: 1,
-		tasks: []
+		tasks:           []
 	}
 
 	# Signals we understand

lib/symphony/queue.rb

 			task_class.acknowledge,
 			task_class.consumer_tag,
 			task_class.routing_keys,
-			task_class.prefetch
+			task_class.prefetch,
+			task_class.persistent
 		]
 		return new( *args )
 	end
 
 
 	### Create a new Queue with the specified configuration.
-	def initialize( name, acknowledge, consumer_tag, routing_keys, prefetch )
+	def initialize( name, acknowledge, consumer_tag, routing_keys, prefetch, persistent )
 		@name          = name
 		@acknowledge   = acknowledge
 		@consumer_tag  = consumer_tag
 		@routing_keys  = routing_keys
 		@prefetch      = prefetch
+		@persistent    = persistent
 
 		@amqp_queue    = nil
 		@shutting_down = false
 	# The maximum number of un-acked messages to prefetch
 	attr_reader :prefetch
 
+	# Whether or not to create a persistent queue
+	attr_reader :persistent
+
+	# The underlying Bunny::Queue this object manages
+	attr_reader :amqp_queue
+
 	# The Bunny::Consumer that is dispatching messages for the queue.
 	attr_accessor :consumer
 
 			channel = self.class.reset_amqp_channel
 			channel.prefetch( prefetch_count )
 
-			queue = channel.queue( self.name, auto_delete: true )
+			queue = channel.queue( self.name, auto_delete: !self.persistent )
 			self.routing_keys.each do |key|
 				self.log.info "  binding queue %s to the %s exchange with topic key: %s" %
 					[ self.name, exchange.name, key ]

lib/symphony/task.rb

 		subclass.instance_variable_set( :@work_model, :longlived )
 		subclass.instance_variable_set( :@prefetch, 10 )
 		subclass.instance_variable_set( :@timeout_action, :reject )
+		subclass.instance_variable_set( :@persistent, false )
 	end
 
 
 	end
 
 
+	### Create the queue the task consumes from as a persistent queue, so it
+	### will continue to receive events even if the task is no longer consuming them.
+	### This only effects queues which are not already declared, so if the
+	### bindings for the queue change you'll need to delete the existing queue
+	### before starting up to have them take effect.
+	def self::persistent( new_setting=nil )
+		if new_setting
+			@persistent = new_setting
+		end
+		return @persistent
+	end
+
+
 	#
 	# Instance Methods
 	#

lib/symphony/tasks/ssh.rb

-#!/usr/bin/env ruby
-
-require 'shellwords'
-require 'symphony/task' unless defined?( Symphony::Task )
-
-
-### A base SSH class for connecting to remote hosts, running commands,
-### and collecting output.
-class Symphony::Task::SSH < Symphony::Task
-	extend MethodUtilities
-
-	### Create a new SSH task for the given +job+ and +queue+.
-	def initialize( queue, job )
-		super
-
-		# The default path to the ssh binary.
-		@path = self.options[:ssh_path] || '/usr/bin/ssh'
-
-		# Default ssh behavior arguments.
-		@ssh_args = self.options[:ssh_args] || [
-			'-e', 'none',
-			'-T',
-			'-x',
-			'-q',
-			'-o', 'CheckHostIP=no',
-			'-o', 'BatchMode=yes',
-			'-o', 'StrictHostKeyChecking=no'
-		]
-
-		# required arguments
-		@hostname = self.options[:hostname] or raise ArgumentError, "no hostname specified"
-		@command  = self.options[:command]  or raise ArgumentError, "no command specified"
-
-		# optional arguments
-		@port = self.options[:port] || 22
-		@user = self.options[:user] || 'root'
-		@key  = self.options[:key]
-
-		@output = nil
-		@return_value = nil
-	end
-
-	# The default path to the ssh binary.
-	attr_reader :path
-
-	# Default ssh behavior arguments.
-	attr_reader :ssh_args
-
-	# The hostname to connect to.
-	attr_reader :hostname
-
-	# The command to run on the remote host.
-	attr_reader :command
-
-	# The key to use for authentication.
-	attr_reader :key
-
-	# The remote ssh port.
-	attr_reader :port
-
-	# Connect to the remote host as this user. Defaults to 'root'.
-	attr_reader :user
-
-
-	### Call ssh and capture output.
-	def run
-		@return_value = self.open_connection do |reader, writer|
-			self.log.debug "Writing command #{self.command}..."
-			writer.puts( self.command )
-			self.log.debug "  closing child's writer."
-			writer.close
-			self.log.debug "  reading from child."
-			reader.read
-		end
-	end
-
-
-	### Emit the output from the remote ssh call
-	def on_completion
-		if @return_value
-			self.log.info "Remote exited with %d, output: %s" % [ @return_value.exitstatus, @output ]
-		end
-	end
-
-
-	#########
-	protected
-	#########
-
-	### Call ssh and yield the remote IO objects to the caller,
-	### cleaning up afterwards.
-	def open_connection
-		raise LocalJumpError, "no block given" unless block_given?
-
-		fqdn = self.expand_hostname( self.hostname ).
-			find {|hostname| self.ping(hostname, self.port) } or
-			raise "Unable to find an on-network host for %s:%d" % [ self.hostname, self.port ]
-
-		cmd = []
-		cmd << self.path
-		cmd += self.ssh_args
-		cmd << '-p' << self.port.to_s
-		cmd << '-i' << self.key if self.key
-		cmd << '-l' << self.user
-		cmd << fqdn
-		cmd.flatten!
-		self.log.debug "Running SSH command with: %p" % [ Shellwords.shelljoin(cmd) ]
-
-		parent_reader, child_writer = IO.pipe
-		child_reader, parent_writer = IO.pipe
-
-		pid = spawn( *cmd, :out => child_writer, :in => child_reader, :close_others => true )
-		child_writer.close
-		child_reader.close
-
-		self.log.debug "Yielding back to the run block."
-		@output = yield( parent_reader, parent_writer )
-		self.log.debug "  run block done."
-
-		pid, status = Process.waitpid2( pid )
-		return status
-	end
-
-
-end # class Symphony::Task::SSH
-

lib/symphony/tasks/sshscript.rb

-#!/usr/bin/env ruby
-
-require 'net/ssh'
-require 'net/sftp'
-require 'tmpdir'
-require 'inversion'
-require 'symphony/task' unless defined?( Symphony::Task )
-
-
-# A task to execute a script on a remote host via SSH.
-class Symphony::Task::SSHScript < Symphony::Task
-	extend Loggability,
-	       MethodUtilities
-
-	# Loggability API -- Log to symphony's logger
-	log_to :symphony
-
-
-	# Template config
-	TEMPLATE_OPTS = {
-		:ignore_unknown_tags => false,
-		:on_render_error     => :propagate,
-		:strip_tag_lines     => true
-	}
-
-	# The defaults to use when connecting via SSH
-	DEFAULT_SSH_OPTIONS = {
-		:auth_methods            => [ "publickey" ],
-		:compression             => true,
-		:config                  => false,
-		:keys_only               => true,
-		# :logger                  => Loggability[ Net::SSH ],
-		:paranoid                => false,
-		:timeout                 => 10.seconds,
-		# :verbose                 => :debug,
-		:global_known_hosts_file => '/dev/null',
-		:user_known_hosts_file   => '/dev/null',
-	}
-
-
-	### Create a new SSH task for the given +job+ and +queue+.
-	def initialize( queue, job )
-		super
-
-		# required arguments
-		@hostname   = self.options[:hostname] or raise ArgumentError, "no hostname specified"
-		@template   = self.options[:template] or raise ArgumentError, "no script template specified"
-		@key        = self.options[:key]      or raise ArgumentError, "no private key specified"
-
-		# optional arguments
-		@port       = self.options[:port] || 22
-		@user       = self.options[:user] || 'root'
-		@attributes = self.options[:attributes] || {}
-		@nocleanup  = self.options[:nocleanup] ? true : false
-	end
-
-
-	######
-	public
-	######
-
-	# The name of the host to connect to
-	attr_reader :hostname
-
-	# The path to the script template
-	attr_accessor :template
-
-	# The path to the SSH key to use for auth
-	attr_reader :key
-
-	# The SSH port to use
-	attr_reader :port
-
-	# The user to connect as
-	attr_reader :user
-
-	# Attributes that will be set on the script template.
-	attr_reader :attributes
-
-	# Flag that will cause the uploaded script to not be cleaned up after running. Useful for
-	# diagnostics.
-	attr_reader :nocleanup
-
-
-	### Load the script as an Inversion template, sending and executing
-	### it on the remote host.
-	def run
-		fqdn = self.expand_hostname( self.hostname ).
-			find {|hostname| self.ping(hostname, self.port) }
-
-		unless fqdn
-			self.log.debug "Unable to find an on-network host for %s:%d" %
-				[ self.hostname, self.port ]
-			return
-		end
-
-		remote_filename = self.make_remote_filename
-		source = self.generate_script
-
-		# Establish the SSH connection
-		ssh_options = DEFAULT_SSH_OPTIONS.merge( :port => self.port, :keys => [self.key] )
-		self.with_timeout do
-			Net::SSH.start( fqdn, self.user, ssh_options ) do |conn|
-				self.upload_script( conn, source, remote_filename )
-				self.run_script( conn, remote_filename )
-			end
-		end
-	end
-
-
-	#########
-	protected
-	#########
-
-	### Return a human-readable description of details of the task.
-	def description
-		return "Running script '%s' on '%s:%d' as '%s'" % [
-			File.basename( self.template ),
-			self.hostname,
-			self.port,
-			self.user,
-		]
-	end
-
-
-	### Generate a unique filename for the script on the remote host.
-	def make_remote_filename
-		template = self.template
-		basename = File.basename( template, File.extname(template) )
-
-		tmpname = Dir::Tmpname.make_tmpname( basename, Process.pid )
-
-		return "/tmp/#{tmpname}"
-	end
-
-
-	### Generate a script by loading the script template, populating it with
-	### attributes, and rendering it.
-	def generate_script
-		tmpl = Inversion::Template.load( self.template, TEMPLATE_OPTS )
-
-		tmpl.attributes.merge!( self.attributes )
-		tmpl.task   = self
-
-		return tmpl.render
-	end
-
-	### Render the given +template+ as script source, then use the specified +conn+ object
-	### to upload it.
-	def upload_script( conn, source, remote_filename )
-		self.log.debug "Uploading script (%d bytes) to %s:%s." %
-			[ source.bytesize, self.hostname, remote_filename ]
-		conn.sftp.file.open( remote_filename, "w", 0755 ) do |fh|
-			fh.print( source )
-		end
-		self.log.debug "  done with the upload."
-	end
-
-
-	### Run the script on the remote host.
-	def run_script( conn, remote_filename )
-		output = conn.exec!( remote_filename )
-		self.log.debug "Output was:\n#{output}"
-		conn.exec!( "rm #{remote_filename}" ) unless self.nocleanup
-	end
-
-end # class Symphony::Task::SSHScript
-