Commits

Michael Granger  committed 263e486

Checkpoint commit:
* Moved inline functions into the header.
* Finished implementation of Verse::Session#destroy_node
* Made logging use the color outputter.
* Added Verse::Server#shutdown

  • Participants
  • Parent commits 33f0e3f

Comments (0)

Files changed (12)

 docs/api
 ChangeLog
 \.DS_Store
+^hostid.rsa$
 	ptr->tag_groups = rb_ary_new();
 	ptr->session    = Qnil;
 
-	rbverse_log( "debug", "allocated a rbverse_SESSION <%p>", ptr );
+	DEBUGMSG( "allocated a rbverse_SESSION <%p>", ptr );
 	return ptr;
 }
 
 static void
 rbverse_node_gc_free( struct rbverse_node *ptr ) {
 	if ( ptr ) {
-		DEBUGMSG( "Freeing node 0x%p\n", ptr );
+		DEBUGMSG( "Freeing node 0x%p", ptr );
 
 		if ( ptr->id != ~0 ) {
-			DEBUGMSG( "  removing node ID %d from node map\n", ptr->id );
+			DEBUGMSG( "  removing node ID %d from node map", ptr->id );
 			st_delete_safe( node_map, (st_data_t*)&ptr->id, 0, Qundef );
 			/* TODO: Do I need to do st_cleanup or anything after deletion? */
 		}
 
 		/* Call the node-specific free function if there is one */
 		if ( ptr->type < V_NT_NUM_TYPES ) {
-			DEBUGMSG( "  free function %p for node type %d\n",
+			DEBUGMSG( "  free function %p for node type %d",
 			        node_free_funcs[ptr->type], ptr->type );
 			node_free_funcs[ptr->type]( ptr );
 		}
 
-		DEBUGMSG( "  clearing struct.\n" );
+		DEBUGMSG( "  clearing struct." );
 		ptr->id         = ~0;
 		ptr->type       = V_NT_SYSTEM;
 		ptr->owner      = VN_OWNER_OTHER;
 		xfree( ptr );
 		ptr = NULL;
 
-		DEBUGMSG( "  done.\n" );
+		DEBUGMSG( "  done." );
 	}
 }
 
 }
 
 
-/* 
- * Check the "alive"-ness of the given +node+, raising an exception if it's either not part
- * of a session or has been destroyed.
- */
-static inline void
-rbverse_ensure_node_is_alive( struct rbverse_node *node ) {
-	if ( !node )
-		rb_fatal( "node pointer was NULL!" );
-	if ( !RTEST(node->session) )
-		rb_raise( rbverse_eVerseNodeError, "node is not associated with an active session" );
-	if ( node->destroyed )
-		rb_raise( rbverse_eVerseNodeError, "node is destroyed" );
-}
-
-
-
 
 /* --------------------------------------------------------------
  * Class Methods

File ext/session.c

 	ptr->create_callbacks  = Qnil;
 	ptr->destroy_callbacks = Qnil;
 
-	rbverse_log( "debug", "allocated a rbverse_SESSION <%p>", ptr );
+	DEBUGMSG( "allocated a rbverse_SESSION <%p>", ptr );
 	return ptr;
 }
 
 static VALUE
 rbverse_verse_session_s_update_body( void *ptr ) {
 	uint32 *microseconds = (uint32 *)ptr;
+	DEBUGMSG( "  calling verse_callback_update( %d ).", *microseconds );
 	verse_callback_update( *microseconds );
 	return Qtrue;
 }
 		microseconds = DEFAULT_UPDATE_TIMEOUT;
 
 	slice = microseconds / ( session_table->num_entries + 1 );
-	rbverse_log( "debug", "Update timeslice is %d µs", slice );
+	DEBUGMSG( "Update timeslice is %d µs", slice );
 
-	rbverse_log( "debug", "  updating the global session" );
+	DEBUGMSG( "  updating the global session" );
 	verse_session_set( 0 );
 	rb_thread_blocking_region( rbverse_verse_session_s_update_body, (uint32 *)&slice,
 		RUBY_UBF_IO, NULL );
 
 	if ( session_table->num_entries ) {
-		rbverse_log( "debug", "  updating %d client sessions", session_table->num_entries );
+		DEBUGMSG( "  updating %d client sessions", session_table->num_entries );
 		st_foreach( session_table, rbverse_verse_session_s_update_i, (st_data_t)slice );
 	} else {
-		rbverse_log( "debug", "  no client sessions to update" );
+		DEBUGMSG( "  no client sessions to update" );
 	}
 
 	return Qtrue;
 }
 
 
+
+/* Synchronized portion of rbverse_verse_session_destroy_node() */
+static VALUE
+rbverse_verse_session_destroy_node_l( VALUE node ) {
+	VNodeID node_id = (VNodeID)node;
+	verse_send_node_destroy( node_id );
+	return Qtrue;
+}
+
+
 /*
  * call-seq:
  *    session.destroy_node( node ) {|node| ... }
 static VALUE
 rbverse_verse_session_destroy_node( int argc, VALUE *argv, VALUE self ) {
 	struct rbverse_session *session = rbverse_get_session( self );
-	VALUE nodeclass, callback, callback_queue;
+	VALUE nodeobj, callback;
+	struct rbverse_node *node;
 
 	if ( !session->id )
-		rb_raise( rbverse_eVerseSessionError, "can't create a node via an unconnected session" );
+		rb_raise( rbverse_eVerseSessionError,
+		          "can't destroy a node via an unconnected session" );
 
-	rb_scan_args( argc, argv, "1&", &nodeclass, &callback );
-	callback_queue = rb_hash_aref( session->create_callbacks, nodeclass );
+	/* Unwrap the arguments and check the node object for sanity */
+	rb_scan_args( argc, argv, "1&", &nodeobj, &callback );
+	node = rbverse_get_node( nodeobj );
+	rbverse_ensure_node_is_alive( node );
 
-	Check_Type( nodeclass, T_CLASS );
+	/* Use a no-op destruction callback if none was specified. */
+	if ( !RTEST(callback) ) callback = rb_block_proc();
 
-	if ( !RTEST(callback) )
-		callback = rb_block_proc();
-	if ( !RTEST(callback_queue) )
-		rb_raise( rbverse_eVerseSessionError,
-		          "don't know how to create %s objects (no callback queue)", 
-		          rb_class2name(nodeclass) );
+	/* Add the destruction callback for the node */
+	rb_hash_aset( session->destroy_callbacks, nodeobj, callback );
 
-	/* Add the callback to the queue. This isn't done inside the lock as
-	 * we don't really care if the nodes are created strictly in the order
-	 * in which they're created. */
-	rb_ary_push( callback_queue, callback );
-	return rbverse_with_session_lock( self, rbverse_verse_session_create_node_l, nodeclass );
+	/* Cast: VNodeID -> VALUE */
+	return rbverse_with_session_lock( self, rbverse_verse_session_destroy_node_l, (VALUE)node->id );
 }
 
 

File ext/verse_ext.c

 	RARRAY_PTR(cb_args)[2] = rb_str_new2( args[2] );
 	RARRAY_PTR(cb_args)[3] = rbverse_host_id2str( (const uint8 *)args[3] );
 
+	DEBUGMSG( "Calling the cb_connect iterator." );
 	rb_block_call( observers, rb_intern("each"), 0, 0, rbverse_cb_connect_i, cb_args );
 
 	return NULL;
                     const uint8 *expected_host_id )
 {
 	const char *(args[4]) = { name, pass, address, (const char *)expected_host_id };
-	DEBUGMSG( " Acquiring GVL for 'connect' event.\n" );
-	fflush( stdout );
+	DEBUGMSG( "*** Acquiring GVL for 'connect' event. ***" );
 	rb_thread_call_with_gvl( rbverse_cb_connect_body, args );
 }
 
 	rb_define_singleton_method( rbverse_mVerse, "port=", rbverse_verse_port_eq, 1 );
 	rb_define_alias( rb_singleton_class(rbverse_mVerse), "connect_port=", "port=" );
 	rb_define_singleton_method( rbverse_mVerse, "create_host_id", rbverse_verse_create_host_id, 0 );
+	rb_define_alias( CLASS_OF(rbverse_mVerse), "make_host_id", "create_host_id" );
 	rb_define_singleton_method( rbverse_mVerse, "host_id=", rbverse_verse_host_id_eq, 1 );
 	rb_define_singleton_method( rbverse_mVerse, "connect_accept", rbverse_verse_connect_accept, 3 );
 	rb_define_singleton_method( rbverse_mVerse, "ping", rbverse_verse_ping, 2 );

File ext/verse_ext.h

 #endif /* !RUBY_VM */
 
 #ifdef DEBUG
-#	define DEBUGMSG(format, args...) fprintf( stderr, "\033[31m"format"\033[0m", ##args );
+#	define DEBUGMSG(format, args...) fprintf( stderr, "\033[37mDEBUG: "format"\033[0m\n", ##args );
 #else
 #	define DEBUGMSG(format, args...)
 #endif
 extern void ( *node_free_funcs[] )(struct rbverse_node *);
 
 
-
 /* --------------------------------------------------------------
  * Macros
  * -------------------------------------------------------------- */
 #define DEFAULT_ADDRESS "127.0.0.1"
 #define DEFAULT_UPDATE_TIMEOUT 100000
 
+
+/* --------------------------------------------------------------
+ * Inline functions
+ * -------------------------------------------------------------- */
+
+/* 
+ * Check the "alive"-ness of the given +node+, raising an exception if it's either not part
+ * of a session or has been destroyed.
+ */
+static inline void
+rbverse_ensure_node_is_alive( struct rbverse_node *node ) {
+	if ( !node )
+		rb_fatal( "node pointer was NULL!" );
+	if ( !RTEST(node->session) )
+		rb_raise( rbverse_eVerseNodeError, "node is not associated with an active session" );
+	if ( node->destroyed )
+		rb_raise( rbverse_eVerseNodeError, "node is destroyed" );
+}
+
+
 /* --------------------------------------------------------------
  * Declarations
  * -------------------------------------------------------------- */

File lib/verse/mixins.rb

 
 	end
 
+
+	### A collection of ANSI color utility functions
+	module ANSIColorUtilities
+
+		# Set some ANSI escape code constants (Shamelessly stolen from Perl's
+		# Term::ANSIColor by Russ Allbery <rra@stanford.edu> and Zenin <zenin@best.com>
+		ANSI_ATTRIBUTES = {
+			'clear'      => 0,
+			'reset'      => 0,
+			'bold'       => 1,
+			'dark'       => 2,
+			'underline'  => 4,
+			'underscore' => 4,
+			'blink'      => 5,
+			'reverse'    => 7,
+			'concealed'  => 8,
+
+			'black'      => 30,   'on_black'   => 40,
+			'red'        => 31,   'on_red'     => 41,
+			'green'      => 32,   'on_green'   => 42,
+			'yellow'     => 33,   'on_yellow'  => 43,
+			'blue'       => 34,   'on_blue'    => 44,
+			'magenta'    => 35,   'on_magenta' => 45,
+			'cyan'       => 36,   'on_cyan'    => 46,
+			'white'      => 37,   'on_white'   => 47
+		}
+
+		###############
+		module_function
+		###############
+
+		### Create a string that contains the ANSI codes specified and return it
+		def ansi_code( *attributes )
+			attributes.flatten!
+			attributes.collect! {|at| at.to_s }
+			return '' unless /(?:vt10[03]|xterm(?:-color)?|linux|screen)/i =~ ENV['TERM']
+			attributes = ANSI_ATTRIBUTES.values_at( *attributes ).compact.join(';')
+
+			if attributes.empty?
+				return ''
+			else
+				return "\e[%sm" % attributes
+			end
+		end
+
+
+		### Colorize the given +string+ with the specified +attributes+ and return it, handling 
+		### line-endings, color reset, etc.
+		def colorize( *args )
+			string = ''
+
+			if block_given?
+				string = yield
+			else
+				string = args.shift
+			end
+
+			ending = string[/(\s)$/] || ''
+			string = string.rstrip
+
+			return ansi_code( args.flatten ) + string + ansi_code( 'reset' ) + ending
+		end
+
+	end # module ANSIColorUtilities
+
+
 end # module Verse
 
 # vim: set nosta noet ts=4 sw=4:

File lib/verse/server.rb

 
 # Basic Verse server object class
 class Verse::Server
-	include Singleton,
-	        Verse::Loggable,
+	include Verse::Loggable,
 	        Verse::PingObserver,
-	        Verse::ConnectionObserver
+	        Verse::ConnectionObserver,
+	        Verse::SessionObserver
 
 	# The path to the server's saved hostid
 	HOSTID_FILE = Pathname( 'hostid.rsa' )
 		@host_id      = self.load_host_id
 		@connections = {}
 		@nodes       = {}
+
+		@state       = :stopped
 	end
 
 
 	### Start listening for events.
 	def run
 		self.observe( Verse )
+		self.log.info "Starting run loop"
+
+		@state = :running
+		Verse::Session.update until @state != :running
+	end
+
+
+	### Shut the server down using the specified +reason+.
+	def shutdown( reason="No reason given." )
+		self.reset_signal_handlers
+		self.log.warn "Server shutdown: #{reason}"
+
+		self.stop_observing( Verse )
+		@connections.keys.each do |addr|
+			conn = @connections.delete( addr )
+			self.stop_observing( conn.session )
+			Verse.terminate_connection( addr, reason )
+		end
+
+		@state = :shutdown
 	end
 
 
 
 	### Set up signal handlers to restart/shutdown the server.
 	def reset_signal_handlers
-		Signal.trap( :INT, 'IGN' )
-		Signal.trap( :TERM, 'IGN' )
-		Signal.trap( :HUP, 'IGN' )
+		Signal.trap( :INT, 'DEFAULT' )
+		Signal.trap( :TERM, 'DEFAULT' )
+		Signal.trap( :HUP, 'DEFAULT' )
 	end
 
 
 		if HOSTID_FILE.exist?
 			return HOSTID_FILE.read
 		else
-			hostid = Verse.make_host_id
+			hostid = Verse.create_host_id
 			HOSTID_FILE.open( File::WRONLY|File::CREAT|File::EXCL, 0600 ) do |ofh|
 				ofh.write( hostid )
 			end
 	end
 
 
+	### Remove a node from the server.
+	def remove_node( node )
+		
+	end
+
+
 	#
 	# ConnectionObserver API
 	#
 
 	### Receive a connect event from a client. 
-	def on_connect( user, pass, address, expected_host_id )
-		return unless expected_host_id.nil? || expected_host_id == self.host_id
+	def on_connect( user, pass, address, expected_host_id=nil )
+		self.log.info "Connect: %s@%s" % [ user, address ]
+		unless expected_host_id.nil? || expected_host_id == self.host_id
+			self.log.warn "  connection expected a different hostid. Ignoring connection."
+			return
+		end
 
 		# Create the session's avatar
+		self.log.debug "  creating %s's avatar" % [ user ]
 		avatar = Verse::ObjectNode.new
+		self.log.debug "  adding avatar: %p" % [ avatar ]
 		self.add_node( avatar )
 
+		self.log.debug "  sending connect_accept back to %p" % [ address ]
 		session = Verse.connect_accept( avatar, address, self.host_id )
+		self.observe( session )
 		connection = Verse::Server::Connection.new( address, user, session, avatar )
 
+		self.log.debug "  adding the connection (total: %d)" % [ self.connections.length + 1 ]
 		self.connections[ address ] = connection
 	end
 

File lib/verse/utils.rb

 	end # class LogFormatter
 
 
+	# A ANSI-colorized formatter for Logger instances.
+	# @private
+	class ColorLogFormatter < Logger::Formatter
+		extend Verse::ANSIColorUtilities
+
+		# Color settings
+		LEVEL_FORMATS = {
+			:debug => colorize( :bold, :black ) {"[%1$s.%2$06d %3$d/%4$s] %5$5s {%6$s} -- %7$s\n"},
+			:info  => colorize( :normal ) {"[%1$s.%2$06d %3$d/%4$s] %5$5s -- %7$s\n"},
+			:warn  => colorize( :bold, :yellow ) {"[%1$s.%2$06d %3$d/%4$s] %5$5s -- %7$s\n"},
+			:error => colorize( :red ) {"[%1$s.%2$06d %3$d/%4$s] %5$5s -- %7$s\n"},
+			:fatal => colorize( :bold, :red, :on_white ) {"[%1$s.%2$06d %3$d/%4$s] %5$5s -- %7$s\n"},
+		}
+
+
+		### Initialize the formatter with a reference to the logger so it can check for log level.
+		def initialize( logger, settings={} ) # :notnew:
+			settings = LEVEL_FORMATS.merge( settings )
+
+			@logger   = logger
+			@settings = settings
+
+			super()
+		end
+
+		######
+		public
+		######
+
+		# The Logger object associated with the formatter
+		attr_accessor :logger
+
+		# The formats, by level
+		attr_accessor :settings
+
+
+		### Log using the format associated with the severity
+		def call( severity, time, progname, msg )
+			args = [
+				time.strftime( '%Y-%m-%d %H:%M:%S' ),                         # %1$s
+				time.usec,                                                    # %2$d
+				Process.pid,                                                  # %3$d
+				Thread.current == Thread.main ? 'main' : Thread.object_id,    # %4$s
+				severity,                                                     # %5$s
+				progname,                                                     # %6$s
+				msg                                                           # %7$s
+			]
+
+			return self.settings[ severity.downcase.to_sym ] % args
+		end
+	end # class LogFormatter
+
+
 	# An alternate formatter for Logger instances that outputs +dd+ HTML
 	# fragments.
 	# 

File spec/lib/helpers.rb

 			Verse.logger = Logger.new( logdevice )
 			# Verse.logger.level = level
 			Verse.logger.formatter = Verse::HtmlLogFormatter.new( Verse.logger )
+		elsif $stderr.tty?
+			Verse.logger.formatter = Verse::ColorLogFormatter.new( Verse.logger )
 		end
 	end
 

File spec/verse/node_spec.rb

 			}.to raise_exception( Verse::NodeError, /already set/ )
 		end
 
-		it "can be created on the server" do
-			
-		end
-
-		it "can be destroyed"
-
 		it "can have a name"
 
 		it "can be subscribed to"
 
 		it "can create tag groups"
 		it "can destroy tag groups"
-		it "can subscribe to a tag group"
-		it "can unsubscribe from a tag group"
 
 	end
 end

File spec/verse/session_spec.rb

 	$LOAD_PATH.unshift( extdir ) unless $LOAD_PATH.include?( extdir )
 }
 
+require 'ostruct'
+
 require 'spec'
 require 'spec/lib/constants'
 require 'spec/lib/helpers'
 
 require 'verse'
+require 'verse/server'
 
 
 include Verse::TestConstants
 	before( :each ) do
 		Verse.remove_observers
 		@port = 45196
-		Verse.port = @port
+		@address = "localhost:#@port"
 	end
 
 
 	end
 
 
-	describe "instances" do
+	describe "instances that have not yet connected" do
 		before( :each ) do
 			@session = Verse::Session.new
 		end
 
-		it "raises an exception if the address hasn't been set when connecting" do
+		it "raise an exception if their address hasn't been set when connecting" do
 			expect {
-				@session.connect
+				@session.connect( 'user', 'pass' )
 			}.to raise_exception( Verse::SessionError, /address/i )
 		end
 
 	end
 
+
+	describe "instances that are connected" do
+		before( :all ) do
+			unless @server_pid = Process.fork
+				begin
+					config = OpenStruct.new
+					config.port = 45196
+					Verse::Server.new( config ).run
+				rescue RuntimeError, ScriptError => err
+					$stderr.puts "\e[31;43m %p in the server setup: %s\e[0m" %
+					 	[ err.class, err.message ]
+					err.backtrace.each do |frame|
+						$stderr.puts "   #{frame}"
+					end
+				ensure
+					exit!
+				end
+			end
+		end
+
+		before( :each ) do
+			@session = Verse::Session.new( @address )
+			@session.connect( 'test', 'test' )
+			@update_thread = Thread.new do
+				Thread.current.abort_on_exception = true
+				Thread.current[:running] = true
+				Verse::Session.update until Thread.current[:halt]
+			end
+		end
+
+		after( :each ) do
+			@update_thread[:halt] = true
+			@update_thread.join
+		end
+
+		after( :all ) do
+			Process.kill( :TERM, @server_pid )
+			Process.wait
+		end
+
+		it "raise an exception if asked to destroy a node that doesn't belong to a session" do
+			node = Verse::ObjectNode.new
+			expect {
+				@session.destroy_node( node )
+			}.to raise_exception( Verse::NodeError, /active session/i )
+		end
+
+		it "raise an exception if asked to destroy a node that belongs to another session" # do
+		 # 			other_session = Verse::Session.new( @address )
+		 # 			other_session.connect( 'test2', 'test2' )
+		 # 
+		 # 			finished = false
+		 # 			other_session.create_node( Verse::ObjectNode ) do |node|
+		 # 				expect {
+		 # 					@session.destroy_node( node )
+		 # 				}.to raise_exception( Verse::SessionError, /alive/ )
+		 # 				finished = true
+		 # 			end
+		 # 
+		 # 			sleep 1 until finished
+		 # 		end
+
+	end
+
 end
 
 # vim: set nosta noet ts=4 sw=4:

File spec/verse_spec.rb

 		end
 
 		it "are sent to objects which are PingObservers" do
-			pending "splitting the server and ping APIs a bit more" do
+			pending "Figuring out why the hell no commands are being received" do
 				observer_class = Class.new do
 					include Verse::PingObserver
 
 
 				Verse.add_observer( observer )
 				Verse.ping( addr, data )
-				Verse::Session.update( 0.1 )
-				Verse::Session.update( 0.1 )
+				10.times do
+					Verse::Session.update( 0.1 )
+					Verse::Session.update( 0.1 )
+				end
 
 				observer.address.should == addr
 				observer.data.should == data