Anonymous avatar Anonymous committed b7abedd

\t -> ' '

Comments (0)

Files changed (21)

diesel/__init__.py

+# vim:ts=4:sw=4:expandtab
 import logmod
 log = logmod.log
 from logmod import Logger, LOGLVL_DEBUG, LOGLVL_INFO, LOGLVL_WARN, LOGLVL_ERR, LOGLVL_CRITICAL
+# vim:ts=4:sw=4:expandtab
 import socket
 import traceback
 import os
 current_app = None
 
 class Application(object):
-	def __init__(self, logger=None):
-		global current_app
-		current_app = self
-		self.hub = EventHub()
-		self._run = False
-		if logger is None:
-			logger = logmod.Logger()
-		self.logger = logger
-		self._services = []
-		self._loops = []
+    def __init__(self, logger=None):
+        global current_app
+        current_app = self
+        self.hub = EventHub()
+        self._run = False
+        if logger is None:
+            logger = logmod.Logger()
+        self.logger = logger
+        self._services = []
+        self._loops = []
 
-	def run(self):
-		self._run = True
-		logmod.set_current_application(self)
-		log.info('Starting diesel application')
+    def run(self):
+        self._run = True
+        logmod.set_current_application(self)
+        log.info('Starting diesel application')
 
-		for s in self._services:
-			s.bind_and_listen()
-			self.hub.register(s.sock, s.accept_new_connection, None)
-		for l in self._loops:
-			l.iterate()
+        for s in self._services:
+            s.bind_and_listen()
+            self.hub.register(s.sock, s.accept_new_connection, None)
+        for l in self._loops:
+            l.iterate()
 
-		self.setup()
-		while self._run:
-			try:
-				self.hub.handle_events()
-			except SystemExit:
-				log.warn("-- SystemExit raised.. exiting main loop --")
-				break
-			except KeyboardInterrupt:
-				log.warn("-- KeyboardInterrupt raised.. exiting main loop --")
-				break
-			except Exception, e:
-				log.error("-- Unhandled Exception in main loop --")
-				log.error(traceback.format_exc())
+        self.setup()
+        while self._run:
+            try:
+                self.hub.handle_events()
+            except SystemExit:
+                log.warn("-- SystemExit raised.. exiting main loop --")
+                break
+            except KeyboardInterrupt:
+                log.warn("-- KeyboardInterrupt raised.. exiting main loop --")
+                break
+            except Exception, e:
+                log.error("-- Unhandled Exception in main loop --")
+                log.error(traceback.format_exc())
 
-		log.info('Ending diesel application')
+        log.info('Ending diesel application')
 
-	def add_service(self, service):
-		service.application = self
-		if self._run:
-			s.bind_and_listen()
-			self.hub.register(s.sock, s.accept_new_connection, None)
-		else:
-			self._services.append(service)
+    def add_service(self, service):
+        service.application = self
+        if self._run:
+            s.bind_and_listen()
+            self.hub.register(s.sock, s.accept_new_connection, None)
+        else:
+            self._services.append(service)
 
-	def add_loop(self, loop, front=False):
-		loop.application = self
-		if self._run:
-			loop.iterate()
-		else:
-			if front:
-				self._loops.insert(0, loop)
-			else:
-				self._loops.append(loop)
-		
-	def halt(self):	
-		self.hub.run = False
-		self._run = False
+    def add_loop(self, loop, front=False):
+        loop.application = self
+        if self._run:
+            loop.iterate()
+        else:
+            if front:
+                self._loops.insert(0, loop)
+            else:
+                self._loops.append(loop)
+        
+    def halt(self):    
+        self.hub.run = False
+        self._run = False
 
-	def setup(self):
-		pass
+    def setup(self):
+        pass
 
 class Service(object):
-	LQUEUE_SIZ = 500
-	def __init__(self, connection_handler, port, iface=''):
-		self.port = port
-		self.iface = iface
-		self.sock = None
-		self.connection_handler = connection_handler
-		self.application = None
+    LQUEUE_SIZ = 500
+    def __init__(self, connection_handler, port, iface=''):
+        self.port = port
+        self.iface = iface
+        self.sock = None
+        self.connection_handler = connection_handler
+        self.application = None
 
-	def handle_cannot_bind(self, reason):
-		log.critical("service at %s:%s cannot bind: %s" % (self.iface or '*', 
-				self.port, reason))
-		raise
+    def handle_cannot_bind(self, reason):
+        log.critical("service at %s:%s cannot bind: %s" % (self.iface or '*', 
+                self.port, reason))
+        raise
 
-	def bind_and_listen(self):
-		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    def bind_and_listen(self):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-		try:
-			sock.bind((self.iface, self.port))
-		except socket.error, e:
-			self.handle_cannot_bind(str(e))
+        try:
+            sock.bind((self.iface, self.port))
+        except socket.error, e:
+            self.handle_cannot_bind(str(e))
 
-		sock.listen(self.LQUEUE_SIZ)
-		self.sock = sock
+        sock.listen(self.LQUEUE_SIZ)
+        self.sock = sock
 
-	def _get_listening(self):
-		return self.sock is not None
+    def _get_listening(self):
+        return self.sock is not None
 
-	listening = property(_get_listening)
+    listening = property(_get_listening)
 
-	def accept_new_connection(self):
-		sock, addr = self.sock.accept()
-		Connection(sock, addr, self.connection_handler).iterate()
+    def accept_new_connection(self):
+        sock, addr = self.sock.accept()
+        Connection(sock, addr, self.connection_handler).iterate()
+# vim:ts=4:sw=4:expandtab
 class Buffer(object):
-	def __init__(self):
-		self._atinbuf = []
-		self._atterm = None
-		self._atmark = 0
-		
-	def set_term(self, term):
-		self._atterm = term
+    def __init__(self):
+        self._atinbuf = []
+        self._atterm = None
+        self._atmark = 0
+        
+    def set_term(self, term):
+        self._atterm = term
 
-	def feed(self, data):
-		self._atinbuf.append(data)
-		self._atmark += len(data)
-		return self.check()
+    def feed(self, data):
+        self._atinbuf.append(data)
+        self._atmark += len(data)
+        return self.check()
 
-	def check(self):
-		'''Look for the message
-		'''
-		ind = None
-		all = None
-		if type(self._atterm) is int:
-			if self._atmark >= self._atterm:
-				ind = self._atterm
-		elif self._atterm is None:
-			return None
-		else:
-			all = ''.join(self._atinbuf)
-			res = all.find(self._atterm)
-			if res != -1:
-				ind = res + len(self._atterm)
-		if ind is None:
-			return None
-		if all is None:
-			all = ''.join(self._atinbuf)
-		use = all[:ind]
-		new_all = all[ind:]
-		self._atinbuf = [new_all]
-		self._atmark = len(new_all)
+    def check(self):
+        '''Look for the message
+        '''
+        ind = None
+        all = None
+        if type(self._atterm) is int:
+            if self._atmark >= self._atterm:
+                ind = self._atterm
+        elif self._atterm is None:
+            return None
+        else:
+            all = ''.join(self._atinbuf)
+            res = all.find(self._atterm)
+            if res != -1:
+                ind = res + len(self._atterm)
+        if ind is None:
+            return None
+        if all is None:
+            all = ''.join(self._atinbuf)
+        use = all[:ind]
+        new_all = all[ind:]
+        self._atinbuf = [new_all]
+        self._atmark = len(new_all)
 
-		return use
+        return use
+# vim:ts=4:sw=4:expandtab
 import socket
 from collections import deque
 
 class call(object):
-	def __init__(self, f, inst=None):
-		self.f = f
-		self.client = inst
+    def __init__(self, f, inst=None):
+        self.f = f
+        self.client = inst
 
-	def __call__(self, *args, **kw):
-		self.gen = self.f(self.client, *args, **kw)
-		return self
+    def __call__(self, *args, **kw):
+        self.gen = self.f(self.client, *args, **kw)
+        return self
 
-	def __get__(self, inst, cls):
-		return call(self.f, inst)
+    def __get__(self, inst, cls):
+        return call(self.f, inst)
 
-	def go(self, callback): # XXX errback-type stuff?
-		self.client.conn.callbacks.append(callback)
-		self.client.jobs.append(self.gen)
-		self.client.conn.wake()
+    def go(self, callback): # XXX errback-type stuff?
+        self.client.conn.callbacks.append(callback)
+        self.client.jobs.append(self.gen)
+        self.client.conn.wake()
 
 class response(object):
-	def __init__(self, value):
-		self.value = value
+    def __init__(self, value):
+        self.value = value
 
 class Client(object):
-	def __init__(self, connection_handler=None):
-		self.connection_handler = connection_handler or self.client_conn_handler
-		self.jobs = deque()
-		self.conn = None
-	 
-	def connect(self, addr, port):  
-		remote_addr = (addr, port)
-		sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		sock.connect(remote_addr)
-		from diesel.core import Connection
-		self.conn = Connection(sock, (addr, port), self.client_conn_handler)
-		self.conn.iterate()
+    def __init__(self, connection_handler=None):
+        self.connection_handler = connection_handler or self.client_conn_handler
+        self.jobs = deque()
+        self.conn = None
+     
+    def connect(self, addr, port):  
+        remote_addr = (addr, port)
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.connect(remote_addr)
+        from diesel.core import Connection
+        self.conn = Connection(sock, (addr, port), self.client_conn_handler)
+        self.conn.iterate()
 
-	def close(self):
-		self.conn = None
+    def close(self):
+        self.conn = None
 
-	@property
-	def is_closed(self):
-		return self.conn is None
+    @property
+    def is_closed(self):
+        return self.conn is None
 
-	def client_conn_handler(self, addr):
-		from diesel.core import sleep, ConnectionClosed
-		yield self.on_connect()
+    def client_conn_handler(self, addr):
+        from diesel.core import sleep, ConnectionClosed
+        yield self.on_connect()
 
-		while True:
-			try:
-				if not self.jobs:
-					yield sleep()
-				if not self.jobs:
-					continue
-				mygen = self.jobs.popleft()
-				yield mygen
-			except ConnectionClosed:
-				self.close()
-				self.on_close()
-				break
+        while True:
+            try:
+                if not self.jobs:
+                    yield sleep()
+                if not self.jobs:
+                    continue
+                mygen = self.jobs.popleft()
+                yield mygen
+            except ConnectionClosed:
+                self.close()
+                self.on_close()
+                break
 
-	def on_connect(self):
-		if 0: yield 0
+    def on_connect(self):
+        if 0: yield 0
 
-	def on_close(self):
-		pass
+    def on_close(self):
+        pass
+# vim:ts=4:sw=4:expandtab
 import socket
 from types import GeneratorType
 from collections import deque, defaultdict
 BUFSIZ = 2 ** 14
 
 class until(object):
-	def __init__(self, sentinel):
-		self.sentinel = sentinel
+    def __init__(self, sentinel):
+        self.sentinel = sentinel
 
 def until_eol():
-	return until(CRLF)
+    return until(CRLF)
 
 class bytes(object):
-	def __init__(self, sentinel):
-		self.sentinel = sentinel
+    def __init__(self, sentinel):
+        self.sentinel = sentinel
 
 class sleep(object):
-	def __init__(self, duration=0):
-		self.duration = duration
+    def __init__(self, duration=0):
+        self.duration = duration
 
 class up(object):
-	def __init__(self, value):
-		self.value = value
+    def __init__(self, value):
+        self.value = value
 
 class wait(object):
-	def __init__(self, event):
-		self.event = event
+    def __init__(self, event):
+        self.event = event
 
 class fire(object):
-	def __init__(self, event, value=None):
-		self.event = event
-		self.value = value
+    def __init__(self, event, value=None):
+        self.event = event
+        self.value = value
 
 class WaitPool(object):
-	def __init__(self):
-		self.waits = defaultdict(set)
-		self.loop_refs = defaultdict(set)
+    def __init__(self):
+        self.waits = defaultdict(set)
+        self.loop_refs = defaultdict(set)
 
-	def wait(self, who, what):
-		self.waits[what].add(who)
-		self.loop_refs[who].add(what)
+    def wait(self, who, what):
+        self.waits[what].add(who)
+        self.loop_refs[who].add(what)
 
-	def fire(self, what, value):
-		for handler in self.waits[what].copy():
-			handler.fire(what, value)
+    def fire(self, what, value):
+        for handler in self.waits[what].copy():
+            handler.fire(what, value)
 
-	def clear(self, who):
-		for what in self.loop_refs[who]:
-			self.waits[what].remove(who)
-		del self.loop_refs[who]
+    def clear(self, who):
+        for what in self.loop_refs[who]:
+            self.waits[what].remove(who)
+        del self.loop_refs[who]
 
 waits = WaitPool()
 
 class NoPipeline(object):
-	def __getattr__(self, *args, **kw):
-		return ValueError("Cannot write to the outgoing pipeline for socketless Loops (yield string, file)")
-	empty = True
+    def __getattr__(self, *args, **kw):
+        return ValueError("Cannot write to the outgoing pipeline for socketless Loops (yield string, file)")
+    empty = True
 
 class NoBuffer(object):
-	def __getattr__(self, *args, **kw):
-		return ValueError("Cannot check incoming buffer on socketless Loops (yield until, bytes, etc)")
+    def __getattr__(self, *args, **kw):
+        return ValueError("Cannot check incoming buffer on socketless Loops (yield until, bytes, etc)")
 
 def id_gen():
-	x = 1
-	while True:
-		yield x
-		x += 1
+    x = 1
+    while True:
+        yield x
+        x += 1
 ids = id_gen()
 
 class Loop(object):
-	'''A cooperative generator.
-	'''
-	def __init__(self, loop_callable, *callable_args):
-		self.g = self.cycle_all(loop_callable(*callable_args))
-		self.pipeline = NoPipeline()
-		self.buffer = NoBuffer()
-		from diesel.app import current_app
-		self.hub = current_app.hub
-		self.id = ids.next()
-		self._wakeup_timer = None
-		self.fire_handlers = {}
+    '''A cooperative generator.
+    '''
+    def __init__(self, loop_callable, *callable_args):
+        self.g = self.cycle_all(loop_callable(*callable_args))
+        self.pipeline = NoPipeline()
+        self.buffer = NoBuffer()
+        from diesel.app import current_app
+        self.hub = current_app.hub
+        self.id = ids.next()
+        self._wakeup_timer = None
+        self.fire_handlers = {}
 
-	def __hash__(self):
-		return self.id
+    def __hash__(self):
+        return self.id
 
-	def __eq__(self, other):
-		return other.id == self.id
+    def __eq__(self, other):
+        return other.id == self.id
 
-	def fire(self, what, value):
-		if what in self.fire_handlers:
-			handler = self.fire_handlers.pop(what)
-			self.fire_handlers = {}
-			handler(value)
+    def fire(self, what, value):
+        if what in self.fire_handlers:
+            handler = self.fire_handlers.pop(what)
+            self.fire_handlers = {}
+            handler(value)
 
-	def cycle_all(self, current, error=None):
-		'''Effectively flattens all iterators.
-		'''
-		last = None
-		stack = []
-		while True:
-			try:
-				if error != None:
-					item = current.throw(*error)
-				elif last != None:
-					item = current.send(last)
-				else:
-					item = current.next()
-			except StopIteration:
-				if stack:
-					current = stack.pop()
-				else:
-					raise
-			except Exception, e:
-				if stack:
-					error = e.__class__, str(e)
-					current = stack.pop()
-				else:
-					raise
-			else:
-				if type(item) is GeneratorType:
-					stack.append(current)
-					current = item
-					last = None
-				else:
-					if type(item) is response:
-						assert stack, "Cannot return a response from main handler"
-						current = stack.pop()
-					try:
-						last = (yield item)
-					except ConnectionClosed, e:
-						error = (ConnectionClosed, str(e))
+    def cycle_all(self, current, error=None):
+        '''Effectively flattens all iterators.
+        '''
+        last = None
+        stack = []
+        while True:
+            try:
+                if error != None:
+                    item = current.throw(*error)
+                elif last != None:
+                    item = current.send(last)
+                else:
+                    item = current.next()
+            except StopIteration:
+                if stack:
+                    current = stack.pop()
+                else:
+                    raise
+            except Exception, e:
+                if stack:
+                    error = e.__class__, str(e)
+                    current = stack.pop()
+                else:
+                    raise
+            else:
+                if type(item) is GeneratorType:
+                    stack.append(current)
+                    current = item
+                    last = None
+                else:
+                    if type(item) is response:
+                        assert stack, "Cannot return a response from main handler"
+                        current = stack.pop()
+                    try:
+                        last = (yield item)
+                    except ConnectionClosed, e:
+                        error = (ConnectionClosed, str(e))
 
-	def multi_callin(self, pos, tot, real_f=None):
-		real_f = real_f or self.wake
-		if tot == 1:
-			return real_f
-		def f(res):
-			real_arg = [None] * tot
-			real_arg[pos] = res
-			return real_f(tuple(real_arg))
-		return f
+    def multi_callin(self, pos, tot, real_f=None):
+        real_f = real_f or self.wake
+        if tot == 1:
+            return real_f
+        def f(res):
+            real_arg = [None] * tot
+            real_arg[pos] = res
+            return real_f(tuple(real_arg))
+        return f
 
-	def iterate(self, n_val=None):
-		while True:
-			try:
-				if n_val is not None:
-					rets = self.g.send(n_val)
-				else:
-					rets = self.g.next()
-			except StopIteration:
-				if hasattr(self, 'sock'):
-					self.pipeline.close_request()
-				break
-			n_val = None
-			if type(rets) != tuple:
-				rets = (rets,)
+    def iterate(self, n_val=None):
+        while True:
+            try:
+                if n_val is not None:
+                    rets = self.g.send(n_val)
+                else:
+                    rets = self.g.next()
+            except StopIteration:
+                if hasattr(self, 'sock'):
+                    self.pipeline.close_request()
+                break
+            n_val = None
+            if type(rets) != tuple:
+                rets = (rets,)
 
-			exit = False
-			used_term = False
-			nrets = len(rets)
-			for pos, ret in enumerate(rets):
-				
-				if isinstance(ret, response):
-					assert nrets == 1, "response cannot be paired with any other yield token"
-					c = self.callbacks.popleft()
-					c(ret.value)
-					exit = True
-				elif isinstance(ret, call):
-					assert nrets == 1, "call cannot be paired with any other yield token"
-					ret.go(self.iterate)
-					exit = True
-				elif isinstance(ret, basestring) or hasattr(ret, 'seek'):
-					assert nrets == 1, "a string or file cannot be paired with any other yield token"
-					self.pipeline.add(ret)
-				elif type(ret) is up:
-					assert nrets == 1, "up cannot be paired with any other yield token"
-					n_val = ret.value
-				elif type(ret) is fire:
-					assert nrets == 1, "fire cannot be paired with any other yield token"
-					waits.fire(ret.event, ret.value)
-				elif type(ret) is until or type(ret) is bytes:
-					assert used_term == False, "only one terminal specifier (bytes, until) per yield"
-					used_term = True
-					self.buffer.set_term(ret.sentinel)
-					n_val = self.buffer.check()
-					if n_val == None:
-						exit = True
-						self.new_data = self.multi_callin(pos, nrets)
-					else:
-						if nrets > 1:
-							t = [None] * nrets
-							t[pos] = n_val
-							n_val = tuple(t)
-						self.clear_pending_events()
-						exit = False
-						break
+            exit = False
+            used_term = False
+            nrets = len(rets)
+            for pos, ret in enumerate(rets):
+                
+                if isinstance(ret, response):
+                    assert nrets == 1, "response cannot be paired with any other yield token"
+                    c = self.callbacks.popleft()
+                    c(ret.value)
+                    exit = True
+                elif isinstance(ret, call):
+                    assert nrets == 1, "call cannot be paired with any other yield token"
+                    ret.go(self.iterate)
+                    exit = True
+                elif isinstance(ret, basestring) or hasattr(ret, 'seek'):
+                    assert nrets == 1, "a string or file cannot be paired with any other yield token"
+                    self.pipeline.add(ret)
+                elif type(ret) is up:
+                    assert nrets == 1, "up cannot be paired with any other yield token"
+                    n_val = ret.value
+                elif type(ret) is fire:
+                    assert nrets == 1, "fire cannot be paired with any other yield token"
+                    waits.fire(ret.event, ret.value)
+                elif type(ret) is until or type(ret) is bytes:
+                    assert used_term == False, "only one terminal specifier (bytes, until) per yield"
+                    used_term = True
+                    self.buffer.set_term(ret.sentinel)
+                    n_val = self.buffer.check()
+                    if n_val == None:
+                        exit = True
+                        self.new_data = self.multi_callin(pos, nrets)
+                    else:
+                        if nrets > 1:
+                            t = [None] * nrets
+                            t[pos] = n_val
+                            n_val = tuple(t)
+                        self.clear_pending_events()
+                        exit = False
+                        break
 
-				elif type(ret) is sleep:
-					self._wakeup_timer = self.hub.call_later(ret.duration, self.multi_callin(pos, nrets), True)
-					exit = True
+                elif type(ret) is sleep:
+                    self._wakeup_timer = self.hub.call_later(ret.duration, self.multi_callin(pos, nrets), True)
+                    exit = True
 
-				elif type(ret) is wait:
-					self.fire_handlers[ret.event] = self.multi_callin(pos, nrets, self.schedule)
-					waits.wait(self, ret.event)
-					exit = True
-			if exit: 
-				break
+                elif type(ret) is wait:
+                    self.fire_handlers[ret.event] = self.multi_callin(pos, nrets, self.schedule)
+                    waits.wait(self, ret.event)
+                    exit = True
+            if exit: 
+                break
 
-		if not self.pipeline.empty:
-			self.set_writable(True)
+        if not self.pipeline.empty:
+            self.set_writable(True)
 
-	def clear_pending_events(self):
-		if self._wakeup_timer and self._wakeup_timer.pending:
-			self._wakeup_timer.cancel()
-		self.fire_handlers = {}
-		waits.clear(self)
+    def clear_pending_events(self):
+        if self._wakeup_timer and self._wakeup_timer.pending:
+            self._wakeup_timer.cancel()
+        self.fire_handlers = {}
+        waits.clear(self)
 
-	def schedule(self, value=None):
-		self.clear_pending_events()
-		self._wakeup_timer = self.hub.call_later(0, self.wake, value)
+    def schedule(self, value=None):
+        self.clear_pending_events()
+        self._wakeup_timer = self.hub.call_later(0, self.wake, value)
 
-	def wake(self, value=None):
-		self.clear_pending_events()
-		self.iterate(value)
+    def wake(self, value=None):
+        self.clear_pending_events()
+        self.iterate(value)
 
 class Connection(Loop):
-	'''A cooperative loop hooked up to a socket.
-	'''
-	def __init__(self, sock, addr, connection_handler):
-		Loop.__init__(self, connection_handler, addr)
-		self.pipeline = pipeline.Pipeline()
-		self.buffer = buffer.Buffer()
-		self.sock = sock
-		self.addr = addr
-		self.hub.register(sock, self.handle_read, self.handle_write)
-		self._wakeup_timer = None
-		self._writable = False
-		self.callbacks = deque()
-		self.closed = False
+    '''A cooperative loop hooked up to a socket.
+    '''
+    def __init__(self, sock, addr, connection_handler):
+        Loop.__init__(self, connection_handler, addr)
+        self.pipeline = pipeline.Pipeline()
+        self.buffer = buffer.Buffer()
+        self.sock = sock
+        self.addr = addr
+        self.hub.register(sock, self.handle_read, self.handle_write)
+        self._wakeup_timer = None
+        self._writable = False
+        self.callbacks = deque()
+        self.closed = False
 
-	def set_writable(self, val):
-		if self.closed:
-			return
-		if val and not self._writable:
-			self.hub.enable_write(self.sock)
-			self._writable = True
-			return
-		if not val and self._writable:
-			self.hub.disable_write(self.sock)
-			self._writable = False
+    def set_writable(self, val):
+        if self.closed:
+            return
+        if val and not self._writable:
+            self.hub.enable_write(self.sock)
+            self._writable = True
+            return
+        if not val and self._writable:
+            self.hub.disable_write(self.sock)
+            self._writable = False
 
-	def shutdown(self, remote_closed=False):
-		self.hub.unregister(self.sock)
-		self.closed = True
-		if not remote_closed:
-			self.sock.close()
-		else:
-			try:
-				self.g.throw(ConnectionClosed)
-			except StopIteration:
-				pass
+    def shutdown(self, remote_closed=False):
+        self.hub.unregister(self.sock)
+        self.closed = True
+        if not remote_closed:
+            self.sock.close()
+        else:
+            try:
+                self.g.throw(ConnectionClosed)
+            except StopIteration:
+                pass
 
-		self.g = None
+        self.g = None
 
-	def handle_write(self):
-		if not self.pipeline.empty:
-			try:
-				data = self.pipeline.read(BUFSIZ)
-			except pipeline.PipelineCloseRequest:
-				self.shutdown()
-			else:
-				try:
-					bsent = self.sock.send(data)
-				except socket.error, s:
-					g = self.g
-					self.shutdown(True)
-				else:
-					if bsent != len(data):
-						self.pipeline.backup(data[bsent:])
+    def handle_write(self):
+        if not self.pipeline.empty:
+            try:
+                data = self.pipeline.read(BUFSIZ)
+            except pipeline.PipelineCloseRequest:
+                self.shutdown()
+            else:
+                try:
+                    bsent = self.sock.send(data)
+                except socket.error, s:
+                    g = self.g
+                    self.shutdown(True)
+                else:
+                    if bsent != len(data):
+                        self.pipeline.backup(data[bsent:])
 
-					if not self.pipeline.empty:
-						return True
-					else:
-						self.set_writable(False)
+                    if not self.pipeline.empty:
+                        return True
+                    else:
+                        self.set_writable(False)
 
-	def handle_read(self):
-		disconnect_reason = None
-		try:
-			data = self.sock.recv(BUFSIZ)
-		except socket.error, e:
-			data = ''
-			disconnect_reason = str(e)
+    def handle_read(self):
+        disconnect_reason = None
+        try:
+            data = self.sock.recv(BUFSIZ)
+        except socket.error, e:
+            data = ''
+            disconnect_reason = str(e)
 
-		if not data:
-			g = self.g
-			self.shutdown(True)
-		else:
-			res = self.buffer.feed(data)
-			if res:
-				self.new_data(res)
+        if not data:
+            g = self.g
+            self.shutdown(True)
+        else:
+            res = self.buffer.feed(data)
+            if res:
+                self.new_data(res)
+# vim:ts=4:sw=4:expandtab
 import select
 from select import EPOLLIN, EPOLLOUT, EPOLLPRI
 from collections import deque
 from time import time
 
 class Timer(object):
-	ALLOWANCE = 0.03
-	def __init__(self, interval, f, *args, **kw):
-		self.trigger_time = time() + interval
-		self.f = f
-		self.args = args
-		self.kw = kw
-		self.pending = True
+    ALLOWANCE = 0.03
+    def __init__(self, interval, f, *args, **kw):
+        self.trigger_time = time() + interval
+        self.f = f
+        self.args = args
+        self.kw = kw
+        self.pending = True
 
-	def cancel(self):
-		self.pending = False
+    def cancel(self):
+        self.pending = False
 
-	def callback(self):
-		self.pending = False
-		return self.f(*self.args, **self.kw)
+    def callback(self):
+        self.pending = False
+        return self.f(*self.args, **self.kw)
 
-	@property
-	def due(self):
-		return (self.trigger_time - time()) < self.ALLOWANCE
+    @property
+    def due(self):
+        return (self.trigger_time - time()) < self.ALLOWANCE
 
 class EventHub(object):
-	SIZE_HINT = 50000
-	def __init__(self):
-		self.epoll = select.epoll(self.SIZE_HINT)
-		self.timers = deque()
-		self.new_timers = []
-		self.run = True
-		def two_item_list():
-			return [None, None]
-		self.events = {}
+    SIZE_HINT = 50000
+    def __init__(self):
+        self.epoll = select.epoll(self.SIZE_HINT)
+        self.timers = deque()
+        self.new_timers = []
+        self.run = True
+        def two_item_list():
+            return [None, None]
+        self.events = {}
 
-	def handle_events(self):
-		if self.new_timers:
-			self.timers.extend(self.new_timers)
-			self.timers = deque(sorted(self.timers))
-			self.new_timers = []
-			
-		tm = time()
-		timeout = (self.timers[0][1].trigger_time - tm) if self.timers else 1e6
-		if timeout < 0:
-			timeout = 0
-		events = self.epoll.poll(timeout)
+    def handle_events(self):
+        if self.new_timers:
+            self.timers.extend(self.new_timers)
+            self.timers = deque(sorted(self.timers))
+            self.new_timers = []
+            
+        tm = time()
+        timeout = (self.timers[0][1].trigger_time - tm) if self.timers else 1e6
+        if timeout < 0:
+            timeout = 0
+        events = self.epoll.poll(timeout)
 
-		while self.timers:
-			if self.timers[0][1].due:
-				t = self.timers.popleft()[1]
-				if t.pending:
-					t.callback()
-					if not self.run:
-						return
-			else:
-				break
-		
-		for (fd, evtype) in events:
-			if evtype == EPOLLIN or evtype == EPOLLPRI:
-				self.events[fd][0]()
-			else:
-				self.events[fd][1]()
-			if not self.run:
-				return
+        while self.timers:
+            if self.timers[0][1].due:
+                t = self.timers.popleft()[1]
+                if t.pending:
+                    t.callback()
+                    if not self.run:
+                        return
+            else:
+                break
+        
+        for (fd, evtype) in events:
+            if evtype == EPOLLIN or evtype == EPOLLPRI:
+                self.events[fd][0]()
+            else:
+                self.events[fd][1]()
+            if not self.run:
+                return
 
-		runs = -1
-		while runs != 0:
-			runs = 0
-			if self.new_timers:
-				self.timers.extend(self.new_timers)
-				self.timers = deque(sorted(self.timers))
-				self.new_timers = []
-			while self.timers:
-				if self.timers[0][1].due:
-					t = self.timers.popleft()[1]
-					if t.pending:
-						t.callback()
-						runs += 1
-						if not self.run:
-							return
-				else:
-					break
+        runs = -1
+        while runs != 0:
+            runs = 0
+            if self.new_timers:
+                self.timers.extend(self.new_timers)
+                self.timers = deque(sorted(self.timers))
+                self.new_timers = []
+            while self.timers:
+                if self.timers[0][1].due:
+                    t = self.timers.popleft()[1]
+                    if t.pending:
+                        t.callback()
+                        runs += 1
+                        if not self.run:
+                            return
+                else:
+                    break
 
-	def call_later(self, interval, f, *args, **kw):
-		t = Timer(interval, f, *args, **kw)
-		self.new_timers.append((t.trigger_time, t))
-		return t
+    def call_later(self, interval, f, *args, **kw):
+        t = Timer(interval, f, *args, **kw)
+        self.new_timers.append((t.trigger_time, t))
+        return t
 
-	def register(self, fd, read_callback, write_callback):
-		assert fd not in self.events
-		self.events[fd.fileno()] = (read_callback, write_callback)
-		self.epoll.register(fd, EPOLLIN | EPOLLPRI)
+    def register(self, fd, read_callback, write_callback):
+        assert fd not in self.events
+        self.events[fd.fileno()] = (read_callback, write_callback)
+        self.epoll.register(fd, EPOLLIN | EPOLLPRI)
 
-	def enable_write(self, fd):
-		self.epoll.modify(fd, EPOLLIN | EPOLLPRI | EPOLLOUT)
+    def enable_write(self, fd):
+        self.epoll.modify(fd, EPOLLIN | EPOLLPRI | EPOLLOUT)
 
-	def disable_write(self, fd):
-		self.epoll.modify(fd, EPOLLIN | EPOLLPRI)
+    def disable_write(self, fd):
+        self.epoll.modify(fd, EPOLLIN | EPOLLPRI)
 
-	def unregister(self, fd):
-		fn = fd.fileno()
-		if fn in self.events:
-			del self.events[fn]
-			self.epoll.unregister(fd)
+    def unregister(self, fd):
+        fn = fd.fileno()
+        if fn in self.events:
+            del self.events[fn]
+            self.epoll.unregister(fd)
 
 if __name__ == '__main__':
-	hub = EventHub()
-	def whatever(message, other=None):
-		print 'got', message, other
-	hub.call_later(3.0, whatever, 'yes!', other='rock!')
-	import socket, sys
-	s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-	s.bind(('', 11911))
-	s.listen(5)
-	hub.register(s, lambda: sys.stdout.write('new socket!'), lambda: sys.stdout.write('arg!'))
-	while True:
-		hub.handle_events()
+    hub = EventHub()
+    def whatever(message, other=None):
+        print 'got', message, other
+    hub.call_later(3.0, whatever, 'yes!', other='rock!')
+    import socket, sys
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.bind(('', 11911))
+    s.listen(5)
+    hub.register(s, lambda: sys.stdout.write('new socket!'), lambda: sys.stdout.write('arg!'))
+    while True:
+        hub.handle_events()
+# vim:ts=4:sw=4:expandtab
 import sys
 import time
 
 ) = range(1,6)
 
 _lvl_text = {
-	LOGLVL_DEBUG : 'debug',
-	LOGLVL_INFO : 'info',
-	LOGLVL_WARN : 'warn',
-	LOGLVL_ERR : 'error',
-	LOGLVL_CRITICAL : 'critical',
+    LOGLVL_DEBUG : 'debug',
+    LOGLVL_INFO : 'info',
+    LOGLVL_WARN : 'warn',
+    LOGLVL_ERR : 'error',
+    LOGLVL_CRITICAL : 'critical',
 }
-	
+    
 
 class Logger(object):
-	def __init__(self, fd=None, verbosity=LOGLVL_WARN):
-		if fd is None:
-			fd = [sys.stdout]
-		if type(fd) not in (list, tuple):
-			fd = [fd]
-		self.fdlist = list(fd)
-		self.level = verbosity
-		self.component = None
+    def __init__(self, fd=None, verbosity=LOGLVL_WARN):
+        if fd is None:
+            fd = [sys.stdout]
+        if type(fd) not in (list, tuple):
+            fd = [fd]
+        self.fdlist = list(fd)
+        self.level = verbosity
+        self.component = None
 
-	def _writelogline(self, lvl, message):
-		if lvl >= self.level:
-			for fd in self.fdlist:
-				fd.write('[%s] {%s%s} %s\n' % (time.asctime(), 
-										self.component and ('%s:' % self.component) or '',
-										_lvl_text[lvl],
-										message))
+    def _writelogline(self, lvl, message):
+        if lvl >= self.level:
+            for fd in self.fdlist:
+                fd.write('[%s] {%s%s} %s\n' % (time.asctime(), 
+                                        self.component and ('%s:' % self.component) or '',
+                                        _lvl_text[lvl],
+                                        message))
 
-	debug = lambda s, m: s._writelogline(LOGLVL_DEBUG, m)
-	info = lambda s, m: s._writelogline(LOGLVL_INFO, m)
-	warn = lambda s, m: s._writelogline(LOGLVL_WARN, m)
-	error = lambda s, m: s._writelogline(LOGLVL_ERR, m)
-	critical = lambda s, m: s._writelogline(LOGLVL_CRITICAL, m)
+    debug = lambda s, m: s._writelogline(LOGLVL_DEBUG, m)
+    info = lambda s, m: s._writelogline(LOGLVL_INFO, m)
+    warn = lambda s, m: s._writelogline(LOGLVL_WARN, m)
+    error = lambda s, m: s._writelogline(LOGLVL_ERR, m)
+    critical = lambda s, m: s._writelogline(LOGLVL_CRITICAL, m)
 
-	def get_sublogger(self, component, verbosity=None):
-		copy = Logger(verbosity=verbosity or self.level)
-		copy.fdlist = self.fdlist
-		copy.component = component
-		return copy
+    def get_sublogger(self, component, verbosity=None):
+        copy = Logger(verbosity=verbosity or self.level)
+        copy.fdlist = self.fdlist
+        copy.component = component
+        return copy
 
 def set_current_application(app):
-	global _current_application
-	_current_application = app
+    global _current_application
+    _current_application = app
 
 class _currentLogger(object):
-	def __getattr__(self, n):
-		return getattr(_current_application.logger, n)
+    def __getattr__(self, n):
+        return getattr(_current_application.logger, n)
 
 log = _currentLogger()

diesel/pipeline.py

+# vim:ts=4:sw=4:expandtab
 try:
-	import cStringIO
+    import cStringIO
 except ImportError:
-	raise ImportError, "cStringIO is required"
+    raise ImportError, "cStringIO is required"
 
 _obj_SIO = cStringIO.StringIO
 _type_SIO = cStringIO.OutputType
 def make_SIO(d):
-	t = _obj_SIO()
-	t.write(d)
-	t.seek(0)
-	return t
+    t = _obj_SIO()
+    t.write(d)
+    t.seek(0)
+    return t
 
 def get_file_length(f):
-	m = f.tell()
-	f.seek(0, 2)
-	r = f.tell()
-	f.seek(m)
-	return r
+    m = f.tell()
+    f.seek(0, 2)
+    r = f.tell()
+    f.seek(m)
+    return r
 
 class PipelineLimitReached(Exception): pass
 class PipelineCloseRequest(Exception): pass
 class PipelineClosed(Exception): pass
-	
+    
 class Pipeline(object):
-	def __init__(self, limit=0):
-		self.line = []
-		self.limit = limit
-		self.used = 0
-		self.callbacks = []
-		self.want_close = False
+    def __init__(self, limit=0):
+        self.line = []
+        self.limit = limit
+        self.used = 0
+        self.callbacks = []
+        self.want_close = False
 
-	def add(self, d):
-		if self.want_close:
-			raise PipelineClosed
+    def add(self, d):
+        if self.want_close:
+            raise PipelineClosed
 
-		if self.limit > 0 and self.used >= self.limit:
-			raise PipelineLimitReached
+        if self.limit > 0 and self.used >= self.limit:
+            raise PipelineLimitReached
 
-		if type(d) is str:
-			if self.line and type(self.line[-1][0]) is _type_SIO and \
-			(self.limit == 0 or self.line[-1][1] < (self.limit / 2)):
-				fd, l = self.line[-1]
-				m = fd.tell()
-				fd.seek(0, 2)
-				fd.write(d)
-				fd.seek(m)
-				self.line[-1] = [fd, l + len(d)]
-			else:
-				self.line.append([make_SIO(d), len(d)])
-			self.used += len(d)
-		else:
-			self.line.append([d, get_file_length(d)])
+        if type(d) is str:
+            if self.line and type(self.line[-1][0]) is _type_SIO and \
+            (self.limit == 0 or self.line[-1][1] < (self.limit / 2)):
+                fd, l = self.line[-1]
+                m = fd.tell()
+                fd.seek(0, 2)
+                fd.write(d)
+                fd.seek(m)
+                self.line[-1] = [fd, l + len(d)]
+            else:
+                self.line.append([make_SIO(d), len(d)])
+            self.used += len(d)
+        else:
+            self.line.append([d, get_file_length(d)])
 
-	def close_request(self):
-		self.want_close = True
+    def close_request(self):
+        self.want_close = True
 
-	def read(self, amt):
-		if self.line == [] and self.want_close:
-			raise PipelineCloseRequest
+    def read(self, amt):
+        if self.line == [] and self.want_close:
+            raise PipelineCloseRequest
 
-		rbuf = []
-		read = 0
-		while self.line and read < amt:
-			data = self.line[0][0].read(amt - read)
-			if data == '':
-				if type(self.line[0][0]) is _type_SIO:
-					self.used -= self.line[0][1]
-				del self.line[0]
-			else:
-				rbuf.append(data)
-				read += len(data)
+        rbuf = []
+        read = 0
+        while self.line and read < amt:
+            data = self.line[0][0].read(amt - read)
+            if data == '':
+                if type(self.line[0][0]) is _type_SIO:
+                    self.used -= self.line[0][1]
+                del self.line[0]
+            else:
+                rbuf.append(data)
+                read += len(data)
 
-		while self.line and self.line[0][1] == self.line[0][0].tell():
-			self.used -= self.line[0][1]
-			del self.line[0]
+        while self.line and self.line[0][1] == self.line[0][0].tell():
+            self.used -= self.line[0][1]
+            del self.line[0]
 
-		while self.callbacks and self.used < self.limit:
-			self.callbacks.pop(0).callback(self)
+        while self.callbacks and self.used < self.limit:
+            self.callbacks.pop(0).callback(self)
 
-		return ''.join(rbuf)
-	
-	def backup(self, d):
-		self.line.insert(0, [make_SIO(d), len(d)])
-		self.used += len(d)
+        return ''.join(rbuf)
+    
+    def backup(self, d):
+        self.line.insert(0, [make_SIO(d), len(d)])
+        self.used += len(d)
 
-	def _get_empty(self):
-		return self.want_close == False and self.line == []
-	empty = property(_get_empty)
+    def _get_empty(self):
+        return self.want_close == False and self.line == []
+    empty = property(_get_empty)
 
-	def _get_full(self):
-		return self.used == 0 or self.used >= self.limit
-	full = property(_get_full)
+    def _get_full(self):
+        return self.used == 0 or self.used >= self.limit
+    full = property(_get_full)

diesel/protocols/__init__.py

+# vim:ts=4:sw=4:expandtab

diesel/protocols/http.py

+# vim:ts=4:sw=4:expandtab
 import sys, socket
 import urllib
 from collections import defaultdict
 }
 
 def parse_request_line(line):
-	items = line.split(' ')
-	items[0] = items[0].upper()
-	if len(items) == 2:
-		return tuple(items) + ('0.9',)
-	items[1] = urllib.unquote(items[1])
-	items[2] = items[2].split('/')[-1].strip()
-	return tuple(items)
+    items = line.split(' ')
+    items[0] = items[0].upper()
+    if len(items) == 2:
+        return tuple(items) + ('0.9',)
+    items[1] = urllib.unquote(items[1])
+    items[2] = items[2].split('/')[-1].strip()
+    return tuple(items)
 
 class HttpHeaders(object):
-	def __init__(self):
-		self._headers = defaultdict(list)
-		self.link()
+    def __init__(self):
+        self._headers = defaultdict(list)
+        self.link()
 
-	def add(self, k, v):
-		self._headers[k.lower()].append(str(v).strip())
+    def add(self, k, v):
+        self._headers[k.lower()].append(str(v).strip())
 
-	def remove(self, k):
-		if k.lower() in self._headers:
-			del self._headers[k.lower()]
+    def remove(self, k):
+        if k.lower() in self._headers:
+            del self._headers[k.lower()]
 
-	def set(self, k, v):
-		self._headers[k.lower()] = [str(v).strip()]
+    def set(self, k, v):
+        self._headers[k.lower()] = [str(v).strip()]
 
-	def format(self):
-		s = []
-		for h, vs in self._headers.iteritems():
-			for v in vs:
-				s.append('%s: %s' % (h.title(), v))
-		return '\r\n'.join(s)
-	
-	def link(self):
-		self.items = self._headers.items
-		self.keys = self._headers.keys
-		self.values = self._headers.values
-		self.itervalues = self._headers.itervalues
-		self.iteritems = self._headers.iteritems
+    def format(self):
+        s = []
+        for h, vs in self._headers.iteritems():
+            for v in vs:
+                s.append('%s: %s' % (h.title(), v))
+        return '\r\n'.join(s)
+    
+    def link(self):
+        self.items = self._headers.items
+        self.keys = self._headers.keys
+        self.values = self._headers.values
+        self.itervalues = self._headers.itervalues
+        self.iteritems = self._headers.iteritems
 
-	def parse(self, rawInput):
-		ws = ' \t'
-		heads = {}
-		curhead = None
-		curbuf = []
-		for line in rawInput.splitlines():
-			if not line.strip():
-				continue
-			if line[0] in ws:
-				curbuf.append(line.strip())
-			else:
-				if curhead:
-					heads.setdefault(curhead, []).append(' '.join(curbuf))
-				name, body = map(str.strip, line.split(':', 1))
-				curhead = name.lower()
-				curbuf = [body]
-		if curhead:
-			heads.setdefault(curhead, []).append(' '.join(curbuf))
-		self._headers = heads
-		self.link()
+    def parse(self, rawInput):
+        ws = ' \t'
+        heads = {}
+        curhead = None
+        curbuf = []
+        for line in rawInput.splitlines():
+            if not line.strip():
+                continue
+            if line[0] in ws:
+                curbuf.append(line.strip())
+            else:
+                if curhead:
+                    heads.setdefault(curhead, []).append(' '.join(curbuf))
+                name, body = map(str.strip, line.split(':', 1))
+                curhead = name.lower()
+                curbuf = [body]
+        if curhead:
+            heads.setdefault(curhead, []).append(' '.join(curbuf))
+        self._headers = heads
+        self.link()
 
-	def __contains__(self, k):
-		return k.lower() in self._headers
+    def __contains__(self, k):
+        return k.lower() in self._headers
 
-	def __getitem__(self, k):
-		return self._headers[k.lower()]
+    def __getitem__(self, k):
+        return self._headers[k.lower()]
 
-	def get(self, k, d=None):
-		return self._headers.get(k.lower(), d)
+    def get(self, k, d=None):
+        return self._headers.get(k.lower(), d)
 
-	def __iter__(self):
-		return self._headers
+    def __iter__(self):
+        return self._headers
 
-	def __str__(self):
-		return self.format()
+    def __str__(self):
+        return self.format()
 
 class HttpRequest(object):
-	def __init__(self, cmd, url, version, id=None):
-		self.cmd = cmd
-		self.url = url
-		self.version = version
-		self.headers = None
-		self.body = None
-		self.id = id
-		
-	def format(self):	
-		return '%s %s HTTP/%s' % (self.cmd, self.url, self.version)
-		
-class HttpProtocolError(Exception): pass	
-class HttpClose(object): pass	
+    def __init__(self, cmd, url, version, id=None):
+        self.cmd = cmd
+        self.url = url
+        self.version = version
+        self.headers = None
+        self.body = None
+        self.id = id
+        
+    def format(self):    
+        return '%s %s HTTP/%s' % (self.cmd, self.url, self.version)
+        
+class HttpProtocolError(Exception): pass    
+class HttpClose(object): pass    
 
 class HttpServer(object):
-	def __init__(self, request_handler):
-		self.request_handler = request_handler
+    def __init__(self, request_handler):
+        self.request_handler = request_handler
 
-	BODY_CHUNKED, BODY_CL, BODY_NONE = range(3)
+    BODY_CHUNKED, BODY_CL, BODY_NONE = range(3)
 
-	def check_for_http_body(self, heads):
-		if heads.get('Transfer-Encoding') == ['chunked']:
-			return self.BODY_CHUNKED
-		elif 'Content-Length' in heads:
-			return self.BODY_CL
-		return self.BODY_NONE
+    def check_for_http_body(self, heads):
+        if heads.get('Transfer-Encoding') == ['chunked']:
+            return self.BODY_CHUNKED
+        elif 'Content-Length' in heads:
+            return self.BODY_CL
+        return self.BODY_NONE
 
-	def __call__(self, addr):
-		req_id = 1
-		while True:
-			chunks = []
-			try:
-				header_line = yield until_eol()
-			except ConnectionClosed:
-				break
+    def __call__(self, addr):
+        req_id = 1
+        while True:
+            chunks = []
+            try:
+                header_line = yield until_eol()
+            except ConnectionClosed:
+                break
 
-			cmd, url, version = parse_request_line(header_line)	
-			req = HttpRequest(cmd, url, version, req_id)
-			req_id += 1
+            cmd, url, version = parse_request_line(header_line)    
+            req = HttpRequest(cmd, url, version, req_id)
+            req_id += 1
 
-			header_block = yield until('\r\n\r\n')
+            header_block = yield until('\r\n\r\n')
 
-			heads = HttpHeaders()
-			heads.parse(header_block)
-			req.headers = heads
+            heads = HttpHeaders()
+            heads.parse(header_block)
+            req.headers = heads
 
-			if req.version >= '1.1' and heads.get('Expect') == ['100-continue']:
-				yield 'HTTP/1.1 100 Continue\r\n\r\n'
+            if req.version >= '1.1' and heads.get('Expect') == ['100-continue']:
+                yield 'HTTP/1.1 100 Continue\r\n\r\n'
 
-			more_mode = self.check_for_http_body(heads)
+            more_mode = self.check_for_http_body(heads)
 
-			if more_mode is self.BODY_NONE:
-				req.body = None
+            if more_mode is self.BODY_NONE:
+                req.body = None
 
-			elif more_mode is self.BODY_CL:
-				req.body = yield bytes(int(heads['Content-Length']))
+            elif more_mode is self.BODY_CL:
+                req.body = yield bytes(int(heads['Content-Length']))
 
-			elif more_mode is self.BODY_CHUNKED:
-				req.body = handle_chunks(heads)
+            elif more_mode is self.BODY_CHUNKED:
+                req.body = handle_chunks(heads)
 
-			leave_loop = False
-			for i in self.request_handler(req): 
-				if i == HttpClose:
-					leave_loop = True
-				else:
-					yield i
-			if leave_loop:
-				break
+            leave_loop = False
+            for i in self.request_handler(req): 
+                if i == HttpClose:
+                    leave_loop = True
+                else:
+                    yield i
+            if leave_loop:
+                break
 
 def http_response(req, code, heads, body):
-	if req.version <= '1.0' and req.headers.get('Connection') != 'Keep-Alive':
-		close = True
-	elif req.headers.get('Connection') == ['close'] or  \
-		heads.get('Connection') == ['close']:
-		close = True
-	else:
-		close = False
-		heads.set('Connection', 'keep-alive')
-	yield '''HTTP/%s %s %s\r\n%s\r\n\r\n''' % (
+    if req.version <= '1.0' and req.headers.get('Connection') != 'Keep-Alive':
+        close = True
+    elif req.headers.get('Connection') == ['close'] or  \
+        heads.get('Connection') == ['close']:
+        close = True
+    else:
+        close = False
+        heads.set('Connection', 'keep-alive')
+    yield '''HTTP/%s %s %s\r\n%s\r\n\r\n''' % (
     req.version, code, status_strings.get(code, "Unknown Status"), 
-	heads.format())
-	if body:
-		yield body
-	if close:
-		yield HttpClose
+    heads.format())
+    if body:
+        yield body
+    if close:
+        yield HttpClose
 
 from diesel import Client, call, response
 
 def handle_chunks(headers):
-	chunks = []
-	while True:
-		chunk_head = yield until_eol()
-		if ';' in chunk_head:
-			# we don't support any chunk extensions
-			chunk_head = chunk_head[:chunk_head.find(';')]
-		size = int(chunk_head, 16)
-		if size == 0:
-			break
-		else:
-			chunks.append((yield bytes(size)))
-			_ = yield bytes(2) # ignore trailing CRLF
+    chunks = []
+    while True:
+        chunk_head = yield until_eol()
+        if ';' in chunk_head:
+            # we don't support any chunk extensions
+            chunk_head = chunk_head[:chunk_head.find(';')]
+        size = int(chunk_head, 16)
+        if size == 0:
+            break
+        else:
+            chunks.append((yield bytes(size)))
+            _ = yield bytes(2) # ignore trailing CRLF
 
-	while True:
-		trailer = yield until_eol()
-		if trailer.strip():
-			headers.add(*tuple(trailer.split(':', 1)))
-		else:
-			body = ''.join(chunks)
-			headers.set('Content-Length', len(body))
-			headers.remove('Transfer-Encoding')
-			yield up(body)
-			break
+    while True:
+        trailer = yield until_eol()
+        if trailer.strip():
+            headers.add(*tuple(trailer.split(':', 1)))
+        else:
+            body = ''.join(chunks)
+            headers.set('Content-Length', len(body))
+            headers.remove('Transfer-Encoding')
+            yield up(body)
+            break
 
 class HttpClient(Client):
-	@call
-	def request(self, method, path, headers, body=None):
-		req = HttpRequest(method, path, '1.1')
-		
-		if body:
-			headers.set('Content-Length', len(body))
-		
-		yield '%s\r\n%s\r\n\r\n' % (req.format(), 
-		headers.format())
+    @call
+    def request(self, method, path, headers, body=None):
+        req = HttpRequest(method, path, '1.1')
+        
+        if body:
+            headers.set('Content-Length', len(body))
+        
+        yield '%s\r\n%s\r\n\r\n' % (req.format(), 
+        headers.format())
 
-		if body:	
-			yield body
+        if body:    
+            yield body
 
-		resp_line = yield until_eol()
-		version, code, status = resp_line.split(None, 2)
-		code = int(code)
+        resp_line = yield until_eol()
+        version, code, status = resp_line.split(None, 2)
+        code = int(code)
 
-		header_block = yield until('\r\n\r\n')
-		heads = HttpHeaders()
-		heads.parse(header_block)
+        header_block = yield until('\r\n\r\n')
+        heads = HttpHeaders()
+        heads.parse(header_block)
 
-		if heads.get('Transfer-Encoding') == ['chunked']:
-			body = yield handle_chunks(heads)
-		else:
-			cl = int(heads.get('Content-Length', [0])[0])
-			if cl:
-				body = yield bytes(cl)
-			else:
-				body = None
+        if heads.get('Transfer-Encoding') == ['chunked']:
+            body = yield handle_chunks(heads)
+        else:
+            cl = int(heads.get('Content-Length', [0])[0])
+            if cl:
+                body = yield bytes(cl)
+            else:
+                body = None
 
-		if version < '1.0' or heads.get('Connection', ['keep-alive'])[0] == 'close':
-			self.close()
-		yield response((code, heads, body))
+        if version < '1.0' or heads.get('Connection', ['keep-alive'])[0] == 'close':
+            self.close()
+        yield response((code, heads, body))

diesel/protocols/wsgi.py

+# vim:ts=4:sw=4:expandtab
 """A minimal WSGI container.
 """
 import urlparse
 HOSTNAME = os.uname()[1] # win32?
 
 def cgiish_name(nm):
-	return nm.upper().replace('-', '_')
+    return nm.upper().replace('-', '_')
 
 class FileLikeErrorLogger(object):
-	def __init__(self, logger):
-		self.logger = logger
+    def __init__(self, logger):
+        self.logger = logger
 
-	def write(self, s):
-		self.logger.error(s)
+    def write(self, s):
+        self.logger.error(s)
 
-	def writelines(self, lns):
-		self.logger.error('\n'.join(list(lns)))
+    def writelines(self, lns):
+        self.logger.error('\n'.join(list(lns)))
 
-	def flush(self):
-		pass
+    def flush(self):
+        pass
 
 def build_wsgi_env(req, port):
-	url_info = urlparse.urlparse(req.url)
-	env = {}
+    url_info = urlparse.urlparse(req.url)
+    env = {}
 
-	# CGI bits
-	env['REQUEST_METHOD'] = req.cmd
-	env['SCRIPT_NAME'] = ''
-	env['PATH_INFO'] = url_info[2]
-	env['QUERY_STRING'] = url_info[4]
-	if 'Content-Type' in req.headers:
-		env['CONTENT_TYPE'] = req.headers['Content-Type'][0]
-	if 'Content-Length' in req.headers:
-		env['CONTENT_LENGTH'] = int(req.headers['Content-Length'][0])
-	env['SERVER_NAME'] = HOSTNAME
-	env['SERVER_PORT'] = port
-	env['SERVER_PROTOCOL'] = 'HTTP/' + req.version
-	for name, v in req.headers.iteritems():
-		env['HTTP_%s' % cgiish_name(name)] = v[0]
+    # CGI bits
+    env['REQUEST_METHOD'] = req.cmd
+    env['SCRIPT_NAME'] = ''
+    env['PATH_INFO'] = url_info[2]
+    env['QUERY_STRING'] = url_info[4]
+    if 'Content-Type' in req.headers:
+        env['CONTENT_TYPE'] = req.headers['Content-Type'][0]
+    if 'Content-Length' in req.headers:
+        env['CONTENT_LENGTH'] = int(req.headers['Content-Length'][0])
+    env['SERVER_NAME'] = HOSTNAME
+    env['SERVER_PORT'] = port
+    env['SERVER_PROTOCOL'] = 'HTTP/' + req.version
+    for name, v in req.headers.iteritems():
+        env['HTTP_%s' % cgiish_name(name)] = v[0]
 
-	# WSGI-specific bits
-	env['wsgi.version'] = (1,0)
-	env['wsgi.url_scheme'] = 'http' # XXX incomplete
-	env['wsgi.input'] = cStringIO.StringIO(req.body or '')
-	env['wsgi.errors'] = FileLikeErrorLogger(log)
-	env['wsgi.multithread'] = False
-	env['wsgi.multiprocess'] = False
-	env['wsgi.run_once'] = False
-	return env
+    # WSGI-specific bits
+    env['wsgi.version'] = (1,0)
+    env['wsgi.url_scheme'] = 'http' # XXX incomplete
+    env['wsgi.input'] = cStringIO.StringIO(req.body or '')
+    env['wsgi.errors'] = FileLikeErrorLogger(log)
+    env['wsgi.multithread'] = False
+    env['wsgi.multiprocess'] = False
+    env['wsgi.run_once'] = False
+    return env
 
 class WSGIRequestHandler(object):
-	def __init__(self, app):
-		self.app = app
+    def __init__(self, app):
+        self.app = app
 
-	def _start_response(self, status, response_headers, exc_info=None):
-		if exc_info:
-			raise exc_info[0], exc_info[1], exc_info[2]
-		else:
-			self.status = status
-			self.response_headers = response_headers
-		return self.write_output
+    def _start_response(self, status, response_headers, exc_info=None):
+        if exc_info:
+            raise exc_info[0], exc_info[1], exc_info[2]
+        else:
+            self.status = status
+            self.response_headers = response_headers
+        return self.write_output
 
-	def write_output(self, output):
-		self.outbuf.append(output)
+    def write_output(self, output):
+        self.outbuf.append(output)
 
-	def __call__(self, req):
-		env = build_wsgi_env(req, self.app.port)
-		self.outbuf = []
-		for output in self.app.wsgi_callable(env, self._start_response):
-			self.write_output(output)
-		return self.finalize_request(req)
+    def __call__(self, req):
+        env = build_wsgi_env(req, self.app.port)
+        self.outbuf = []
+        for output in self.app.wsgi_callable(env, self._start_response):
+            self.write_output(output)
+        return self.finalize_request(req)
 
-	def finalize_request(self, req):
-		code = int(self.status.split()[0])
-		heads = HttpHeaders()
-		for n, v in self.response_headers:
-			heads.add(n, v)
-		body = ''.join(self.outbuf)
-		return http_response(req, code, heads, body)
+    def finalize_request(self, req):
+        code = int(self.status.split()[0])
+        heads = HttpHeaders()
+        for n, v in self.response_headers:
+            heads.add(n, v)
+        body = ''.join(self.outbuf)
+        return http_response(req, code, heads, body)
 
 class WSGIApplication(Application):
-	def __init__(self, wsgi_callable, port=80, iface=''):
-		Application.__init__(self)
-		self.port = port
-		self.wsgi_callable = wsgi_callable
-		http_service = Service(HttpServer(WSGIRequestHandler(self)), port, iface)
-		self.add_service(http_service)
+    def __init__(self, wsgi_callable, port=80, iface=''):
+        Application.__init__(self)
+        self.port = port
+        self.wsgi_callable = wsgi_callable
+        http_service = Service(HttpServer(WSGIRequestHandler(self)), port, iface)
+        self.add_service(http_service)
 
 if __name__ == '__main__':
-	def simple_app(environ, start_response):
-		"""Simplest possible application object"""
-		status = '200 OK'
-		response_headers = [('Content-type','text/plain')]
-		start_response(status, response_headers)
-		return ["Hello World!"]
-	app = WSGIApplication(simple_app, port=7080)
-	app.run()
+    def simple_app(environ, start_response):
+        """Simplest possible application object"""
+        status = '200 OK'
+        response_headers = [('Content-type','text/plain')]
+        start_response(status, response_headers)
+        return ["Hello World!"]
+    app = WSGIApplication(simple_app, port=7080)
+    app.run()
+# vim:ts=4:sw=4:expandtab
 '''Test the WSGI binding, hook cherrypy up.
 
 Tested on CherryPy 3.0.3
 print "You'll need to Ctl-Z and kill the job manually"
 
 class Root(object):
-	@cherrypy.expose
-	def index(self):
-		return "Hello, World!"
+    @cherrypy.expose
+    def index(self):
+        return "Hello, World!"
 
 root = cherrypy.tree.mount(Root(), '/')
 cherrypy.engine.start(blocking=False)

examples/crawler.py

+# vim:ts=4:sw=4:expandtab
 '''A very simple, flawed web crawler--demonstrates
 Clients + Loops
 '''
 path = path or '/'
 base_dir = path if path.endswith('/') else os.path.dirname(path)
 if not base_dir.endswith('/'):
-	base_dir += '/'
+    base_dir += '/'
 
 assert schema == 'http', 'http only'
 
 links = None
 
 def get_links(s):
-	for mo in url_exp.finditer(s):
-		lpath = mo.group(2)
-		if ':' not in lpath and '..' not in lpath:
-			if lpath.startswith('/'):
-				yield lpath
-			else:
-				yield urljoin(base_dir, lpath)
+    for mo in url_exp.finditer(s):
+        lpath = mo.group(2)
+        if ':' not in lpath and '..' not in lpath:
+            if lpath.startswith('/'):
+                yield lpath
+            else:
+                yield urljoin(base_dir, lpath)
 
 def get_client():
-	client = HttpClient()
-	client.connect(host, 80)
-	heads = HttpHeaders()
-	heads.set('Host', host)
-	return client, heads
+    client = HttpClient()
+    client.connect(host, 80)
+    heads = HttpHeaders()
+    heads.set('Host', host)
+    return client, heads
 
 def ensure_dirs(lpath):
-	def g(lpath):
-		while len(lpath) > len(folder):
-			lpath = os.path.dirname(lpath)
-			yield lpath
-	for d in reversed(list(g(lpath))):
-		if not os.path.isdir(d):
-			os.mkdir(d)
+    def g(lpath):
+        while len(lpath) > len(folder):
+            lpath = os.path.dirname(lpath)
+            yield lpath
+    for d in reversed(list(g(lpath))):
+        if not os.path.isdir(d):
+            os.mkdir(d)
 
 def write_file(lpath, body):
-	lpath = (lpath if not lpath.endswith('/') else (lpath + 'index.html')).lstrip('/')
-	lpath = os.path.join(folder, lpath)
-	ensure_dirs(lpath)
-	open(lpath, 'w').write(body)
+    lpath = (lpath if not lpath.endswith('/') else (lpath + 'index.html')).lstrip('/')
+    lpath = os.path.join(folder, lpath)
+    ensure_dirs(lpath)
+    open(lpath, 'w').write(body)
 
 def follow_loop():
-	global count
-	global files
-	count += 1
-	client, heads = get_client()
-	while True:
-		try:
-			lpath = links.next()
-		except StopIteration:
-			count -= 1
-			if not count:
-				stop()
-			break
-		print " -> %s" % lpath
-		if client.is_closed:
-			client, heads = get_client()
-		code, heads, body = yield client.request('GET', lpath, heads)
-		write_file(lpath, body)
-		files +=1
-	
+    global count
+    global files
+    count += 1
+    client, heads = get_client()
+    while True:
+        try:
+            lpath = links.next()
+        except StopIteration:
+            count -= 1
+            if not count:
+                stop()
+            break
+        print " -> %s" % lpath
+        if client.is_closed:
+            client, heads = get_client()
+        code, heads, body = yield client.request('GET', lpath, heads)
+        write_file(lpath, body)
+        files +=1
+    
 def req_loop():
-	global links
-	client, heads = get_client()
-	print path
-	code, heads, body = yield client.request('GET', path, heads)
-	write_file(path, body)
-	links = get_links(body)
-	for x in xrange(CONCURRENCY):
-		a.add_loop(Loop(follow_loop))
+    global links
+    client, heads = get_client()
+    print path
+    code, heads, body = yield client.request('GET', path, heads)
+    write_file(path, body)
+    links = get_links(body)
+    for x in xrange(CONCURRENCY):
+        a.add_loop(Loop(follow_loop))
 
 a = Application()
 a.add_loop(Loop(req_loop))
 
 def stop():
-	print "Fetched %s files in %.3fs with concurrency=%s" % (files, time.time() - t, CONCURRENCY)
-	a.halt() # stop application
+    print "Fetched %s files in %.3fs with concurrency=%s" % (files, time.time() - t, CONCURRENCY)
+    a.halt() # stop application
 
 t = time.time()
 a.run()
+# vim:ts=4:sw=4:expandtab
 '''Simple echo server.
 '''
 from diesel import Application, Service, until_eol
 
 def hi_server(addr):
-	while 1:
-		inp = (yield until_eol())
-		yield "you said %s" % inp
+    while 1:
+        inp = (yield until_eol())
+        yield "you said %s" % inp
 
 app = Application()
 app.add_service(Service(hi_server, 8013))

examples/echo_client.py

+# vim:ts=4:sw=4:expandtab
 '''A Client example connecting to an echo server (echo.py).
 Utilizes sleep as well.
 '''
 import time
 
 class EchoClient(Client):
-	@call
-	def echo(self, message):
-		yield "%s!\r\n" % message
-		back = yield until_eol()
-		yield response(back)
+    @call
+    def echo(self, message):
+        yield "%s!\r\n" % message
+        back = yield until_eol()
+        yield response(back)
 
-	def on_close(self):
-		print 'ouch!  closed!'
+    def on_close(self):
+        print 'ouch!  closed!'
 
 
 def echo_loop(n):
-	def _loop():
-		client = EchoClient()
-		client.connect('localhost', 8013)
-		while 1:
-			bar = yield client.echo("foo %s" % n)
-			tms = time.asctime()
-			print "[%s] %s: remote service said %r" % (tms, n, bar)
-			yield sleep(2)
-	return _loop
+    def _loop():
+        client = EchoClient()
+        client.connect('localhost', 8013)
+        while 1:
+            bar = yield client.echo("foo %s" % n)
+            tms = time.asctime()
+            print "[%s] %s: remote service said %r" % (tms, n, bar)
+            yield sleep(2)
+    return _loop
 
 a = Application()
 
 for x in xrange(5):
-	a.add_loop(Loop(echo_loop(x)))
+    a.add_loop(Loop(echo_loop(x)))
 a.run()

examples/echo_timeout.py

+# vim:ts=4:sw=4:expandtab
 '''Simple echo server.
 '''
 import time
 from diesel import Application, Service, until_eol, sleep
 
 def hi_server(addr):
-	while 1:
-		inp, to = (yield (until_eol(), sleep(3)))
-		if to:
-			print '%s timeout!' % time.asctime()
-		else:
-			yield "you said %s" % inp
+    while 1:
+        inp, to = (yield (until_eol(), sleep(3)))
+        if to:
+            print '%s timeout!' % time.asctime()
+        else:
+            yield "you said %s" % inp
 
 app = Application()
 app.add_service(Service(hi_server, 8013))
+# vim:ts=4:sw=4:expandtab
 '''Example of event firing.
 '''
 import random
 from diesel import Application, Loop, sleep, fire, wait
 
 def gunner():
-	x = 1
-	while True:
-		yield fire('bam', x)
-		x += 1
-		yield sleep()
+    x = 1
+    while True:
+        yield fire('bam', x)
+        x += 1
+        yield sleep()
 
 def sieged():
-	while True:
-		n = yield wait('bam')
-		if n % 10000 == 0:
-			print n
+    while True:
+        n = yield wait('bam')
+        if n % 10000 == 0:
+            print n
 
 a = Application()
 a.add_loop(Loop(gunner))
+# vim:ts=4:sw=4:expandtab
 '''The oh-so-canonical "Hello, World!" http server.
 '''