Commits

Michael Granger  committed 796ecc9

Checkpoint of work on the Daemon

  • Participants
  • Parent commits cf61ced

Comments (0)

Files changed (5)

 $LOAD_PATH.unshift( 'lib' )
 
 begin
-	require 'groundcontrol'
+	require 'symphony'
 
 	Loggability.level = :debug
 	Loggability.format_with( :color )
 
-	GroundControl.load_config( 'etc/config.yml' )
+	Symphony.load_config( 'etc/config.yml' )
 
 rescue Exception => e
-	$stderr.puts "Ack! groundcontrol libraries failed to load: #{e.message}\n\t" +
+	$stderr.puts "Ack! symphony libraries failed to load: #{e.message}\n\t" +
 		e.backtrace.join( "\n\t" )
 end
 

File bin/symphony

 #!/usr/bin/env ruby
 
 require 'symphony'
-require 'symphony/worker_daemon'
+require 'symphony/daemon'
 
 Encoding.default_internal = Encoding::UTF_8
 Symphony::Daemon.run( ARGV )
-

File lib/symphony/daemon.rb

 
 require 'configurability'
 require 'loggability'
-require 'fcntl'
-require 'trollop'
 
 require 'symphony' unless defined?( Symphony )
-require 'symphony/worker'
 require 'symphony/task'
+require 'symphony/signal_handling'
 
-
-# The Symphony worker daemon. Watches a Symphony job queue, and runs the tasks
-# contained in the jobs it fetches.
+# A daemon which manages startup and shutdown of one or more Workers
+# running Tasks as they are published from a queue.
 class Symphony::Daemon
 	extend Loggability,
-	       Configurability
+	       Configurability,
+	       Symphony::MethodUtilities
 
 	include Symphony::SignalHandling
 
 	log_to :symphony
 
 	# Configurability API -- use the 'worker_daemon' section of the config
-	config_key :worker_daemon
+	config_key :symphony
 
 
+	# Default configuration
+	CONFIG_DEFAULTS = {
+		throttle_max:    16,
+		throttle_factor: 1,
+		tasks: []
+	}
+
 	# Signals we understand
 	QUEUE_SIGS = [
-		:QUIT, :INT, :TERM, :HUP,
+		:QUIT, :INT, :TERM, :HUP, :CHLD,
 		# :TODO: :WINCH, :USR1, :USR2, :TTIN, :TTOU
 	]
 
-	# The maximum throttle value caused by failing workers
-	THROTTLE_MAX = 16
-
-	# The factor which controls how much incrementing the throttle factor
-	# affects the pause between workers being started.
-	THROTTLE_FACTOR = 2
 
 
 	#
 	# Class methods
 	#
 
+	##
+	# The maximum throttle factor caused by failing workers
+	singleton_attr_accessor :throttle_max
+
+	##
+	# The factor which controls how much incrementing the throttle factor
+	# affects the pause between workers being started.
+	singleton_attr_accessor :throttle_factor
+
+	##
+	# The Array of Symphony::Task classes that are configured to run
+	singleton_attr_accessor :tasks
+
+
 	### Get the daemon's version as a String.
 	def self::version_string( include_buildnum=false )
 		vstring = "%s %s" % [ self.name, Symphony::VERSION ]
 	end
 
 
+	### Configurability API -- configure the daemon.
+	def self::configure( config=nil )
+		config = self.defaults.merge( config || {} )
+
+		self.throttle_max    = config[:throttle_max]
+		self.throttle_factor = config[:throttle_factor]
+
+		self.tasks = self.load_configured_tasks( config[:tasks] )
+	end
+
+
+	### Load the tasks with the specified +task_names+ and return them
+	### as an Array.
+	def self::load_configured_tasks( task_names )
+		return task_names.map do |task_name|
+			Symphony::Task.get_subclass( task_name )
+		end
+	end
+
+
 	### Start the daemon.
-	def self::run( argv )
+	def self::run( args )
 		Loggability.format_with( :color ) if $stdout.tty?
 
-		progname = File.basename( $0 )
-		opts = Trollop.options do
-			banner "Usage: #{progname} OPTIONS"
-			version self.version_string( true )
-
-			opt :config, "The config file to load instead of the default",
-				:type => :string
-			opt :crew_size, "Number of workers to maintain.", :default => DEFAULT_CREW_SIZE
-			opt :queue, "The name of the queue to monitor.", :default => '_default_'
-
-			opt :debug, "Turn on debugging output."
-		end
-
 		# Turn on debugging if it's enabled
-		if opts.debug
-			$DEBUG = true
-			Loggability.level = :debug
-		end
+		Loggability.level = :debug if $DEBUG
 
 		# Now load the config file
-		Symphony.load_config( opts.config )
+		Symphony.load_config( args.shift )
 
 		# Re-enable debug-level logging if the config reset it
-		Loggability.level = :debug if opts.debug
+		Loggability.level = :debug if $DEBUG
 
 		# And start the daemon
-		self.new( opts ).run
+		self.new.run
 	end
 
 
 	#
 
 	### Create a new Daemon instance.
-	def initialize( options )
-		@options             = options
-		@queue               = Symphony::Queue.new( options.queue )
+	def initialize
+		# Process control
+		@tasks               = self.class.tasks
 
-		# Process control
-		@crew_size           = options.crew_size
-		@crew_workers        = []
+		@running_tasks       = {}
 		@running             = false
 		@shutting_down       = false
 		@throttle            = 0
 	public
 	######
 
-	# The Array of PIDs of currently-running workers
-	attr_reader :crew_workers
-
-	# The maximum number of children to have running at any given time
-	attr_reader :crew_size
+	# The Hash of PIDs to task class
+	attr_reader :running_tasks
 
 	# A self-pipe for deferred signal-handling
 	attr_reader :selfpipe
 
 	### Set up the daemon and start running.
 	def run
-		self.log.info "Starting worker supervisor"
+		self.log.info "Starting task daemon"
 
 		# Become session leader if we can
 		if Process.euid.zero?
 		self.set_signal_traps( *QUEUE_SIGS )
 
 		# Listen for new jobs and handle them as they come in
-		self.start_handling_jobs
+		self.run_tasks
 
 		# Restore the default signal handlers
 		self.reset_signal_traps( *QUEUE_SIGS )
 
 	### The main loop of the daemon -- wait for signals, children dying, or jobs, and
 	### take appropriate action.
-	def start_handling_jobs
+	def run_tasks
 		@running = true
 
 		self.log.debug "Starting supervisor loop..."
 		while self.running?
 			self.start_missing_children unless self.shutting_down?
-
-			timeout = self.throttle_seconds
-			timeout = nil if timeout.zero?
-
 			self.wait_for_signals
 			self.reap_children
 		end
-		self.log.info "Supervisor job loop done."
 
 	rescue => err
 		self.log.fatal "%p in job-handler loop: %s" % [ err.class, err.message ]
 		self.log.debug { '  ' + err.backtrace.join("\n  ") }
 
 	ensure
+		self.log.info "Done running tasks."
 		@running = false
 		self.stop
 	end
 
 		self.log.warn "Stopping children."
 		3.times do |i|
-			self.reap_children( *self.crew_workers )
+			self.reap_children
 			sleep( 1 )
 			self.kill_children
 			sleep( 1 )
-			break if self.crew_workers.empty?
+			break if self.running_tasks.empty?
 			sleep( 1 )
-		end unless self.crew_workers.empty?
+		end unless self.running_tasks.empty?
 
 		# Give up on our remaining children.
 		Signal.trap( :CHLD, :IGNORE )
-		if !self.crew_workers.empty?
-			self.log.warn "  %d workers remain: sending KILL" % [ self.crew_workers.length ]
+		if !self.running_tasks.empty?
+			self.log.warn "  %d workers remain: sending KILL" % [ self.running_tasks.length ]
 			self.kill_children( :KILL )
 		end
 	end
 
 	### Handle signals.
 	def handle_signal( sig )
-		self.log.debug "Handling signal %s" % [ sig ]
+		self.log.debug "Handling signal %s in PID %d" % [ sig, Process.pid ]
 		case sig
 		when :INT, :TERM
 			if @running
-				self.log.warn "%s signal: immediate shutdown" % [ sig ]
+				self.log.warn "%s signal: graceful shutdown" % [ sig ]
 				@running = false
 			else
 				self.ignore_signals
 			self.reload_config
 
 		when :CHLD
+			self.log.warn "Got SIGCHLD."
 			# Just need to wake up, nothing else necessary
 
 		else
 	end
 
 
-	### Fill out the work crew with new children if necessary
+	### Start any tasks which aren't already running
 	def start_missing_children
-		missing_count = self.crew_size - self.crew_workers.length
-		return unless missing_count > 0
+		missing_tasks = self.class.tasks - self.running_tasks.values
+		return if missing_tasks.empty?
 
 		# Return unless the throttle period has lapsed
 		unless self.throttle_seconds < (Time.now - @last_child_started)
 			return
 		end
 
-		self.log.debug "Starting %d workers for a crew of %d" % [ missing_count, self.crew_size ]
-		missing_count.times do |i|
-			pid = self.start_worker
-			self.log.debug "  started worker %d" % [ pid ]
-			self.crew_workers << pid
+		self.log.debug "Starting %d tasks out of %d" % [ missing_tasks.size, self.class.tasks.size ]
+		missing_tasks.each do |task_class|
+			pid = self.start_worker( task_class )
+			self.log.debug "  started task %p at pid %d" % [ task_class, pid ]
+			self.running_tasks[ pid ] = task_class
 		end
 
 		@last_child_started = Time.now
 	### Return the number of seconds between child startup times.
 	def throttle_seconds
 		return 0 unless @throttle.nonzero?
-		return Math.log( @throttle ) * THROTTLE_FACTOR
+		return Math.log( @throttle ) * self.class.throttle_factor
 	end
 
 
 		self.log.debug "Adjusting worker throttle by %d" % [ adjustment ]
 		@throttle += adjustment
 		@throttle = 0 if @throttle < 0
-		@throttle = THROTTLE_MAX if @throttle > THROTTLE_MAX
+		@throttle = self.class.throttle_max if @throttle > self.class.throttle_max
 	end
 
 
 	### Kill all current children with the specified +signal+. Returns +true+ if the signal was
 	### sent to one or more children.
 	def kill_children( signal=:TERM )
-		return false if self.crew_workers.empty?
+		return false if self.running_tasks.empty?
 
-		self.log.info "Sending %s signal to %d workers: %p." %
-			 [ signal, self.crew_workers.length, self.crew_workers ]
-		Process.kill( signal, *self.crew_workers )
+		self.log.info "Sending %s signal to %d task pids: %p." %
+			 [ signal, self.running_tasks.length, self.running_tasks.keys ]
+		Process.kill( signal, *self.running_tasks.keys )
 
 		return true
 	rescue Errno::ESRCH
 	end
 
 
-	### Start a new Symphony::Worker and return its PID.
-	def start_worker
+	### Start a new Symphony::Task and return its PID.
+	def start_worker( task_class )
 		return if self.shutting_down?
-		self.log.debug "Starting a worker."
-		return Symphony::Worker.start( self.queue )
+		self.log.debug "Starting a %p." % [ task_class ]
+		return Process.fork do
+			self.reset_signal_traps( *QUEUE_SIGS )
+			@selfpipe.each {|_,io| io.close }.clear
+			task_class.run
+		end
 	end
 
 
 
 
 	### Reap any children that have died within the caller's process group
-	### and remove them from the work crew.
+	### and remove them from the Hash of running tasks.
 	def reap_any_child
 		self.log.debug "  no pids; waiting on any child in this process group"
 
-		pid, status = Process.waitpid2( -1, Process::WNOHANG )
+		pid, status = Process.waitpid2( -1, Process::WNOHANG|Process::WUNTRACED )
+		self.log.debug "  waitpid2 returned: [ %p, %p ]" % [ pid, status ]
 		while pid
 			self.adjust_throttle( status.success? ? -1 : 1 )
 			self.log.debug "Child %d exited: %p." % [ pid, status ]
-			self.crew_workers.delete( pid )
+			self.running_tasks.delete( pid )
 
-			pid, status = Process.waitpid2( -1, Process::WNOHANG )
+			pid, status = Process.waitpid2( -1, Process::WNOHANG|Process::WUNTRACED )
+			self.log.debug "  waitpid2 returned: [ %p, %p ]" % [ pid, status ]
 		end
 	end
 
 
 	### Wait on the child associated with the given +pid+, deleting it from the
-	### crew workers if successful.
+	### running tasks Hash if successful.
 	def reap_specific_child( pid )
 		spid, status = Process.waitpid2( pid )
 		if spid
 			self.log.debug "Child %d exited: %p." % [ spid, status ]
-			self.crew_workers.delete( spid )
+			self.running_tasks.delete( spid )
 			self.adjust_throttle( status.success? ? -1 : 1 )
 		else
 			self.log.debug "Child %d no reapy." % [ pid ]

File lib/symphony/signal_handling.rb

 	def ignore_signals( *signals )
 		self.log.debug "Ignoring signals."
 		signals.each do |sig|
+			next if sig == :CHLD
 			Signal.trap( sig, :IGNORE )
 		end
 	end

File spec/symphony/task_spec.rb

 
 	context "a concrete subclass" do
 
-		before( :each ) do
-			@task_class = Class.new( described_class ) do
+		let( :task_class ) do
+			Class.new( described_class ) do
 				def self::name; 'ACME::TestingTask'; end
 			end
 		end
+		let( :queue ) do
+			Symphony::Queue.for_task( task_class )
+		end
 
 
 		it "raises an exception if run without specifying any subscriptions" do
-			expect { @task_class.run }.to raise_error( ScriptError, /no subscriptions/i )
+			expect { task_class.run }.to raise_error( ScriptError, /no subscriptions/i )
 		end
 
 
 		it "can set an explicit queue name" do
-			@task_class.queue_name( 'happy.fun.queue' )
-			expect( @task_class.queue_name ).to eq( 'happy.fun.queue' )
+			task_class.queue_name( 'happy.fun.queue' )
+			expect( task_class.queue_name ).to eq( 'happy.fun.queue' )
 		end
 
 
 		it "can retry on timeout instead of rejecting" do
-			@task_class.timeout_action( :retry )
-			expect( @task_class.timeout_action ).to eq( :retry )
+			task_class.timeout_action( :retry )
+			expect( task_class.timeout_action ).to eq( :retry )
 		end
 
 
 		it "provides a default name for its queue based on its name" do
-			expect( @task_class.queue_name ).to eq( 'acme.testingtask' )
+			expect( task_class.queue_name ).to eq( 'acme.testingtask' )
 		end
 
 
 		it "can declare a pattern to use when subscribing" do
-			@task_class.subscribe_to( 'foo.test' )
-			expect( @task_class.routing_keys ).to include( 'foo.test' )
+			task_class.subscribe_to( 'foo.test' )
+			expect( task_class.routing_keys ).to include( 'foo.test' )
 		end
 
 
 		it "has acknowledgements enabled by default" do
-			expect( @task_class.acknowledge ).to eq( true )
+			expect( task_class.acknowledge ).to eq( true )
 		end
 
 
 		it "can enable acknowledgements" do
-			@task_class.acknowledge( true )
-			expect( @task_class.acknowledge ).to eq( true )
+			task_class.acknowledge( true )
+			expect( task_class.acknowledge ).to eq( true )
 		end
 
 
 		it "can disable acknowledgements" do
-			@task_class.acknowledge( false )
-			expect( @task_class.acknowledge ).to eq( false )
+			task_class.acknowledge( false )
+			expect( task_class.acknowledge ).to eq( false )
 		end
 
 
 		it "can set a timeout" do
-			@task_class.timeout( 10 )
-			expect( @task_class.timeout ).to eq( 10 )
+			task_class.timeout( 10 )
+			expect( task_class.timeout ).to eq( 10 )
 		end
 
 
 		it "can declare a one-shot work model" do
-			@task_class.work_model( :oneshot )
-			expect( @task_class.work_model ).to eq( :oneshot )
+			task_class.work_model( :oneshot )
+			expect( task_class.work_model ).to eq( :oneshot )
 		end
 
 
 		it "can declare a long-lived work model" do
-			@task_class.work_model( :longlived )
-			expect( @task_class.work_model ).to eq( :longlived )
+			task_class.work_model( :longlived )
+			expect( task_class.work_model ).to eq( :longlived )
 		end
 
 
 		it "raises an error if an invalid work model is declared " do
 			expect {
-				@task_class.work_model( :lazy )
+				task_class.work_model( :lazy )
 			}.to raise_error( /unknown work_model/i )
 		end
 
 
+		context "an instance" do
+
+			let( :task ) { task_class.new(queue) }
+
+
+			it "raises an exception if it doesn't declare a #work method" do
+				expect {
+					task.work( 'payload', {} )
+				}.to raise_error( NotImplementedError, /#work/ )
+			end
+
+
+			it "sets signal handlers and waits for messages when started"
+
+		end
+
 	end
 
 end