Commits

Chris Perl committed c1c890c

Adding asyncore event loop based bayeux client and making it use bayeuxmessage.py

  • Participants
  • Parent commits 7a3eec7

Comments (0)

Files changed (3)

 
 from bayeuxmessage import *
 
+# Bayeux Spec
+# http://svn.cometd.com/trunk/bayeux/bayeux.html
+
 class BayeuxReceiver(threading.Thread):
 	"""
 	A class for managing a connection with a server.  This class pulls down
 		resp = http_client.getresponse()
 		# TODO: Check for non 200 responses
 		data = resp.read()
-		data = json.loads(data)
+		data = json_decode(data)
 		return [ ResponseMessageFactory(BayeuxResponseMessage.FROMDICT, x) for x in data ]
 
 	def _update_advice(self, m):

bayeuxclientevent.py

+import asyncore
+import socket
+
+from bayeuxmessage import *
+
+# XXX: debug
+import pdb
+
+class HttpRequest(object):
+	def __init__(self, data):
+		self.action  = "POST"
+		self.path    = "/cometd"
+		self.version = "HTTP/1.1"
+		self.body = data
+		self.headers = { 
+				"Host": "localhost:8080",
+				"Content-Type": "application/json",
+				"Content-Length": len(self.body),
+		}
+
+	def dump(self):
+		tmp = []
+		tmp.append("%s %s %s" % (self.action, self.path, self.version))
+		for k in self.headers.keys():
+			tmp.append("%s: %s" % (k, self.headers[k]))
+		tmp.append("")
+		tmp.append(self.body)
+		tmp = '\r\n'.join(tmp)
+		return tmp
+
+class HttpResponse(object):
+	def __init__(self, buffer):
+		self.headers = {}
+		self.body    = ""
+
+		(headers, body) = buffer.split('\r\n\r\n')
+		for h in headers.split('\r\n'):
+			if "HTTP/" in h:
+				(self.version, self.status, self.reason) = h.split()
+				self.status = int(self.status)
+			else:
+				header, value = h.split(':', 1)
+				header = header.strip()
+				value = value.strip()
+				self.headers[header] = value
+		self.body = body
+
+class BayeuxClient(asyncore.dispatcher):
+	BUFSIZ = 4096
+
+	def __init__(self):
+		# Call our super constructor
+		asyncore.dispatcher.__init__(self)
+
+		# initiate the socket connection to the server
+		self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+		self.connect(("localhost", 8080))
+		self.channel = "/org/fppe/test"
+
+		# setup various state parameters so we know what part
+		# of the protocol we are in
+		self.handshake_sent        = False
+		self.handshake_complete    = False
+		self.connect_sent          = False
+		self.connect_complete      = False
+		self.subscription_sent     = False
+		self.subscription_complete = False
+		self.longpoll_sent         = False
+		self.longpoll_complete     = False
+
+		# used to determine if we're waiting on a response
+		self.awaiting_response     = False
+
+		# setup various other parameters related to the bayeux protocol
+		self.clientId             = None
+
+		# setup input and output buffers for communication
+		self.ibuffer		   = ""
+		self.obuffer               = ""
+
+	def read_data(self):
+		# Read all the data we can from the socket into the input buffer
+		try:
+			while True:
+				tmp = self.recv(self.BUFSIZ)
+				if not tmp:
+					break
+				self.ibuffer += tmp
+		except socket.error, e:
+			pass
+
+	def write_data(self):
+		# write all the data we can from the output buffer into the socket
+		try:
+			while True:
+				sent = self.send(self.obuffer)
+				if sent == len(self.obuffer):
+					break
+				self.obuffer = self.obuffer[sent:]
+		except socket.error, e:
+			pass
+
+	def handle_connect(self):
+		pass
+
+	def handle_expt(self):
+		print "um, not sure about this one..."
+
+	def handle_read(self):
+		self.read_data()
+		h = HttpResponse(self.ibuffer)
+		if h.status != 200: 
+			raise StandardError("Non 200 response from server: %d" % h.status)
+		data = json_decode(h.body)
+		msgs = [ ResponseMessageFactory(BayeuxResponseMessage.FROMDICT, x) for x in data ]
+		event_msgs    = [ x for x in msgs if     isinstance(x, EventMessage) ]
+		response_msgs = [ x for x in msgs if not isinstance(x, EventMessage) ]
+		assert len(event_msgs) + len(response_msgs) == len(msgs)
+		assert len(response_msgs) == 1
+		m = response_msgs[0]
+
+		if m.error:
+			raise StandardError("Error response from server %s" % m)
+
+		# This if block correspondes to the various states of the protocol
+		if   self.longpoll_sent:
+			for msg in msgs:
+				print str(msg)
+		elif self.subscription_sent:
+			print "Subscription Reply: %s" % m
+		elif self.connect_sent:
+			print "Connect Reply: %s" % m
+		elif self.handshake_sent:
+			print "Handshake Reply: %s" % m
+			self.clientId = m.clientId
+		else:
+			raise StandardError("Err, WTF?")
+
+		# Reset the input buffer
+		self.ibuffer = ""
+
+		# We're no longer waiting on a response
+		self.awaiting_response = False
+
+	def handle_write(self):
+		if not self.handshake_sent:
+			# we want to start by sending a handshake message to the server
+			m = RequestMessageFactory(BayeuxRequestMessage.HANDSHAKE)
+			h = HttpRequest(m.as_json())
+			self.obuffer = h.dump()
+			self.write_data()
+			self.handshake_sent = True
+		elif not self.connect_sent:
+			m = RequestMessageFactory(BayeuxRequestMessage.CONNECT, clientId=self.clientId)
+			h = HttpRequest(m.as_json())
+			self.obuffer = h.dump()
+			self.write_data()
+			self.connect_sent = True
+		elif not self.subscription_sent:
+			m = RequestMessageFactory(BayeuxRequestMessage.SUBSCRIBE, clientId=self.clientId, subscription=self.channel)
+			h = HttpRequest(m.as_json())
+			self.obuffer = h.dump()
+			self.write_data()
+			self.subscription_sent = True
+		else:
+			# Here, its time to long poll
+			m = RequestMessageFactory(BayeuxRequestMessage.CONNECT, clientId=self.clientId)
+			h = HttpRequest(m.as_json())
+			self.obuffer = h.dump()
+			self.write_data()
+			self.longpoll_sent = True
+
+		# We're now waiting for the server to respond
+		self.awaiting_response = True
+
+	def readable(self):
+		return True if self.awaiting_response else False
+
+	def writable(self):
+		return True if not self.awaiting_response else False
+
+def main():
+	# Create a crap load of clients for load testing
+	clients = []
+	for i in range(50):
+		clients.append(BayeuxClient())
+	asyncore.loop(use_poll=True)
+
+if __name__ == "__main__":
+	main()
-import json
 import uuid
 import random
 import string
 
+try:
+	import cjson
+	json_encode = cjson.encode
+	json_decode = cjson.decode
+except ImportError:
+	import json
+	json_encode = json.dumps
+	json_decode = json.loads
+
 # Bayeux Spec
 # http://svn.cometd.com/trunk/bayeux/bayeux.html
 
 		Return a string representation of a bayeux message.
 		"""
 		assert self._message
-		return "Bayeux Message: %s" % (json.dumps(self._message))
+		return "Bayeux Message: %s" % (json_encode(self._message))
 
 	def as_json(self):
 		"""
 		Return the message as a json encoded string.
 		"""
-		return json.dumps(self._message)
+		return json_encode(self._message)
 
 	def as_dict(self):
 		"""