Commits

Jamie Turner  committed 1c7a971

Moved the main lib to 'diesel', and removed cluster code from
default.

  • Participants
  • Parent commits 26d2514

Comments (0)

Files changed (25)

 General
-  * defer_to_thread-style behavior
   * Reactor.until clarification, vs. sleep/up, etc?
   * Fake raising an exception through the gen stack in cycle_all..
     if no one catches it, raise it back at the top-most level
   * @message support for "schedule this generator but don't block me"
-  * Write tests, somehow, for all this fancy crap and the cluster code
-  * Then, generalize and clean up!
-  * cluster. broadcast, listen, call
-  * Specialize the paxos database more for our purposes
+  * Write tests, somehow--check out twisted.trial
   * Clean up logging and debug statements
-  * Handle connection errors much more robustly on the cluster.. Connection object
-    probably would be helped by the whole "raise through the fake stack" point
-
-Clustering...
-
-register()  -- register a mailbox
-send()      -- send a single message to a mailbox
-rpc()      -- block rpc style on a call to a mailbox
-
-listen()    -- listen to a class of broadcast message
-broadcast() -- send out a broadcast message
+  * Rename to diesel
+  * Test WSGI stuff
+  * Docstrings
+  * Setup, and pyevent bundling

File concussion/__init__.py

-import logmod
-log = logmod.log
-from timers import call_later, call_every
-from core import until, until_eol, bytes, sleep, up, Connection, ConnectionClosed, Loop
-from app import Application, Service 
-from client import Client, call, response
-from cluster import Cluster, register, send

File concussion/app.py

-import socket
-import event
-import traceback
-import os
-
-from concussion import logmod, log
-from concussion import timers
-from concussion import Connection
-from concussion import Loop
-
-class Application(object):
-	def __init__(self, logger=None, cluster=None):
-		self._run = False
-		if logger is None:
-			logger = logmod.Logger()
-		self.logger = logger
-		self.add_log = self.logger.add_log
-		self._services = []
-		self._loops = []
-		self.cluster = cluster
-
-	def run(self):
-		if self.cluster:
-			self.add_service(Service(self.cluster.service, self.cluster.port))
-			self.add_loop(self.cluster.loop, front=True)
-			from concussion import cluster as clustermod
-			clustermod._cluster = self.cluster
-
-		self._run = True
-		logmod.set_current_application(self)
-		log.info('Starting concussion application')
-
-		for s in self._services:
-			s.bind_and_listen()
-			event.event(s.accept_new_connection,
-			handle=s.sock, evtype=event.EV_READ | event.EV_PERSIST, arg=s).add()
-		for l in self._loops:
-			l.iterate()
-
-		def checkpoint():
-			if not self._run:
-				raise SystemExit
-
-		timers.call_every(1.0, checkpoint)
-#		import gc, sys # XXX debug
-#		timers.call_every(5.0, lambda: sys.stdout.write(repr(gc.get_objects()) + '\n'))
-		
-		self.setup()
-		while self._run:
-			try:
-				event.dispatch()
-			except SystemExit:
-				log.warn("-- SystemExit raised.. exiting main loop --")
-				break
-			except KeyboardInterrupt:
-				log.warn("-- KeyboardInterrupt raised.. exiting main loop --")
-				break
-			except Exception, e:
-				log.error("-- Unhandled Exception in main loop --")
-				log.error(traceback.format_exc())
-
-		log.info('Ending concussion application')
-
-	def add_service(self, service):
-		service.application = self
-		if self._run:
-			s.bind_and_listen()
-			event.event(s.accept_new_connection,
-			handle=s.sock, evtype=event.EV_READ | event.EV_PERSIST, arg=s).add()
-		else:
-			self._services.append(service)
-
-	def add_loop(self, loop, front=False):
-		loop.application = self
-		if self._run:
-			loop.iterate()
-		else:
-			if front:
-				self._loops.insert(0, loop)
-			else:
-				self._loops.append(loop)
-		
-	def halt(self):	
-		self._run = False
-
-	def setup(self):
-		pass
-
-class Service(object):
-	LQUEUE_SIZ = 500
-	def __init__(self, connection_handler, port, iface=''):
-		self.port = port
-		self.iface = iface
-		self.sock = None
-		self.connection_handler = connection_handler
-		self.application = None
-
-	def handle_cannot_bind(self, reason):
-		log.critical("service at %s:%s cannot bind: %s" % (self.iface or '*', 
-				self.port, reason))
-		raise
-
-	def bind_and_listen(self):
-		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-		try:
-			sock.bind((self.iface, self.port))
-		except socket.error, e:
-			self.handle_cannot_bind(str(e))
-
-		sock.listen(self.LQUEUE_SIZ)
-		self.sock = sock
-
-	def _get_listening(self):
-		return self.sock is not None
-
-	listening = property(_get_listening)
-
-	def accept_new_connection(self, *args):
-		sock, addr = self.sock.accept()
-		Connection(sock, addr, self.connection_handler).iterate()

File concussion/buffer.py

-class Buffer(object):
-	def __init__(self):
-		self._atinbuf = []
-		self._atterm = None
-		self._atmark = 0
-		
-	def set_term(self, term):
-		self._atterm = term
-
-	def feed(self, data):
-		self._atinbuf.append(data)
-		self._atmark += len(data)
-		return self.check()
-
-	def check(self):
-		'''Look for the message
-		'''
-		ind = None
-		all = None
-		if type(self._atterm) is int:
-			if self._atmark >= self._atterm:
-				ind = self._atterm
-		else:
-			all = ''.join(self._atinbuf)
-			res = all.find(self._atterm)
-			if res != -1:
-				ind = res + len(self._atterm)
-		if ind is None:
-			return None
-		if all is None:
-			all = ''.join(self._atinbuf)
-		use = all[:ind]
-		new_all = all[ind:]
-		self._atinbuf = [new_all]
-		self._atmark = len(new_all)
-
-		return use

File concussion/client.py

-import socket
-from collections import deque
-
-class call(object):
-	def __init__(self, f, inst=None):
-		self.f = f
-		self.client = inst
-
-	def __call__(self, *args, **kw):
-		self.gen = self.f(self.client, *args, **kw)
-		return self
-
-	def __get__(self, inst, cls):
-		return call(self.f, inst)
-
-	def go(self, callback): # XXX errback-type stuff?
-		self.client.conn.callbacks.append(callback)
-		self.client.jobs.append(self.gen)
-		self.client.conn.wake()
-
-class response(object):
-	def __init__(self, value):
-		self.value = value
-
-class Client(object):
-	def __init__(self, connection_handler=None):
-		self.connection_handler = connection_handler or self.client_conn_handler
-		self.jobs = deque()
-		self.conn = None
-	 
-	def connect(self, addr, port):  
-		remote_addr = (addr, port)
-		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sock.connect(remote_addr)
-		from concussion.core import Connection
-		self.conn = Connection(sock, (addr, port), self.client_conn_handler)
-		self.conn.iterate()
-
-	def close(self):
-		self.conn = None
-
-	@property
-	def is_closed(self):
-		return self.conn is None
-
-	def client_conn_handler(self, addr):
-		from concussion.core import sleep, ConnectionClosed
-		yield self.on_connect()
-
-		while True:
-			try:
-				if not self.jobs:
-					yield sleep()
-				if not self.jobs:
-					continue
-				mygen = self.jobs.popleft()
-				yield mygen
-			except ConnectionClosed:
-				self.close()
-				self.on_close()
-
-	def on_connect(self):
-		if 0: yield 0
-
-	def on_close(self):
-		pass

File concussion/cluster.py

-import os
-import socket
-import time
-import hashlib
-import cjson
-import random
-from collections import defaultdict
-from struct import pack, unpack
-
-prot_encode = cjson.encode
-prot_decode = cjson.decode
-
-from concussion import Client, call, response, Service, bytes, up, Loop
-from concussion import until_eol, log as olog, sleep, ConnectionClosed
-
-HOSTNAME = os.uname()[1].lower()
-
-class PaxosAgent(object):
-	(_,
-	PROPOSAL_OUTDATED,
-	PROPOSAL_ACCEPTED,
-	) = range(3)
-
-	def __init__(self, cluster):
-		self.cluster = cluster
-		self.node_id = hashlib.md5(str(cluster.me)).hexdigest()
-		self.promise_file = os.path.join('/tmp', 'concussion_promises_%s' % self.node_id)
-		self.load()
-		self.db = {}
-		self.states = {}
-		self.refs = defaultdict(set)
-		self.counter = 0
-	
-	def gen_proposal_id(self):
-		self.counter += 1
-		return [int(time.time() * 1000), self.node_id, self.counter % 10000]
-
-	def load(self):
-		if os.path.isfile(self.promise_file):
-			self.promises = prot_decode(open(self.promise_file, 'rb').read())
-		else:
-			self.promises = {}
-
-	def save(self):
-		open(self.promise_file, 'wb', 0).write(prot_encode(self.promises))
-
-	def db_sync(self, db):
-		for key, (state, value) in db.iteritems():
-			if key not in self.db:
-				self.db[key] = (state, value)
-				self.states[key] = state
-				if type(value) is list:
-					self.refs[tuple(value)].add(key)
-			else:
-				(ostate, _) = self.db[key] 
-				if state > ostate:
-					self.db[key] = (state, value)
-					self.states[key] = state
-					if type(value) is list:
-						self.refs[tuple(value)].add(key)
-
-	def handle_prepare(self, state, number, key, value):
-		if key in self.db:
-			(ostate, ovalue) = self.db[key]
-		else:
-			ostate = None
-			ovalue = None
-		
-		if ostate is not None and state != ostate:
-			return self.PROPOSAL_OUTDATED, ostate, ovalue
-	
-		if key in self.promises:
-			(ptime, pnumber, pvalue) = self.promises[key]
-			if pnumber > number:
-				return self.PROPOSAL_ACCEPTED, pnumber, pvalue
-			else:
-				self.promises[key] = time.time(), number, value
-				self.save()
-				return self.PROPOSAL_ACCEPTED, number, value
-		else:
-			self.promises[key] = time.time(), number, value
-			self.save()
-			return self.PROPOSAL_ACCEPTED, number, value
-
-	def handle_accept(self, number, key, value):
-		if self.states.get(key) != number:
-			if key in self.db:
-				ostate, ovalue = self.db[key]
-				if type(ovalue) is list:
-					self.refs[tuple(ovalue)].remove(key)
-			self.db[key] = number, value
-			self.states[key] = number
-			if type(value) is list:
-				self.refs[tuple(value)].add(key)
-			self.cluster.log.debug(" accepting %s -> %s" % (key, value))
-
-	def remove_node(self, node):
-		node = tuple(node)
-		if node in self.refs:
-			refs = self.refs.pop(node)
-			for r in refs:
-				state, val = self.db[r]
-				self.db[r] = state, None
-		else:
-			refs = set()
-			
-		return refs
-
-class Cluster(object):
-	def __init__(self, nodes, port=9715):
-		global _cluster
-		self.port = port
-		self.me = (HOSTNAME, port)
-		self.connected_nodes = {}
-		self.quorum_amount = (len(nodes) / 2) + 1
-		assert self.me in nodes, "Could not find self in nodes"
-		self.nodes = nodes[:]
-
-		nodes.remove(self.me)
-		self.remote_nodes = nodes
-		self.service = ClusterService(self)
-		self.paxos = PaxosAgent(self)
-		self.registry = {}
-		self.loop = Loop(self.cluster_loop)
-		self.need_election = set()
-		_cluster = self
-
-	@property
-	def operational(self):
-		return self.network_size >= self.quorum_amount
-
-	@property
-	def network_size(self):
-		return 1 + len(self.connected_nodes)
-
-	@property
-	def cluster_size(self):
-		return 1 + len(self.remote_nodes)
-
-	@property
-	def missing_nodes(self):
-		return set(self.remote_nodes) - set(self.connected_nodes)
-
-	def register(self, name, handler):
-		self.registry[name] = handler
-		yield self.elect_name(name)
-
-	def elect_name(self, name):
-		self.log.warn("Starting master election for %s" % name)
-		last_log = 0
-		while not self.operational:
-			if time.time() - last_log > 5:
-				self.log.warn("Blocking election on %s until cluster is ready..."
-				% name)
-				last_log = time.time()
-			yield sleep(1)
-		state = self.paxos.states.get(name)
-		value = random.choice([self.me] + [[h,p] for (h,p) in self.connected_nodes])
-		for x in [0.07, 0.15, 0.3, 0.6, 1.2, 2.5, 5, None]:
-			self.log.debug("suggesting %s -> %s" % (name, value))
-			consensus = yield self._elect_name(name, value)
-			if consensus:
-				self.log.info("consensus on %s -> %s" % (name, self.paxos.db[name][1]))
-				break
-			nstate = self.paxos.states.get(name)
-			assert x != None, "MASSIVE failure: Couldn't resolve consensus!!!"
-			if random.randint(0, len(self.connected_nodes)):
-				yield sleep(x)
-				if self.paxos.states.get(name) != nstate:
-					break
-			if name in self.paxos.db:
-				value = self.paxos.db[name][1]
-
-	def _elect_name(self, name, value):
-		'''If True is returned, our state is _definitely_ good.
-		If False, then it is _maybe_ good.  Checking message status
-		via delivery is ideal, or re-trying claim based on updated 
-		state.
-		'''
-		if type(value) is tuple:
-			value = list(value)
-		number = self.paxos.gen_proposal_id()
-		state = self.paxos.states.get(name)
-
-		status, number, value = self.paxos.handle_prepare(state, number, name, value)
-		assert status == PaxosAgent.PROPOSAL_ACCEPTED
-
-
-		p_accept_count = 1
-		latest_state = None
-		latest_value = None
-
-		for nid, client in self.connected_nodes.items():
-			print 'iter with', state, number, value, nid
-			status, tnumber, tvalue = yield client.prepare(state, number, name, value)
-			if status == PaxosAgent.PROPOSAL_ACCEPTED:
-				p_accept_count += 1
-				number = tnumber
-				value = tvalue
-				if p_accept_count >= self.quorum_amount:
-					break
-			else:
-				if tnumber > latest_state:
-					latest_state = tnumber
-					latest_value = tvalue
-
-		if p_accept_count >= self.quorum_amount:
-			self.paxos.handle_accept(number, name, value)
-			for nid, client in self.connected_nodes.items():
-				yield client.accept(number, name, value)
-			yield up(True)
-		else:
-			self.paxos.handle_accept(latest_state, name, latest_value)
-			yield up(False)
-
-	LOG_STATUS_EVERY = 5
-	def cluster_loop(self):
-		self.log = log = olog.get_sublogger("cluster")
-		last_log = 0
-		while True:
-			if self.missing_nodes:
-				added = False
-				for n in self.missing_nodes:
-					try:
-						client = ClusterClient(self)
-						client.connect(*n)
-					except socket.error:
-						pass
-					else:
-						server_id = (yield client.handshake())
-						log.debug(".. connected to %s" % (server_id,))
-						self.connected_nodes[n] = client
-						db = yield client.sync()
-						self.paxos.db_sync(db)
-						added = True
-				tm = time.time()
-				if self.operational:
-					if added:
-						log.info("cluster operational (%s/%s nodes connected, %s required)" 
-						% (self.network_size, self.cluster_size, self.quorum_amount))
-				else:
-					if tm - last_log > self.LOG_STATUS_EVERY:
-						log.info("no quorum yet, cluster down (%s/%s nodes connected, %s required)" 
-						% (self.network_size, self.cluster_size, self.quorum_amount))
-						last_log = tm
-
-			if not self.operational:
-				yield sleep(0.1)
-			else:
-				ne_copy = self.need_election.copy()
-				for name in ne_copy:
-					yield self.elect_name(name)
-				self.need_election = self.need_election - ne_copy
-
-				if self.missing_nodes:
-					yield sleep(1)
-					print hashlib.md5(str(list(sorted(self.paxos.db.items())))).hexdigest()
-				else:
-					yield sleep(3)
-		#			print list(sorted(self.paxos.db.items()))
-					print hashlib.md5(str(list(sorted(self.paxos.db.items())))).hexdigest()
-
-	def node_disconnect(self, id):
-		del self.connected_nodes[id]
-		self.log.debug(".. %s dropped our connection" % (id, ))
-		refs = self.paxos.remove_node(id)
-		elect_refs = [r for r in refs if r in self.registry]
-		self.log.warn(".. %s names held by dropped node %s need to be re-elected" 
-		% (len(elect_refs), id))
-		self.need_election.update(elect_refs)
-		self.loop.schedule()
-
-	def send(self, service, method, *args, **kw):
-		for x in xrange(10):
-			service = self.paxos.db.get(service)
-			if service:
-				_, host = service
-				if host is not None:
-					break
-			yield sleep(1)
-		else:
-			raise MessageSendError("No registered service '%s'" % service)
-		
-		if host == self.me:
-			inst = self.registry[service]
-			method = getattr(inst, method)
-			method(inst, *args, **kw)
-		else:
-			client = self.connected_nodes[host]
-			yield client.message(service, method, args, kw)
-
-def parse_id(s):
-	host, port = s.strip().split(':')
-	port = int(port)
-	return host, port
-
-( _,
-CLUSTER_MESSAGE_PREPARE,
-CLUSTER_MESSAGE_ACCEPT,
-CLUSTER_MESSAGE_MESSAGE,
-CLUSTER_MESSAGE_CALL,
-CLUSTER_MESSAGE_SYNC,
-) = range(6)
-
-class ClusterService(object):
-	def __init__(self, cluster):
-		self.cluster = cluster
-
-	def __call__(self, addr):
-		try:
-			yield "%s:%s\r\n" % self.cluster.me
-			client_id = parse_id((yield until_eol()))
-			self.cluster.log.debug(".. got connection from %s" % (client_id,))
-			self.cluster.loop.schedule()
-			while True:
-				paxos = self.cluster.paxos
-				size_raw = yield bytes(4)
-				(msize,) = unpack('=I', size_raw)
-				message = prot_decode((yield bytes(msize)))
-				mtype = message['type']
-				if mtype == CLUSTER_MESSAGE_MESSAGE:
-					raise NotImplementedError # XXX TODO
-				elif mtype == CLUSTER_MESSAGE_CALL:
-					raise NotImplementedError # XXX TODO
-				elif mtype == CLUSTER_MESSAGE_PREPARE:
-					#print 'PREPARE!'
-					status, number, value = paxos.handle_prepare(*message['args'])
-					out = prot_encode([status, number, value])
-					yield pack('=I%ss' % len(out), len(out), out)
-				elif mtype == CLUSTER_MESSAGE_ACCEPT:
-					#print 'ACCEPT!'
-					paxos.handle_accept(*message['args'])
-					yield pack('=I', 0)
-				elif mtype == CLUSTER_MESSAGE_SYNC:
-					#print 'SYNC!'
-					out = prot_encode(paxos.db)
-					yield pack('=I%ss' % len(out), len(out), out)
-			
-		except ConnectionClosed:
-			self.cluster.log.debug(".. connection dropped from %s" % (client_id,))
-
-class ClusterClient(Client):
-	def __init__(self, cluster, *args, **kw):
-		Client.__init__(self, *args, **kw)
-		self.cluster = cluster
-
-	@call
-	def handshake(self):
-		yield "%s:%s\r\n" % self.cluster.me
-		self.server_id = parse_id((yield until_eol()))
-		yield response(self.server_id)
-
-	@call
-	def prepare(self, state, number, name, value):
-		msg = {'type' : CLUSTER_MESSAGE_PREPARE, 'args' : [state, number, name, value]}
-		out = prot_encode(msg)
-		yield pack('=I%ss' % len(out), len(out), out)
-		(retsize,) = unpack('=I', (yield bytes(4)))
-		yield response(tuple(prot_decode((yield bytes(retsize)))))	
-
-	@call
-	def accept(self, number, name, value):
-		msg = {'type' : CLUSTER_MESSAGE_ACCEPT, 'args' : [number, name, value]}
-		out = prot_encode(msg)
-		yield pack('=I%ss' % len(out), len(out), out)
-		(retsize,) = unpack('=I', (yield bytes(4)))
-		assert retsize == 0
-		yield response(None)
-
-	@call
-	def sync(self):
-		msg = {'type' : CLUSTER_MESSAGE_SYNC}
-		out = prot_encode(msg)
-		yield pack('=I%ss' % len(out), len(out), out)
-		(retsize,) = unpack('=I', (yield bytes(4)))
-		ret = prot_decode((yield bytes(retsize)))
-		yield response(ret)
-
-	def on_close(self):
-		self.cluster.node_disconnect(self.server_id)
-
-_cluster = None
-
-def register(name, handler):
-	yield _cluster.register(name, handler)
-
-def send(service, message, *args, **kw):
-	yield _cluster.send(service, message, *args, **kw)
-
-class MessageRouter(object):
-	def __init__(self):
-		pass
-
-	def become_master(self):
-		pass

File concussion/core.py

-import socket
-import event
-from types import GeneratorType
-from collections import deque, defaultdict
-
-from concussion import pipeline
-from concussion import buffer
-from concussion import call_later
-from concussion.client import call, response
-
-class ConnectionClosed(socket.error): pass
-
-CRLF = '\r\n'
-BUFSIZ = 2 ** 14
-
-class until(object):
-	def __init__(self, sentinel):
-		self.sentinel = sentinel
-
-def until_eol():
-	return until(CRLF)
-
-class bytes(object):
-	def __init__(self, sentinel):
-		self.sentinel = sentinel
-
-class sleep(object):
-	def __init__(self, duration=0):
-		self.duration = duration
-
-class up(object):
-	def __init__(self, value):
-		self.value = value
-
-class wait(object):
-	def __init__(self, event):
-		self.event = event
-
-class fire(object):
-	def __init__(self, event, value):
-		self.event = event
-		self.value = value
-
-global_waits = defaultdict(set)
-	
-class Connection(object):
-	def __init__(self, sock, addr, connection_handler):
-		self.sock = sock
-		self.addr = addr
-		self.pipeline = pipeline.Pipeline()
-		self.buffer = buffer.Buffer()
-		self._rev = event.event(self.handle_read, handle=sock, evtype=event.EV_READ | event.EV_PERSIST, arg=None)
-		self._rev.add()
-		self._wev = None
-		self.g = self.cycle_all(connection_handler(addr))
-		self.callbacks = deque()
-		self._wakeup_timer = None
-
-	def cycle_all(self, current, error=None):
-		'''Effectively flattens all iterators.
-		'''
-		last = None
-		stack = []
-		while True:
-			try:
-				if error != None:
-					item = current.throw(*error)
-				elif last != None:
-					item = current.send(last)
-				else:
-					item = current.next()
-			except StopIteration:
-				if stack:
-					current = stack.pop()
-				else:
-					raise
-			else:
-				if type(item) is GeneratorType:
-					stack.append(current)
-					current = item
-					last = None
-				else:
-					if type(item) is response:
-						assert stack, "Cannot return a response from main handler"
-						current = stack.pop()
-					try:
-						last = (yield item)
-					except ConnectionClosed, e:
-						error = (ConnectionClosed, str(e))
-
-	def set_writable(self, val):
-		if val and self._wev is None:
-			self._wev = event.event(self.handle_write, handle=self.sock, evtype=event.EV_WRITE | event.EV_PERSIST, arg=None)
-			self._wev.add()
-		elif not val and self._wev is not None:
-			self._wev.delete()
-			self._wev = None
-
-	def shutdown(self, remote_closed=False):
-		if self._rev != None:
-			self._rev.delete()
-			self._rev = None
-
-		self.set_writable(False)
-
-		if remote_closed:
-			try:
-				self.g.throw(ConnectionClosed)
-			except StopIteration:
-				pass
-
-		self.g = None
-
-	def handle_write(self, ev, handle, evtype, _):
-		if not self.pipeline.empty:
-			try:
-				data = self.pipeline.read(BUFSIZ)
-			except pipeline.PipelineCloseRequest:
-				self.sock.close()
-				self.shutdown()
-			else:
-				try:
-					bsent = self.sock.send(data)
-				except socket.error, s:
-					g = self.g
-					self.shutdown(True)
-				else:
-					if bsent != len(data):
-						self.pipeline.backup(data[bsent:])
-
-					if not self.pipeline.empty:
-						return True
-					else:
-						self.set_writable(False)
-
-	def handle_read(self, ev, handle, evtype, _):
-		disconnect_reason = None
-		try:
-			data = self.sock.recv(BUFSIZ)
-		except socket.error, e:
-			data = ''
-			disconnect_reason = str(e)
-
-		if not data:
-			g = self.g
-			self.shutdown(True)
-		else:
-			res = self.buffer.feed(data)
-			if res:
-				self.iterate(res)
-
-	def schedule(self, value=None):
-		if self._wakeup_timer:
-			self._wakeup_timer.cancel()
-			self._wakeup_timer = call_later(0, self.wake, value)
-
-	def wake(self, value=None):
-		if self._wakeup_timer:
-			self._wakeup_timer.cancel()
-		self.iterate(value)
-
-	def iterate(self, n_val=None):
-		self._wakeup_timer = None
-		while True:
-			try:
-				if n_val is not None:
-					rets = self.g.send(n_val)
-				else:
-					rets = self.g.next()
-			except StopIteration:
-				if hasattr(self, 'sock'):
-					self.pipeline.close_request()
-				break
-			n_val = None
-			if type(rets) != tuple:
-				rets = (rets,)
-
-			exit = False
-			for ret in rets:
-				
-				if isinstance(ret, response):
-					c = self.callbacks.popleft()
-					c(ret.value)
-					assert len(rets) == 1, "response cannot be paired with any other yield token"
-					exit = True
-				elif isinstance(ret, call):
-					ret.go(self.iterate)
-					assert len(rets) == 1, "call cannot be paired with any other yield token"
-					exit = True
-				elif isinstance(ret, basestring) or hasattr(ret, 'seek'):
-					self.pipeline.add(ret)
-					assert len(rets) == 1, "a string or file cannot be paired with any other yield token"
-				elif type(ret) is up:
-					n_val = ret.value
-					assert len(rets) == 1, "up cannot be paired with any other yield token"
-				elif type(ret) is fire:
-					assert len(rets) == 1, "fire cannot be paired with any other yield token"
-					waiters = global_waits[ret.event]
-					for w in waiters:
-						w(ret)
-					global_waits[fire.event] = set()
-				elif type(ret) is until or type(ret) is bytes:
-					self.buffer.set_term(ret.sentinel)
-					n_val = self.buffer.check()
-					if n_val == None:
-						exit = True
-				elif type(ret) is sleep:
-					if ret.duration:
-						self._wakeup_timer = call_later(ret.duration, self.wake, ret)
-					exit = True
-				elif type(ret) is wait:
-					global_waits[ret.event].add(self.schedule)
-					exit = True
-				if exit: 
-					break
-			if exit: 
-				break
-
-		if hasattr(self, 'sock') and not self.pipeline.empty:
-			self.set_writable(True)
-
-class Loop(Connection):
-	'''A way to write a connection-less loop.
-
-	XXX
-	This is probably upside down right now.  Fix it eventually.
-	'''
-	def __init__(self, loop_callable):
-		self.g = self.cycle_all(loop_callable())
-		self._wakeup_timer = None

File concussion/logmod.py

-import sys
-import time
-
-_current_application = None
-(
-LOGLVL_DEBUG,
-LOGLVL_INFO,
-LOGLVL_WARN,
-LOGLVL_ERR,
-LOGLVL_CRITICAL,
-) = range(1,6)
-
-_lvl_text = {
-	LOGLVL_DEBUG : 'debug',
-	LOGLVL_INFO : 'info',
-	LOGLVL_WARN : 'warn',
-	LOGLVL_ERR : 'error',
-	LOGLVL_CRITICAL : 'critical',
-}
-	
-
-class Logger:
-	def __init__(self, fd=sys.stdout, verbosity=LOGLVL_WARN):
-		self.fdlist = [fd]
-		self.level = verbosity
-		self.component = None
-
-	def add_log(self, fd):
-		self.fdlist.append(fd)
-
-	def _writelogline(self, lvl, message):
-		if lvl >= self.level:
-			for fd in self.fdlist:
-				fd.write('[%s] {%s%s} %s\n' % (time.asctime(), 
-										self.component and ('%s:' % self.component) or '',
-										_lvl_text[lvl],
-										message))
-
-	debug = lambda s, m: s._writelogline(LOGLVL_DEBUG, m)
-	info = lambda s, m: s._writelogline(LOGLVL_INFO, m)
-	warn = lambda s, m: s._writelogline(LOGLVL_WARN, m)
-	error = lambda s, m: s._writelogline(LOGLVL_ERR, m)
-	critical = lambda s, m: s._writelogline(LOGLVL_CRITICAL, m)
-
-	def get_sublogger(self, component, verbosity=None):
-		copy = Logger(verbosity=verbosity or self.level)
-		copy.fdlist = self.fdlist
-		copy.component = component
-		return copy
-
-def set_current_application(app):
-	global _current_application
-	_current_application = app
-
-class _currentLogger:
-	def __getattr__(self, n):
-		return getattr(_current_application.logger, n)
-
-log = _currentLogger()

File concussion/pipeline.py

-try:
-	import cStringIO
-except ImportError:
-	raise ImportError, "cStringIO is required"
-
-_obj_SIO = cStringIO.StringIO
-_type_SIO = cStringIO.OutputType
-def make_SIO(d):
-	t = _obj_SIO()
-	t.write(d)
-	t.seek(0)
-	return t
-
-def get_file_length(f):
-	m = f.tell()
-	f.seek(0, 2)
-	r = f.tell()
-	f.seek(m)
-	return r
-
-class PipelineLimitReached(Exception): pass
-class PipelineCloseRequest(Exception): pass
-class PipelineClosed(Exception): pass
-	
-class Pipeline(object):
-	def __init__(self, limit=0):
-		self.line = []
-		self.limit = limit
-		self.used = 0
-		self.callbacks = []
-		self.want_close = False
-
-	def add(self, d):
-		if self.want_close:
-			raise PipelineClosed
-
-		if self.limit > 0 and self.used >= self.limit:
-			raise PipelineLimitReached
-
-		if type(d) is str:
-			if self.line and type(self.line[-1][0]) is _type_SIO and \
-			(self.limit == 0 or self.line[-1][1] < (self.limit / 2)):
-				fd, l = self.line[-1]
-				m = fd.tell()
-				fd.seek(0, 2)
-				fd.write(d)
-				fd.seek(m)
-				self.line[-1] = [fd, l + len(d)]
-			else:
-				self.line.append([make_SIO(d), len(d)])
-			self.used += len(d)
-		else:
-			self.line.append([d, get_file_length(d)])
-
-	def close_request(self):
-		self.want_close = True
-
-	def read(self, amt):
-		if self.line == [] and self.want_close:
-			raise PipelineCloseRequest
-
-		rbuf = []
-		read = 0
-		while self.line and read < amt:
-			data = self.line[0][0].read(amt - read)
-			if data == '':
-				if type(self.line[0][0]) is _type_SIO:
-					self.used -= self.line[0][1]
-				del self.line[0]
-			else:
-				rbuf.append(data)
-				read += len(data)
-
-		while self.line and self.line[0][1] == self.line[0][0].tell():
-			self.used -= self.line[0][1]
-			del self.line[0]
-
-		while self.callbacks and self.used < self.limit:
-			self.callbacks.pop(0).callback(self)
-
-		return ''.join(rbuf)
-	
-	def backup(self, d):
-		self.line.insert(0, [make_SIO(d), len(d)])
-		self.used += len(d)
-
-	def _get_empty(self):
-		return self.want_close == False and self.line == []
-	empty = property(_get_empty)
-
-	def _get_full(self):
-		return self.used == 0 or self.used >= self.limit
-	full = property(_get_full)

File concussion/protocols/__init__.py

Empty file removed.

File concussion/protocols/http.py

-import sys, socket
-import urllib
-from collections import defaultdict
-
-from concussion import up, until, until_eol, bytes, ConnectionClosed
-
-status_strings = {
-100 : "Continue",
-101 : "Switching Protocols",
-200 : "OK",
-201 : "Created",
-202 : "Accepted",
-203 : "Non-Authoritative Information",
-204 : "No Content",
-205 : "Reset Content",
-206 : "Partial Content",
-300 : "Multiple Choices",
-301 : "Moved Permanently",
-302 : "Found",
-303 : "See Other",
-304 : "Not Modified",
-305 : "Use Proxy",
-307 : "Temporary Redirect",
-400 : "Bad Request",
-401 : "Unauthorized",
-402 : "Payment Required",
-403 : "Forbidden",
-404 : "Not Found",
-405 : "Method Not Allowed",
-406 : "Not Acceptable",
-407 : "Proxy Authentication Required",
-408 : "Request Time-out",
-409 : "Conflict",
-410 : "Gone",
-411 : "Length Required",
-412 : "Precondition Failed",
-413 : "Request Entity Too Large",
-414 : "Request-URI Too Large",
-415 : "Unsupported Media Type",
-416 : "Requested range not satisfiable",
-417 : "Expectation Failed",
-500 : "Internal Server Error",
-501 : "Not Implemented",
-502 : "Bad Gateway",
-503 : "Service Unavailable",
-504 : "Gateway Time-out",
-505 : "HTTP Version not supported",
-}
-
-def parse_request_line(line):
-	items = line.split(' ')
-	items[0] = items[0].upper()
-	if len(items) == 2:
-		return tuple(items) + ('0.9',)
-	items[1] = urllib.unquote(items[1])
-	items[2] = items[2].split('/')[-1].strip()
-	return tuple(items)
-
-class HttpHeaders(object):
-	def __init__(self):
-		self._headers = defaultdict(list)
-		self.link()
-
-	def add(self, k, v):
-		self._headers[k.lower()].append(str(v).strip())
-
-	def remove(self, k):
-		if k.lower() in self._headers:
-			del self._headers[k.lower()]
-
-	def set(self, k, v):
-		self._headers[k.lower()] = [str(v).strip()]
-
-	def format(self):
-		s = []
-		for h, vs in self._headers.iteritems():
-			for v in vs:
-				s.append('%s: %s' % (h.title(), v))
-		return '\r\n'.join(s)
-	
-	def link(self):
-		self.items = self._headers.items
-		self.keys = self._headers.keys
-		self.values = self._headers.values
-		self.itervalues = self._headers.itervalues
-		self.iteritems = self._headers.iteritems
-
-	def parse(self, rawInput):
-		ws = ' \t'
-		heads = {}
-		curhead = None
-		curbuf = []
-		for line in rawInput.splitlines():
-			if not line.strip():
-				continue
-			if line[0] in ws:
-				curbuf.append(line.strip())
-			else:
-				if curhead:
-					heads.setdefault(curhead, []).append(' '.join(curbuf))
-				name, body = map(str.strip, line.split(':', 1))
-				curhead = name.lower()
-				curbuf = [body]
-		if curhead:
-			heads.setdefault(curhead, []).append(' '.join(curbuf))
-		self._headers = heads
-		self.link()
-
-	def __contains__(self, k):
-		return k.lower() in self._headers
-
-	def __getitem__(self, k):
-		return self._headers[k.lower()]
-
-	def get(self, k, d=None):
-		return self._headers.get(k.lower(), d)
-
-	def __iter__(self):
-		return self._headers
-
-	def __str__(self):
-		return self.format()
-
-class HttpRequest(object):
-	def __init__(self, cmd, url, version, id=None):
-		self.cmd = cmd
-		self.url = url
-		self.version = version
-		self.headers = None
-		self.body = None
-		self.id = id
-		
-	def format(self):	
-		return '%s %s HTTP/%s' % (self.cmd, self.url, self.version)
-		
-class HttpProtocolError(Exception): pass	
-class HttpClose(object): pass	
-
-class HttpServer(object):
-	def __init__(self, request_handler):
-		self.request_handler = request_handler
-
-	BODY_CHUNKED, BODY_CL, BODY_NONE = range(3)
-
-	def check_for_http_body(self, heads):
-		if heads.get('Transfer-Encoding') == ['chunked']:
-			return self.BODY_CHUNKED
-		elif 'Content-Length' in heads:
-			return self.BODY_CL
-		return self.BODY_NONE
-
-	def __call__(self, addr):
-		req_id = 1
-		while True:
-			chunks = []
-			try:
-				header_line = yield until_eol()
-			except ConnectionClosed:
-				break
-
-			cmd, url, version = parse_request_line(header_line)	
-			req = HttpRequest(cmd, url, version, req_id)
-			req_id += 1
-
-			header_block = yield until('\r\n\r\n')
-
-			heads = HttpHeaders()
-			heads.parse(header_block)
-			req.headers = heads
-
-			if req.version >= '1.1' and heads.get('Expect') == ['100-continue']:
-				yield 'HTTP/1.1 100 Continue\r\n\r\n'
-
-			more_mode = self.check_for_http_body(heads)
-
-			if more_mode is self.BODY_NONE:
-				req.body = None
-
-			elif more_mode is self.BODY_CL:
-				req.body = yield bytes(int(heads['Content-Length']))
-
-			elif more_mode is self.BODY_CHUNKED:
-				
-				chunks = []
-				while True:
-					chunk_head = yield until_eol()
-					if ';' in chunk_head:
-						# we don't support any chunk extensions
-						chunk_head = chunk_head[:chunk_head.find(';')]
-					size = int(chunk_head, 16)
-					if size == 0:
-						break
-					else:
-						chunks.append((yield bytes(size)))
-						_ = yield bytes(2) # ignore trailing CRLF
-
-				while True:
-					trailer = yield until_eol()
-					if trailer.strip():
-						req.headers.add(*tuple(trailer.split(':', 1)))
-					else:
-						req.body = ''.join(chunks)
-						req.headers.set('Content-Length', len(req.body))
-						req.headers.remove('Transfer-Encoding')
-
-			leave_loop = False
-			for i in self.request_handler(req): 
-				if i == HttpClose:
-					leave_loop = True
-				else:
-					yield i
-			if leave_loop:
-				break
-
-def http_response(req, code, heads, body):
-	if req.version <= '1.0' and req.headers.get('Connection') != 'Keep-Alive':
-		close = True
-	elif req.headers.get('Connection') == ['close'] or  \
-		heads.get('Connection') == ['close']:
-		close = True
-	else:
-		close = False
-		heads.set('Connection', 'keep-alive')
-	yield '''HTTP/%s %s %s\r\n%s\r\n\r\n''' % (
-    req.version, code, status_strings.get(code, "Unknown Status"), 
-	heads.format())
-	if body:
-		yield body
-	if close:
-		yield HttpClose
-
-from concussion import Client, call, response
-
-def handle_chunks(headers):
-	chunks = []
-	while True:
-		chunk_head = yield until_eol()
-		if ';' in chunk_head:
-			# we don't support any chunk extensions
-			chunk_head = chunk_head[:chunk_head.find(';')]
-		size = int(chunk_head, 16)
-		if size == 0:
-			break
-		else:
-			chunks.append((yield bytes(size)))
-			_ = yield bytes(2) # ignore trailing CRLF
-
-	while True:
-		trailer = yield until_eol()
-		if trailer.strip():
-			headers.add(*tuple(trailer.split(':', 1)))
-		else:
-			body = ''.join(chunks)
-			headers.set('Content-Length', len(body))
-			headers.remove('Transfer-Encoding')
-			yield up(body)
-			break
-
-class HttpClient(Client):
-	@call
-	def request(self, method, path, headers, body=None):
-		req = HttpRequest(method, path, '1.1')
-		
-		if body:
-			headers.set('Content-Length', len(body))
-		
-		yield '%s\r\n%s\r\n\r\n' % (req.format(), 
-		headers.format())
-
-		if body:	
-			yield body
-
-		resp_line = yield until_eol()
-		version, code, status = resp_line.split(None, 2)
-		code = int(code)
-
-		header_block = yield until('\r\n\r\n')
-		heads = HttpHeaders()
-		heads.parse(header_block)
-
-		if heads.get('Transfer-Encoding') == ['chunked']:
-			body = yield handle_chunks(heads)
-		else:
-			cl = int(heads.get('Content-Length', [0])[0])
-			if cl:
-				body = yield bytes(cl)
-			else:
-				body = None
-
-		if version < '1.0' or heads.get('Connection', ['keep-alive'])[0] == 'close':
-			self.close()
-		yield response((code, heads, body))

File concussion/protocols/wsgi.py

-"""A minimal WSGI container.
-"""
-import urlparse
-import os
-import cStringIO
-import traceback
-
-from concussion import Application, Service, log
-from concussion.protocols.http import HttpServer, HttpHeaders, http_response
-
-HOSTNAME = os.uname()[1] # win32?
-
-def cgiish_name(nm):
-	return nm.upper().replace('-', '_')
-
-class FileLikeErrorLogger:
-	def __init__(self, logger):
-		self.logger = logger
-
-	def write(self, s):
-		self.logger.error(s)
-
-	def writelines(self, lns):
-		self.logger.error('\n'.join(list(lns)))
-
-	def flush(self):
-		pass
-
-def build_wsgi_env(req, port):
-	url_info = urlparse.urlparse(req.url)
-	env = {}
-
-	# CGI bits
-	env['REQUEST_METHOD'] = req.cmd
-	env['SCRIPT_NAME'] = ''
-	env['PATH_INFO'] = url_info[2]
-	env['QUERY_STRING'] = url_info[4]
-	if 'Content-Type' in req.headers:
-		env['CONTENT_TYPE'] = req.headers['Content-Type'][0]
-	if 'Content-Length' in req.headers:
-		env['CONTENT_LENGTH'] = int(req.headers['Content-Length'][0])
-	env['SERVER_NAME'] = HOSTNAME
-	env['SERVER_PORT'] = port
-	env['SERVER_PROTOCOL'] = 'HTTP/' + req.version
-	for name, v in req.headers.iteritems():
-		env['HTTP_%s' % cgiish_name(name)] = v[0]
-
-	# WSGI-specific bits
-	env['wsgi.version'] = (1,0)
-	env['wsgi.url_scheme'] = 'http' # XXX incomplete
-	env['wsgi.input'] = cStringIO.StringIO(req.body or '')
-	env['wsgi.errors'] = FileLikeErrorLogger(log)
-	env['wsgi.multithread'] = False
-	env['wsgi.multiprocess'] = False
-	env['wsgi.run_once'] = False
-	return env
-
-class WSGIRequestHandler(object):
-	def __init__(self, app):
-		self.app = app
-
-	def _start_response(self, status, response_headers, exc_info=None):
-		if exc_info:
-			raise exc_info[0], exc_info[1], exc_info[2]
-		else:
-			self.status = status
-			self.response_headers = response_headers
-		return self.write_output
-
-	def write_output(self, output):
-		self.outbuf.append(output)
-
-	def __call__(self, req):
-		env = build_wsgi_env(req, self.app.port)
-		self.outbuf = []
-		for output in self.app.wsgi_callable(env, self._start_response):
-			self.write_output(output)
-		return self.finalize_request(req)
-
-	def finalize_request(self, req):
-		code = int(self.status.split()[0])
-		heads = HttpHeaders()
-		for n, v in self.response_headers:
-			heads.add(n, v)
-		body = ''.join(self.outbuf)
-		return http_response(req, code, heads, body)
-
-class WSGIApplication(Application):
-	def __init__(self, wsgi_callable, port=80, iface=''):
-		Application.__init__(self)
-		self.port = port
-		self.wsgi_callable = wsgi_callable
-		http_service = Service(HttpServer(WSGIRequestHandler(self)), port, iface)
-		self.add_service(http_service)
-
-if __name__ == '__main__':
-	def simple_app(environ, start_response):
-		"""Simplest possible application object"""
-		status = '200 OK'
-		response_headers = [('Content-type','text/plain')]
-		start_response(status, response_headers)
-		return ["Hello World!"]
-	app = WSGIApplication(simple_app, port=7080)
-	app.run()

File concussion/timers.py

-import event
-import time
-
-class Timer(object):
-	def __init__(self, t, f, *args):
-		tm = event.timeout(t, f, *args)
-		tm.add()
-		self._approx_fire = time.time() + t
-		self._ev_timer = tm
-
-	def cancel(self):
-		if self.pending:
-			self._ev_timer.delete()
-	
-	@property
-	def pending(self):
-		return self._ev_timer.pending()
-
-	@property
-	def countdown(self):
-		if self.pending:
-			return max(self._approx_fire - time.time(), 0)
-		return None
-
-def call_later(t, f, *args):
-	return Timer(t, f, *args)
-
-def call_every(t, f, *args):
-	event.timeout(t, _call_again, t, f, *args).add()
-
-def _call_again(t, f, *args):
-	f(*args)
-	event.timeout(t, _call_again, t, f, *args).add()

File diesel/__init__.py

+import logmod
+log = logmod.log
+from timers import call_later, call_every
+from core import until, until_eol, bytes, sleep, up, Connection, ConnectionClosed, Loop
+from app import Application, Service 
+from client import Client, call, response

File diesel/app.py

+import socket
+import event
+import traceback
+import os
+
+from diesel import logmod, log
+from diesel import timers
+from diesel import Connection
+from diesel import Loop
+
+class Application(object):
+	def __init__(self, logger=None):
+		self._run = False
+		if logger is None:
+			logger = logmod.Logger()
+		self.logger = logger
+		self.add_log = self.logger.add_log
+		self._services = []
+		self._loops = []
+
+	def run(self):
+		self._run = True
+		logmod.set_current_application(self)
+		log.info('Starting diesel application')
+
+		for s in self._services:
+			s.bind_and_listen()
+			event.event(s.accept_new_connection,
+			handle=s.sock, evtype=event.EV_READ | event.EV_PERSIST, arg=s).add()
+		for l in self._loops:
+			l.iterate()
+
+		def checkpoint():
+			if not self._run:
+				raise SystemExit
+
+		timers.call_every(1.0, checkpoint)
+		
+		self.setup()
+		while self._run:
+			try:
+				event.dispatch()
+			except SystemExit:
+				log.warn("-- SystemExit raised.. exiting main loop --")
+				break
+			except KeyboardInterrupt:
+				log.warn("-- KeyboardInterrupt raised.. exiting main loop --")
+				break
+			except Exception, e:
+				log.error("-- Unhandled Exception in main loop --")
+				log.error(traceback.format_exc())
+
+		log.info('Ending diesel application')
+
+	def add_service(self, service):
+		service.application = self
+		if self._run:
+			s.bind_and_listen()
+			event.event(s.accept_new_connection,
+			handle=s.sock, evtype=event.EV_READ | event.EV_PERSIST, arg=s).add()
+		else:
+			self._services.append(service)
+
+	def add_loop(self, loop, front=False):
+		loop.application = self
+		if self._run:
+			loop.iterate()
+		else:
+			if front:
+				self._loops.insert(0, loop)
+			else:
+				self._loops.append(loop)
+		
+	def halt(self):	
+		self._run = False
+
+	def setup(self):
+		pass
+
+class Service(object):
+	'''A listening TCP socket that spawns a Connection running
+	connection_handler for every accepted client.
+	'''
+	LQUEUE_SIZ = 500  # listen() backlog
+	def __init__(self, connection_handler, port, iface=''):
+		self.port = port
+		self.iface = iface
+		self.sock = None
+		self.connection_handler = connection_handler
+		self.application = None
+
+	def handle_cannot_bind(self, reason):
+		# Log loudly, then re-raise the original socket.error.
+		log.critical("service at %s:%s cannot bind: %s" % (self.iface or '*', 
+				self.port, reason))
+		raise
+
+	def bind_and_listen(self):
+		'''Create, bind, and listen on the service socket.'''
+		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+		try:
+			sock.bind((self.iface, self.port))
+		except socket.error, e:
+			self.handle_cannot_bind(str(e))
+
+		sock.listen(self.LQUEUE_SIZ)
+		self.sock = sock
+
+	def _get_listening(self):
+		return self.sock is not None
+
+	# True once bind_and_listen() has run.
+	listening = property(_get_listening)
+
+	def accept_new_connection(self, *args):
+		'''libevent accept callback: wrap the new socket in a Connection.'''
+		sock, addr = self.sock.accept()
+		Connection(sock, addr, self.connection_handler).iterate()

File diesel/buffer.py

+class Buffer(object):
+	'''Accumulates inbound bytes and slices off a message once the
+	current terminator (an int byte count or a sentinel string) is met.
+	'''
+	def __init__(self):
+		self._atinbuf = []   # received chunks, in order
+		self._atterm = None  # int byte count or str sentinel
+		self._atmark = 0     # total bytes buffered
+		
+	def set_term(self, term):
+		'''Set the terminator for the next message.'''
+		self._atterm = term
+
+	def feed(self, data):
+		'''Buffer `data`; return a complete message if now available.'''
+		self._atinbuf.append(data)
+		self._atmark += len(data)
+		return self.check()
+
+	def check(self):
+		'''Look for a complete message; return it, or None if not ready.
+		'''
+		ind = None
+		all = None
+		if type(self._atterm) is int:
+			# Byte-count terminator.
+			if self._atmark >= self._atterm:
+				ind = self._atterm
+		else:
+			# Sentinel-string terminator.
+			all = ''.join(self._atinbuf)
+			res = all.find(self._atterm)
+			if res != -1:
+				ind = res + len(self._atterm)
+		if ind is None:
+			return None
+		if all is None:
+			all = ''.join(self._atinbuf)
+		# Split off the message; keep the remainder buffered.
+		use = all[:ind]
+		new_all = all[ind:]
+		self._atinbuf = [new_all]
+		self._atmark = len(new_all)
+
+		return use

File diesel/client.py

+import socket
+from collections import deque
+
+class call(object):
+	'''Decorator/descriptor marking a Client generator method as a
+	remote call; go() queues the call on the client's connection.
+	'''
+	def __init__(self, f, inst=None):
+		self.f = f
+		self.client = inst
+
+	def __call__(self, *args, **kw):
+		# Build (but do not start) the call's generator.
+		self.gen = self.f(self.client, *args, **kw)
+		return self
+
+	def __get__(self, inst, cls):
+		# Descriptor protocol: bind the decorated method to its Client.
+		return call(self.f, inst)
+
+	def go(self, callback): # XXX errback-type stuff?
+		# Register the response callback, queue the job, wake the loop.
+		self.client.conn.callbacks.append(callback)
+		self.client.jobs.append(self.gen)
+		self.client.conn.wake()
+
+class response(object):
+	'''Yield token: deliver `value` to the pending call's callback.'''
+	def __init__(self, value):
+		self.value = value
+
+class Client(object):
+	'''Base class for protocol clients; subclasses define @call methods.'''
+	def __init__(self, connection_handler=None):
+		self.connection_handler = connection_handler or self.client_conn_handler
+		self.jobs = deque()
+		self.conn = None
+	 
+	def connect(self, addr, port):  
+		'''Open a TCP connection and start the client coroutine.'''
+		remote_addr = (addr, port)
+		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		sock.connect(remote_addr)
+		from diesel.core import Connection
+		self.conn = Connection(sock, (addr, port), self.client_conn_handler)
+		self.conn.iterate()
+
+	def close(self):
+		'''Drop the connection reference; is_closed becomes True.'''
+		self.conn = None
+
+	@property
+	def is_closed(self):
+		return self.conn is None
+
+	def client_conn_handler(self, addr):
+		'''Default connection coroutine: sleep until a queued @call job
+		arrives, then run it.
+		'''
+		from diesel.core import sleep, ConnectionClosed
+		yield self.on_connect()
+
+		while True:
+			try:
+				if not self.jobs:
+					yield sleep()
+				if not self.jobs:
+					continue
+				mygen = self.jobs.popleft()
+				yield mygen
+			except ConnectionClosed:
+				self.close()
+				self.on_close()
+
+	def on_connect(self):
+		# Subclass hook; must be a generator (hence the dead yield).
+		if 0: yield 0
+
+	def on_close(self):
+		# Subclass hook; called after a ConnectionClosed.
+		pass

File diesel/core.py

+import socket
+import event
+from types import GeneratorType
+from collections import deque, defaultdict
+
+from diesel import pipeline
+from diesel import buffer
+from diesel import call_later
+from diesel.client import call, response
+
+class ConnectionClosed(socket.error): pass
+
+CRLF = '\r\n'
+BUFSIZ = 2 ** 14
+
+class until(object):
+	'''Yield token: read input until `sentinel` is seen.'''
+	def __init__(self, sentinel):
+		self.sentinel = sentinel
+
+def until_eol():
+	'''Shorthand for until(CRLF).'''
+	return until(CRLF)
+
+class bytes(object):
+	'''Yield token: read exactly `sentinel` bytes of input.'''
+	def __init__(self, sentinel):
+		self.sentinel = sentinel
+
+class sleep(object):
+	'''Yield token: pause for `duration` seconds (0 = until woken).'''
+	def __init__(self, duration=0):
+		self.duration = duration
+
+class up(object):
+	'''Yield token: return `value` to the parent generator on the stack.'''
+	def __init__(self, value):
+		self.value = value
+
+class wait(object):
+	'''Yield token: block until `event` is fired.'''
+	def __init__(self, event):
+		self.event = event
+
+class fire(object):
+	'''Yield token: fire `event`, delivering `value` to all waiters.'''
+	def __init__(self, event, value):
+		self.event = event
+		self.value = value
+
+global_waits = defaultdict(set)
+	
+class Connection(object):
+	def __init__(self, sock, addr, connection_handler):
+		self.sock = sock
+		self.addr = addr
+		self.pipeline = pipeline.Pipeline()  # outbound send queue
+		self.buffer = buffer.Buffer()        # inbound accumulator
+		# Persistent read event; the write event is armed lazily.
+		self._rev = event.event(self.handle_read, handle=sock, evtype=event.EV_READ | event.EV_PERSIST, arg=None)
+		self._rev.add()
+		self._wev = None
+		# Flattened coroutine driving this connection.
+		self.g = self.cycle_all(connection_handler(addr))
+		self.callbacks = deque()
+		self._wakeup_timer = None
+
+	def cycle_all(self, current, error=None):
+		'''Effectively flattens all iterators.
+
+		Runs `current` (the handler generator); when it yields another
+		generator, descends into it and pushes the parent on a stack.
+		A `response` token pops back to the parent.  Values from the
+		outer yield are fed back via send(); a ConnectionClosed seen at
+		the outer yield is re-raised inside the active generator.
+		'''
+		last = None
+		stack = []
+		while True:
+			try:
+				if error != None:
+					item = current.throw(*error)
+				elif last != None:
+					item = current.send(last)
+				else:
+					item = current.next()
+			except StopIteration:
+				# Current generator finished; resume its parent, if any.
+				if stack:
+					current = stack.pop()
+				else:
+					raise
+			else:
+				if type(item) is GeneratorType:
+					# Descend into the sub-generator.
+					stack.append(current)
+					current = item
+					last = None
+				else:
+					if type(item) is response:
+						assert stack, "Cannot return a response from main handler"
+						current = stack.pop()
+					try:
+						last = (yield item)
+					except ConnectionClosed, e:
+						# NOTE(review): `error` is never cleared after a
+						# successful throw(); if the generator catches the
+						# exception and keeps yielding, it is re-thrown on
+						# every step -- confirm this is intended.
+						error = (ConnectionClosed, str(e))
+
+	def set_writable(self, val):
+		'''Arm (val=True) or disarm (val=False) the persistent write event.'''
+		if val and self._wev is None:
+			self._wev = event.event(self.handle_write, handle=self.sock, evtype=event.EV_WRITE | event.EV_PERSIST, arg=None)
+			self._wev.add()
+		elif not val and self._wev is not None:
+			self._wev.delete()
+			self._wev = None
+
+	def shutdown(self, remote_closed=False):
+		'''Tear down the libevent handlers; if the remote side closed,
+		raise ConnectionClosed inside the handler coroutine first.
+		'''
+		if self._rev != None:
+			self._rev.delete()
+			self._rev = None
+
+		self.set_writable(False)
+
+		if remote_closed:
+			try:
+				self.g.throw(ConnectionClosed)
+			except StopIteration:
+				pass
+
+		self.g = None
+
+	def handle_write(self, ev, handle, evtype, _):
+		if not self.pipeline.empty:
+			try:
+				data = self.pipeline.read(BUFSIZ)
+			except pipeline.PipelineCloseRequest:
+				self.sock.close()
+				self.shutdown()
+			else:
+				try:
+					bsent = self.sock.send(data)
+				except socket.error, s:
+					g = self.g
+					self.shutdown(True)
+				else:
+					if bsent != len(data):
+						self.pipeline.backup(data[bsent:])
+
+					if not self.pipeline.empty:
+						return True
+					else:
+						self.set_writable(False)
+
+	def handle_read(self, ev, handle, evtype, _):
+		disconnect_reason = None
+		try:
+			data = self.sock.recv(BUFSIZ)
+		except socket.error, e:
+			data = ''
+			disconnect_reason = str(e)
+
+		if not data:
+			g = self.g
+			self.shutdown(True)
+		else:
+			res = self.buffer.feed(data)
+			if res:
+				self.iterate(res)
+
+	def schedule(self, value=None):
+		if self._wakeup_timer:
+			self._wakeup_timer.cancel()
+			self._wakeup_timer = call_later(0, self.wake, value)
+
+	def wake(self, value=None):
+		'''Immediately resume the coroutine, sending `value` in.'''
+		if self._wakeup_timer:
+			self._wakeup_timer.cancel()
+		self.iterate(value)
+
+	def iterate(self, n_val=None):
+		self._wakeup_timer = None
+		while True:
+			try:
+				if n_val is not None:
+					rets = self.g.send(n_val)
+				else:
+					rets = self.g.next()
+			except StopIteration:
+				if hasattr(self, 'sock'):
+					self.pipeline.close_request()
+				break
+			n_val = None
+			if type(rets) != tuple:
+				rets = (rets,)
+
+			exit = False
+			for ret in rets:
+				
+				if isinstance(ret, response):
+					c = self.callbacks.popleft()
+					c(ret.value)
+					assert len(rets) == 1, "response cannot be paired with any other yield token"
+					exit = True
+				elif isinstance(ret, call):
+					ret.go(self.iterate)
+					assert len(rets) == 1, "call cannot be paired with any other yield token"
+					exit = True
+				elif isinstance(ret, basestring) or hasattr(ret, 'seek'):
+					self.pipeline.add(ret)
+					assert len(rets) == 1, "a string or file cannot be paired with any other yield token"
+				elif type(ret) is up:
+					n_val = ret.value
+					assert len(rets) == 1, "up cannot be paired with any other yield token"
+				elif type(ret) is fire:
+					assert len(rets) == 1, "fire cannot be paired with any other yield token"
+					waiters = global_waits[ret.event]
+					for w in waiters:
+						w(ret)
+					global_waits[fire.event] = set()
+				elif type(ret) is until or type(ret) is bytes:
+					self.buffer.set_term(ret.sentinel)
+					n_val = self.buffer.check()
+					if n_val == None:
+						exit = True
+				elif type(ret) is sleep:
+					if ret.duration:
+						self._wakeup_timer = call_later(ret.duration, self.wake, ret)
+					exit = True
+				elif type(ret) is wait:
+					global_waits[ret.event].add(self.schedule)
+					exit = True
+				if exit: 
+					break
+			if exit: 
+				break
+
+		if hasattr(self, 'sock') and not self.pipeline.empty:
+			self.set_writable(True)
+
+class Loop(Connection):
+	'''A way to write a connection-less loop.
+
+	XXX
+	This is probably upside down right now.  Fix it eventually.
+	'''
+	def __init__(self, loop_callable):
+		# No socket or libevent handlers here: only the coroutine and the
+		# wakeup timer, so inherited code that touches the socket guards
+		# with hasattr(self, 'sock').
+		self.g = self.cycle_all(loop_callable())
+		self._wakeup_timer = None

File diesel/logmod.py

+import sys
+import time
+
+_current_application = None
+(
+LOGLVL_DEBUG,
+LOGLVL_INFO,
+LOGLVL_WARN,
+LOGLVL_ERR,
+LOGLVL_CRITICAL,
+) = range(1,6)
+
+_lvl_text = {
+	LOGLVL_DEBUG : 'debug',
+	LOGLVL_INFO : 'info',
+	LOGLVL_WARN : 'warn',
+	LOGLVL_ERR : 'error',
+	LOGLVL_CRITICAL : 'critical',
+}
+	
+
+class Logger:
+	'''Leveled logger writing formatted lines to one or more file-like
+	objects.
+	'''
+	def __init__(self, fd=sys.stdout, verbosity=LOGLVL_WARN):
+		self.fdlist = [fd]
+		self.level = verbosity
+		self.component = None
+
+	def add_log(self, fd):
+		'''Add another output file-like object.'''
+		self.fdlist.append(fd)
+
+	def _writelogline(self, lvl, message):
+		# Emit only when `lvl` meets the configured verbosity.
+		if lvl >= self.level:
+			for fd in self.fdlist:
+				fd.write('[%s] {%s%s} %s\n' % (time.asctime(), 
+										self.component and ('%s:' % self.component) or '',
+										_lvl_text[lvl],
+										message))
+
+	# Per-level convenience wrappers.
+	debug = lambda s, m: s._writelogline(LOGLVL_DEBUG, m)
+	info = lambda s, m: s._writelogline(LOGLVL_INFO, m)
+	warn = lambda s, m: s._writelogline(LOGLVL_WARN, m)
+	error = lambda s, m: s._writelogline(LOGLVL_ERR, m)
+	critical = lambda s, m: s._writelogline(LOGLVL_CRITICAL, m)
+
+	def get_sublogger(self, component, verbosity=None):
+		'''Return a Logger sharing this one's outputs, tagged with
+		`component` in every line.
+		'''
+		copy = Logger(verbosity=verbosity or self.level)
+		copy.fdlist = self.fdlist
+		copy.component = component
+		return copy
+
+def set_current_application(app):
+	'''Install `app` as the process-wide application backing `log`.'''
+	global _current_application
+	_current_application = app
+
+class _currentLogger:
+	'''Proxy delegating attribute access to the current app's logger.'''
+	def __getattr__(self, n):
+		return getattr(_current_application.logger, n)
+
+log = _currentLogger()

File diesel/pipeline.py

+try:
+	import cStringIO
+except ImportError:
+	raise ImportError, "cStringIO is required"
+
+_obj_SIO = cStringIO.StringIO
+_type_SIO = cStringIO.OutputType
+def make_SIO(d):
+	'''Return a cStringIO pre-loaded with `d` and seeked back to 0.'''
+	t = _obj_SIO()
+	t.write(d)
+	t.seek(0)
+	return t
+
+def get_file_length(f):
+	'''Return the total length of file-like `f`, preserving its position.'''
+	m = f.tell()
+	f.seek(0, 2)
+	r = f.tell()
+	f.seek(m)
+	return r
+
+class PipelineLimitReached(Exception): pass
+class PipelineCloseRequest(Exception): pass
+class PipelineClosed(Exception): pass
+	
+class Pipeline(object):
+	def __init__(self, limit=0):
+		self.line = []
+		self.limit = limit
+		self.used = 0
+		self.callbacks = []
+		self.want_close = False
+
+	def add(self, d):
+		if self.want_close:
+			raise PipelineClosed
+
+		if self.limit > 0 and self.used >= self.limit:
+			raise PipelineLimitReached
+
+		if type(d) is str:
+			if self.line and type(self.line[-1][0]) is _type_SIO and \
+			(self.limit == 0 or self.line[-1][1] < (self.limit / 2)):
+				fd, l = self.line[-1]
+				m = fd.tell()
+				fd.seek(0, 2)
+				fd.write(d)
+				fd.seek(m)
+				self.line[-1] = [fd, l + len(d)]
+			else:
+				self.line.append([make_SIO(d), len(d)])
+			self.used += len(d)
+		else:
+			self.line.append([d, get_file_length(d)])
+
+	def close_request(self):
+		self.want_close = True
+
+	def read(self, amt):
+		if self.line == [] and self.want_close:
+			raise PipelineCloseRequest
+
+		rbuf = []
+		read = 0
+		while self.line and read < amt:
+			data = self.line[0][0].read(amt - read)
+			if data == '':
+				if type(self.line[0][0]) is _type_SIO:
+					self.used -= self.line[0][1]
+				del self.line[0]
+			else:
+				rbuf.append(data)
+				read += len(data)
+
+		while self.line and self.line[0][1] == self.line[0][0].tell():
+			self.used -= self.line[0][1]
+			del self.line[0]
+
+		while self.callbacks and self.used < self.limit:
+			self.callbacks.pop(0).callback(self)
+
+		return ''.join(rbuf)
+	
+	def backup(self, d):
+		self.line.insert(0, [make_SIO(d), len(d)])
+		self.used += len(d)
+
+	def _get_empty(self):
+		return self.want_close == False and self.line == []
+	empty = property(_get_empty)
+
+	def _get_full(self):
+		return self.used == 0 or self.used >= self.limit
+	full = property(_get_full)

File diesel/protocols/__init__.py

Empty file added.

File diesel/protocols/http.py

+import sys, socket
+import urllib
+from collections import defaultdict
+
+from diesel import up, until, until_eol, bytes, ConnectionClosed
+
+status_strings = {
+100 : "Continue",
+101 : "Switching Protocols",
+200 : "OK",
+201 : "Created",
+202 : "Accepted",
+203 : "Non-Authoritative Information",
+204 : "No Content",
+205 : "Reset Content",
+206 : "Partial Content",
+300 : "Multiple Choices",
+301 : "Moved Permanently",
+302 : "Found",
+303 : "See Other",
+304 : "Not Modified",
+305 : "Use Proxy",
+307 : "Temporary Redirect",
+400 : "Bad Request",
+401 : "Unauthorized",
+402 : "Payment Required",
+403 : "Forbidden",
+404 : "Not Found",
+405 : "Method Not Allowed",
+406 : "Not Acceptable",
+407 : "Proxy Authentication Required",
+408 : "Request Time-out",
+409 : "Conflict",
+410 : "Gone",
+411 : "Length Required",
+412 : "Precondition Failed",