Commits

Chris Perl committed 8412a27

More tweaks and refactoring

Comments (0)

Files changed (1)

 class BayeuxServerError(Exception):
 	pass
 
+class BayeuxError(Exception):
+	pass
+
 # Bayeux Message Classes
 class RequestMessageFactory(object):
 	"""
 		Required Fields:
 		  * channel: Must have a value of "/meta/subscribe"
 		  * clientId: The clientId from the handshake
-		  * subscription: channel name or a channel pattern or an array 
-			of channel names and channel patterns
+		  * subscription: channel name or a channel pattern or an array
+			          of channel names and channel patterns
 
 		Optional Fields:
 		  * ext: Same as above
 	@d (dict) : A dictionary representation of a bayeux message
 	"""
 	def __init__(self, d=None, clientId=None, successful=None, id=None):
-		self.required_fields = ('channel', 'clientId', 'successful')
+		# self.required_fields = ('channel', 'clientId', 'successful')
+		# cometd within jetty doesn't appear to be doing to the protocol correctly
+		self.required_fields = ('channel', 'successful')
 		if d is None:
 			assert clientId, "You must supply a clientId for a DisconnectResponse message"
 			assert successful, "You must supply whether the message indicates success or failure"
 
 class BayeuxReceiver(threading.Thread):
 	"""
-	A class for managing a connection with a server.  This class pull down
+	A class for managing a connection with a server.  This class pulls down
 	messages from the remote machine and ensures that everything remains in
 	the connected state.
 	"""
 	def __init__(self, url, listeners=[], http_class=HTTPConnection):
-		threading.Thread.__init__(self)
+		# The call to threading.Thread.__init__ is done for each call to connect
+		# threading.Thread.__init__(self)
 		p = urlparse.urlparse(url)
 		self._host = p.netloc
 		self._path = p.path
+		self._listeners = listeners
 		self._http_class = http_class
+
+	def connect(self):
+		"""
+		Start our object in a new thread of control
+		"""
+		# call threading.Thread.__init__ to reset state so that we can
+		# call connect again after previously calling disconnect
+		threading.Thread.__init__(self)
 		self._http = self._http_class(self._host)
 		self._state = BayeuxStateUNCONNECTED()
 		self._reconnect = 'retry'
 		self._should_run = True
 		self._clientId = None
 		self._lock = threading.Lock()
-		self._listeners = listeners
+		self._listener_lock = threading.Lock()
 		# and start the run method in a new thread of control
 		self.start()
 
 	def run(self):
+		"""
+		The main loop invoked which is started by calling connect, which
+		subsequently calls our own start method, which will eventually
+		call our run method.  self.start is part of the threading.Thread
+		mechanics.
+		"""
 		while self._should_run and self._reconnect is not None:
 			m = self._state.get_next_message(self)
 			msgs = self._send_recv(self._http, m)
-			event_msgs = [ x for x in msgs if isinstance(x, EventMessage) ]
+			event_msgs    = [ x for x in msgs if     isinstance(x, EventMessage) ]
 			response_msgs = [ x for x in msgs if not isinstance(x, EventMessage) ]
 			assert len(event_msgs) + len(response_msgs) == len(msgs)
 			assert len(response_msgs) == 1
 			self._update_advice(m)
 			self._state = self._state.transition(self, m)
 			self._do_callback("on_message", event_msgs)
-			time.sleep(self._interval)
+			# We need to change from milliseconds into seconds for time.sleep
+			time.sleep(float(self._interval)/float(1000))
+		# If we get here, It means we should no longer be running, so
+		# close things down
+		self._state = BayeuxStateUNCONNECTED()
+		self._http.close()
 
 	def _send_recv(self, http_client, m):
 		"""
 		"""
 		Add BayeuxListener l to the list of listeners.
 		"""
-		with self._lock:
+		with self._listener_lock:
 			self._listeners.append(l)
 
 	def stop(self):
 		with self._lock:
 			self._should_run = False
-			self._state = BayeuxStateUNCONNECTED()
+			self.join()
 
 	def subscribe(self, channel):
 		"""
 			# We're not connected, perhaps just an on_error callback?
 			pass
 
+	def disconnect(self):
+		"""
+		Disconnect from the bayeux server.
+		"""
+		if self._connected():
+			m = RequestMessageFactory(BayeuxRequestMessage.DISCONNECT, clientId=self.clientId)
+			msg, event_msgs = self._synchronous_cmd_helper(m)
+			if msg.successful:
+				print "Disconnect successful, should be doing a callback here."
+			else:
+				print "Disconnect unsuccessful, should be doing a callback here."
+			self._do_callback("on_message", event_msgs)
+			self.stop()
+		else:
+			pass
+
 	def _synchronous_cmd_helper(self, m):
 		"""
 		A helper command for subscribe, unsubscribe, and publish.
 		"""
 		http = self._http_class(self._host)
 		msgs = self._send_recv(http, m)
-		event_msgs = [ x for x in msgs if isinstance(x, EventMessage) ]
+		event_msgs    = [ x for x in msgs if     isinstance(x, EventMessage) ]
 		response_msgs = [ x for x in msgs if not isinstance(x, EventMessage) ]
 		assert len(event_msgs) + len(response_msgs) == len(msgs)
 		assert len(response_msgs) == 1
 		"""
 		For each listener, call func once for each msg in msg_list
 		"""
-		with self._lock:
+		with self._listener_lock:
 			for listener in self._listeners:
 				if hasattr(listener, func):
 					c = getattr(listener, func)