Commits

Anonymous committed 1ea366e

Created orbited repository

Comments (0)

Files changed (334)

clients/php/php-orbited.php

+<?php
+if (!@include 'Net/Socket.php') die("Net_Socket not found - you need to install the Net_Socket package using:\n\tpear install Net_Socket\n");
+
+class Orbited_User_Key {
+    private $location;
+    private $username;
+    private $session_key;
+
+    function __construct($location, $username, $session_key) {
+        $this->location    = $location;
+        $this->username    = $username;
+        $this->session_key = $session_key;
+    }
+    
+    function __tostring() {
+        return $this->location . ', ' . $this->username . ', ' . $this->session_key;
+    }
+}
+
+class Orbited_Client {
+    const VERSION = 'Orbit 1.0';
+    private $addr;
+    private $port;
+    private $socket;
+    private $id;
+    private $connected;
+
+    function __construct($addr = 'localhost', $port = 9000) {
+        $this->addr      = $addr;
+        $this->port      = $port;
+        $this->socket    = null;
+        $this->id        = 0;
+        $this->connected = false;
+    }
+
+    function connect() {
+        $this->connected = true;
+        if ($this->socket) {
+            return;
+        }
+        $this->socket = new Net_Socket;
+        $this->socket->connect($this->addr, $this->port);
+        $this->socket->setBlocking(true);
+    }
+
+    function disconnect() {
+        $this->connected = false;
+        $this->socket->disconnect();
+        $this->socket = null;
+    }
+
+    function reconnect() {
+        $this->disconnect();
+        $this->connect();
+    }
+
+    function sendLine($line = '') {
+        $this->socket->writeLine($line);
+    }
+
+    function event($recipients, $body, $retry = true) {
+        if (!$this->connected) {
+            $this->connect();
+        }
+        try {
+            if (!is_array($recipients)) {
+                $recipients = array($recipients);
+            }
+            if (!$this->socket) {
+                throw new Exception('Connection Lost');
+            }
+            try {
+                $this->id++;
+                $this->sendLine(self::VERSION);
+                $this->sendLine('Event');
+                $this->sendLine('id: ' . $this->id);
+                foreach ($recipients as $recipient) {
+                    if (is_object($recipient)) {
+                        $recipient = $recipient->__tostring();
+                    }
+                    $this->sendLine("recipient: $recipient");
+                }
+                $this->sendLine('length: ' . strlen($body));
+                $this->sendLine();
+                $this->socket->write($body);
+                return $this->read_response();
+            } catch (Exception $e) {
+                $this->disconnect();
+                throw new Exception('Connection Lost');
+            }
+        } catch (Exception $e) {
+            if ($retry) {
+                $this->reconnect();
+                $this->event($recipients, $body, false);
+            } else {
+                throw new Exception('Send Failed');
+            }
+        }
+    }
+
+    // This method is copied from Net_Socket::readLine() since
+    // Net_Socket doesn't offer a method to read up to a specified string!?
+    function read_response() {
+        $line = '';
+        $timeout = time() + $this->socket->timeout;
+        while (!feof($this->socket->fp) && (!$this->socket->timeout || time() < $timeout)) {
+            $line .= @fgets($this->socket->fp, $this->socket->lineLength);
+            if (substr($line, -4) == "\r\n\r\n") {
+                return rtrim($line, "\r\n");
+            }
+        }
+        return $line;
+    }
+}
+
+$client = new Orbited_Client;
+$user_key = new Orbited_User_Key("user", 0, "/demo");
+$client->event( $user_key, "Hello from PHP!<br>", false);
+?>

clients/python/LICENSE.txt

+This is the MIT license:
+http://www.opensource.org/licenses/mit-license.php
+
+Copyright (c) 2005, 2006 Michael Carter. pyrobit is a trademark of Kevin Dangoor.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

clients/python/pyorbited/LICENSE.txt

+This is the MIT license:
+http://www.opensource.org/licenses/mit-license.php
+
+Copyright (c) 2005, 2006 Michael Carter. pyrobit is a trademark of Kevin Dangoor.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

clients/python/pyorbited/__init__.py

Empty file added.

clients/python/pyorbited/event.py

+import socket
+import event
+LQUEUE_SIZE = 500
+BUFFER_SIZE = 4096
+
+END = '\r\n\r\n'
+LINE = '\r\n'
+
+def cb(req):
+    print req.status
+    print req.headers
+    event.abort()
+    
+def cb2(req):
+    print req.status
+    print req.headers
+
+def client_socket(addr, port):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.setblocking(0)
+    sock.connect_ex((addr, port))
+    return sock    
+
+def jsonify(self, data):
+    return data
+    
+class Client(object):
+    def __init__(self, servers, jscript=None):        
+        if jscript:
+            self.jscript = jscript
+        self.servers = servers
+        self.connections = dict()
+        self.connections[0] = Connection(0, "localhost", 9000)
+        self.connections[1] = Connection(1, "localhost", 9000)
+        self.id = 0
+#    def connect_all(self):
+#        for connection in self.connections.values():
+#            connection.connect()
+        
+    def event(self, recipients, data, callback=None, jscript=True, timeout=10):
+        self.id += 1
+        if jscript:
+            data = jsonify(data)
+        r = Request(self, self.id, recipients, data, callback)
+        if not callback:
+            # this timeout isn't good for real use. just so the interpreter
+            # doesn't hang while testing.
+            tevent = event.timeout(timeout, event.abort)
+            event.dispatch()
+            tevent.delete()
+            return r
+            
+    def get_conn(self, id):
+        return self.connections[id]
+            
+    def hash(self, key):
+        if 'carn' in key:
+            return 0
+        return 1
+
+
+class Request(object):
+    def __init__(self, client, id, recipients, data, callback=None):
+        self.id = id
+        self.callback = callback
+        self.replies = dict()
+        self.requests = dict()
+        self.data = data
+        for recipient in recipients:
+            num = client.hash(recipient)
+            if num not in self.requests:
+                self.requests[num] = []
+            self.requests[num].append(recipient)
+            
+        for server_id in self.requests.keys():
+            client.get_conn(server_id).event(self)
+        
+
+
+    def render(self, connection):
+        out  = "Orbit 1.0\r\nEvent\r\nid: %s\r\n" % self.id
+        for recipient in self.requests[connection.id]:
+            out += "recipient: %s\r\n" % recipient
+        out += "length: %s\r\n\r\n" % len(self.data)
+        out += self.data
+#        print "request: %s" % out
+        return out
+        
+    def reply(self, connection, status, headers):
+        self.replies[connection.id] = status, headers
+        if len(self.replies.keys()) == len(self.requests.keys()):
+            self.complete()
+            
+    def complete(self):
+        final_headers = {
+            'msg': 'success',
+            'id': self.id,
+            'recipients': []
+        }
+        self.status = 'success'
+        for status, headers in self.replies.values():
+            if status.lower() == 'failure':
+                self.status = 'failure'
+                final_headers['msg'] = headers['msg']
+                final_headers['recipients'].extend(headers['recipient'])
+                
+        self.headers = final_headers
+        if self.status == 'success':
+            del self.headers['recipients']
+        # We are in synchronous mode...
+        if not self.callback:
+            return event.abort()
+        # We are in asynchronous mode...
+        self.callback(self)
+        
+class Connection(object):
+    def __init__(self, id, addr, port):
+        self.id = id
+        self.sock = client_socket(addr, port)
+        self.write_buffer = ""
+        self.read_buffer = ""
+        self.events = []
+        self.pending_events = {}
+        self.state = None
+        self.revent = None
+        self.wevent = None
+        
+    def close(self):
+        pass
+        
+    def event(self, event):
+        self.events.append(event)
+        self.start_write()
+        self.start_read()
+            
+    def start_write(self):
+        if not self.wevent:
+            self.next_event()
+            self.wevent = event.write(self.sock, self.write_ready)
+            
+    def start_read(self):
+        if not self.revent:
+            self.revent = event.read(self.sock, self.read_ready)
+            self.revent.add()
+        
+    def next_event(self):
+        e = self.events.pop(0)
+        self.pending_events[e.id] = e
+        self.write_buffer = e.render(self)      
+        
+    def write_ready(self):
+        bsent = self.sock.send(self.write_buffer)
+#        print "=====wrote=====\n %s" % self.write_buffer[:bsent]
+        self.write_buffer = self.write_buffer[bsent:]
+        if not self.write_buffer:
+            if not self.events:
+#                print "Wrote ALL Events"
+                self.wevent = None
+                return None
+            self.next_event()
+        return True
+        
+    def read_ready(self):
+        data = self.sock.recv(BUFFER_SIZE)
+#        print "READ: %s" % data
+        if not data:
+            self.close()
+            return None
+        self.read_buffer += data
+        if END in self.read_buffer:
+            self.process()
+#            self.state = None
+#            self.start_write()
+        return True   
+
+    def process(self):
+#        print "processing!"
+        index = self.read_buffer.find(END)
+        reply = self.read_buffer[:index]
+        self.read_buffer = self.read_buffer[index+len(END):]
+        lines = reply.split(LINE)
+        status = lines[0]
+        headers = {}
+        for line in lines[1:]:
+            key, val = line.split(': ')
+            if key == 'recipient':
+                if key in headers:
+                    headers[key].append(val)# = [ headers[key], val ]
+                else:
+                    headers[key] = [val]
+            else:
+                headers[key] = val
+#        print "FINISHED READ"
+#        print "headers: %s" % headers
+#        print "pending: %s" % self.pending_events
+        event = self.pending_events.pop(int(headers['id']))
+        event.reply(self, status, headers)
+        

clients/python/pyorbited/io.py

+import socket
+LQUEUE_SIZE = 500
+BUFFER_SIZE = 4096
+
+def server_socket(port):
+    """Return a listening socket bound to the given interface and port.
+    """
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    sock.bind(("localhost", port))
+    sock.listen(LQUEUE_SIZE)
+    return sock
+
+
+def client_socket(addr, port):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.setblocking(0)
+    sock.connect_ex((addr, port))
+    return sock    

clients/python/pyorbited/simple.py

+import socket, io
+from simplejson.encoder import JSONEncoder
+encode = JSONEncoder().encode
+END = '\r\n'
+
+def debug(data):
+    pass
+
+class Client(object):
+    version = 'Orbit 1.0'
+    def __init__(self, addr="localhost", port=9000):
+        self.addr = addr
+        self.port = port
+        self.socket = None
+        self.id = 0
+        self.connected = False
+    
+    def connect(self):
+        self.connected = True
+        if self.socket:
+            debug("Already Connected")
+            return
+        self.socket = io.client_socket(self.addr, self.port)
+        self.socket.setblocking(1)
+    
+    def disconnect(self):
+        self.connected = False
+        self.socket.close()
+        self.socket = None
+    
+    def reconnect(self):
+        self.disconnect()
+        self.connect()
+    
+    def sendline(self, line=""):
+        self.socket.send('%s%s' % (line, END))
+    
+    def event(self, recipients, body, json=True, retry=True):
+        if not self.connected:
+            self.connect()
+        try:
+            if json:
+                body = encode(body)
+            if not self.socket:
+                raise IOError("ConnectionLost")
+            try:
+                self.id += 1
+                self.sendline(self.version)
+                self.sendline('Event')
+                self.sendline('id: %s' % self.id)
+                for recipient in recipients:
+                    self.sendline('recipient: %s' % (str(recipient)))
+                self.sendline('length: %s' % len(body))
+                self.sendline()
+                self.socket.send(body)
+                return self.read_response()
+            except socket.error:
+                # self.disconnect()
+                raise IOError("ConnectionLost")
+        except IOError, e:
+            if retry:
+                self.reconnect()
+                self.event(recipients, body, json, False)
+            else:
+                raise
+    
+    def read_response(self):
+        return self.socket.recv(io.BUFFER_SIZE)
+    

clients/python/pyorbited/twistedorbit.py

+# Author: Michael Carter
+# Email: CarterMichael@gmail.com
+#
+# Please note that this is an incomplete implementation of the client api. As
+# of version 0.1 it works only with a single server
+from twisted.internet import reactor, defer
+from twisted.python import log
+from twisted.internet.protocol import Protocol, ClientFactory
+from twisted import internet
+import simplejson
+import sys
+
+make_json = simplejson.encoder.JSONEncoder().encode
+
+class OrbitRequest(object):
+    def __init__(self, id, deferred, recipients, data):
+        self.id = id
+        self.deferred = deferred
+        self.recipients = recipients
+        self.data = data
+    
+    def render(self):
+        out = "Orbit 1.0\r\nEvent\r\nid: %s\r\nlength: %s\r\n" % (self.id, len(self.data))
+        for r in self.recipients:
+            out += "recipient: %s\r\n" % r
+        out += "\r\n"
+        out += self.data
+        return out
+    
+    def success(self, status, headers):
+        response = OrbitResponse(status, headers)
+        self.deferred.callback(response)
+    
+
+class OrbitResponse(object):
+    def __init__(self, status, headers):
+        self.status = status
+        self.headers = headers
+    
+
+class OrbitProtocol(Protocol):
+    def __init__(self, factory):
+        self.factory = factory
+        self.buffer = ""
+        self.write_buffer = ""
+        self.requests = dict()
+        self.pending = []
+        self.id = 0
+    
+    def event(self, recipients, data, jscript=True):
+        self.id+=1
+        if jscript:
+            data = make_json(data)
+        d = defer.Deferred()
+        self.requests[self.id] = OrbitRequest(self.id, d, recipients, data)
+        self.transport.write(self.requests[self.id].render())
+        return d
+    
+    def dataReceived(self, data):
+        self.buffer += data
+        self.process()
+    
+    def process(self):
+        while '\r\n\r\n' in self.buffer:
+            req, self.buffer = self.buffer.split('\r\n\r\n', 1)                
+            lines = req.split('\r\n')
+            action = lines[0]
+            headers = {}
+            recipients = []
+            for line in lines[1:]:
+                key, val = line.split(': ')
+                if key == 'recipient':
+                    recipients.append(val)
+                else:
+                    headers[key] = val
+            if recipients:
+                headers['recipients'] = recipients
+            self.requests[int(headers['id'])].success(action, headers)
+            self.requests.pop(int(headers['id']))
+    
+
+class OrbitFactory(ClientFactory):
+    protocol = OrbitProtocol
+    def __init__(self):
+        self.client = None
+    
+    def buildProtocol(self, addr):
+        self.client = self.protocol(self)
+        return self.client
+    
+
+class OrbitClient(object):
+    def __init__(self, addr="localhost", port=9000):
+        self.factory = OrbitFactory()
+        reactor.connectTCP(addr, port, self.factory)
+    
+    def event(self, recipients, event, jscript=True):
+        return self.factory.client.event(recipients, event, jscript)
+    

clients/python/setup.py

+#from ez_setup import use_setuptools
+#use_setuptools()
+from setuptools import setup
+setup(
+    name='pyorbited',
+    version='0.1.1',
+    author='Michael Carter',
+    author_email='CarterMichael@gmail.com',
+    url='http://orbited.org',
+    download_url='http://orbited.org/download',
+    license="MIT License",
+    description='A Python client for Orbited (Orbit Event Daemon), a Comet server. Includes three implementations: pyevent, twisted, basic sockets.',
+    long_description='',
+    packages=["pyorbited"],    
+    install_requires = [
+#        "event >= 0.3"
+    ],
+    
+#    entry_points = """    
+#    [console_scripts]
+#    orbited = orbited.start:main
+#    """,
+
+    classifiers = [
+        'Development Status :: 3 - Alpha',
+        'Environment :: Console',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: MIT License',
+        'Operating System :: OS Independent',
+        'Programming Language :: Python',
+        'Topic :: Software Development :: Libraries :: Python Modules'],        
+    )
+    
+    
+    

clients/ruby/ruby-orbited.rb

+require 'socket'
+include Socket::Constants
+require 'json'
+
+BUFFER_SIZE = 4096
+LINE_END = "\r\n"
+
+module SimpleOrbit
+end
+
+class SimpleOrbit::Client
+  @@version = 'Orbit 1.0'
+  def initialize(addr, port)
+    @addr = addr
+    @port = port
+    @socket = nil
+    @id = 0
+    @connected = false
+  end
+  def connect()
+    @connected = true
+    if @socket
+      # already connected
+      return
+    end
+    @socket = TCPSocket.new(@addr, @port)
+  end
+  def disconnect()
+    if @connected
+      @connected = false
+      @socket.close()
+      @socket = nil
+    end
+  end
+  def reconnect()
+    disconnect()
+    connect()
+  end
+  def sendline(line='')
+    @socket.write(line.to_s + LINE_END)
+    puts line.to_s
+  end
+  def event(recipients, body, json=true, try_again=true)
+    if not @connected
+      connect()
+    end
+    begin
+      if json
+        body = JSON.generate(body)
+      end
+      if not @socket
+        raise ## **Connection-lost**
+      end
+      begin
+        @id = @id + 1
+        sendline(@@version.to_s)
+        sendline('Event')
+        sendline('id: ' + @id.to_s)
+        for recipient in recipients
+          sendline('recipient: ' + recipient.to_s)
+        end
+        sendline('length: ' + body.length.to_s)
+        sendline()
+        @socket.write(body)
+        puts body.to_s
+        return read_response()
+      rescue
+        disconnect()
+        raise
+      end
+    rescue
+      if try_again
+          reconnect()
+          event(recipients, body, json, false)
+      else
+          raise
+      end
+    end
+  end
+  def read_response()
+    # this is incorrect. Should read until it gets a \r\n\r\n
+    return @socket.read(15)
+  end
+end
+require 'ruby-orbited'
+
+client = SimpleOrbit::Client.new('127.0.0.1', 9000)
+client.connect()
+
+puts client.event(['user, 0, /demo'], 'Hello World!<br>', false).to_s
+This is the MIT license:
+http://www.opensource.org/licenses/mit-license.php
+
+Copyright (c) 2006-2007 Michael Carter. Orbited is a trademark of Michael Carter.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

daemon/orbited/__init__.py

Empty file added.

daemon/orbited/app.py

+import sys
+import traceback
+import StringIO
+import event
+from config import map as config
+from http import HTTPDaemon
+from log import getLogger
+from orbited import session
+from orbited.exceptions import InvalidTransport
+from orbited.orbit import OPDaemon
+#config = config.map
+
+logger = getLogger('Application')
+access = getLogger('ACCESS')
+class Application(object):
+    ''' Top-level Orbited object, which encapsulates an HTTP daemon, and an
+        Orbit protocol daemon, each of which listens for connections on a
+        different port.
+    '''
+    def __init__(self):
+        self.http = HTTPDaemon(self)
+        self.op = OPDaemon(self)
+        self.sessions = {}
+        self.requests = {}
+    
+    def dispatch_event(self, request):
+        access.info('EVENT REQUEST\t%s/%s\t%s' % (request.connection.addr[0], request.id, request.length))
+        self.requests[request.key()] = request
+        for recipient in request.recipients:
+            if recipient in self.sessions:
+                self.sessions[recipient].event(request)
+            else:
+                access.info('EVENT FAILED\t%s/%s\t\n|---------------------- Recipient %s not Found' % (request.addr, request.id, recipient))
+                request.failure(recipient)
+
+        self.requests.pop(request.key())
+
+    def accept_browser_connection(self, conn):
+        conn_key = conn.key()
+        if conn_key not in self.sessions:
+            self.sessions[conn_key] = session.create(self, conn_key)
+        self.sessions[conn_key].accept_browser_connection(conn)
+
+    def expire_browser_connection(self, conn):
+        if conn.key() in self.sessions:
+            self.sessions[conn.key()].expire_browser_connection(conn)
+
+    def remove_session(self, session):
+        del self.sessions[session.key]
+
+    def start(self):
+
+        def collect_toplevel_exceptions():
+            return True
+
+        event.timeout(1, collect_toplevel_exceptions)
+        while True:
+            try:
+                event.dispatch()
+            except KeyboardInterrupt, k:
+                event.abort()
+                print 'Received Ctr+c shutdown'
+                sys.stdout.flush()
+                sys.exit(0)
+
+            except Exception, e:
+                exception, instance, tb = traceback.sys.exc_info()
+                if 'exceptions must be strings' in str(instance):
+                    print "Error in pyevent 0.3 on ubuntu in python 2.5. See http://orbited.org/pyevent.html for details"
+                    event.abort()
+                    sys.exit(0)
+                # TODO: Start: There is certainly a better way of doing this
+                x = StringIO.StringIO()
+                traceback.print_tb(tb, file=x)
+                x = x.getvalue()
+                relevant_line = x.split('\n')[-3]
+                # End: Find a better way
+
+                logger.critical('%s:%s\t%s' % (exception, instance, relevant_line))
+                print x
+

daemon/orbited/buffer.py

+
+# logger = getLogger("HTTPBuffer")
+
+class Buffer(object):
+    ''' A Buffer object buffers text, and has two modes, 'index' and 'consume'
+        
+        In index mode, which is the default initial mode of a Buffer object,
+        the buffer object keeps track of a position 'pos', which can be
+        altered by calling b.move, or by b.exhaust, which moves 'pos' to the
+        end of the file.  ... TODO: finish this
+        
+        In consume mode, a buffer essentially just a proxy for its contained
+        string b.data.
+    '''
+    def __init__(self, initial_data='', mode='index'):
+        self.mode = None
+        self.pos = None
+        self.data = initial_data        
+        self.set_mode(mode)
+    
+    def set_mode(self, mode):
+        # No change
+        if self.mode == mode:
+            return
+        # Switch from consume to index or initial mode is index
+        elif mode == 'index':
+            self.mode = 'index'
+            self.pos = 0
+        # Switch from index to consume
+        elif mode == 'consume' and self.pos is not None:
+            self.data = self.data[self.pos:]
+            self.pos = None
+            self.mode = 'consume'
+        # initial mode is consume
+        else: 
+            self.mode = 'consume'
+            self.pos = None
+    
+    def find(self, marker, i=0):
+        if self.mode == 'consume':
+            return self.data.find(marker, i)
+        elif self.mode == 'index':
+            pos =  self.data.find(marker, i + self.pos)
+            if pos == -1:
+                return -1
+            return pos - self.pos
+    
+    def __str__(self):
+        return self.get_value()
+    
+    def get_value(self):
+        ''' Return the data in consume mode, or the remainder of the data in
+            index mode.
+        '''
+        if self.mode == 'consume':
+            return self.data
+        elif self.mode == 'index':
+            return self.data[self.pos:]
+    
+    def get_full_value(self):
+        ''' Returns the full value of the string, even in index mode. '''
+        return self.data
+    
+    def exhaust(self):
+        ''' In index mode, sets position to the end of the buffered data.  In
+            consume mode, sets data to empty.
+        '''
+        if self.mode == 'consume':
+            self.data = ''
+        elif self.mode == 'index':
+            self.pos = len(self.data)
+    
+    def reset_position(self):
+        ''' Resets position to 0. '''
+        if self.mode == 'index':
+            self.pos = 0
+    
+    def reset(self, content = ''):
+        ''' Resets data to empty, or an opional 'content' string, and position
+            to 0.
+        '''
+        self.data = content
+        if self.mode == 'index':
+            self.pos = 0
+    
+    def empty(self):
+        ''' Boolean; is true if the the buffer is empty '''
+        if self.mode == 'consume':
+            return len(self.data) == 0
+        elif self.mode == 'index':
+            return (len(self.data) - self.pos) == 0
+    
+    def move(self, i):
+        if self.mode == 'consume':
+            self.data = self.data[i:]
+        elif self.mode == 'index':
+            self.pos += i
+    
+    def part(self, start, end):
+        if self.mode == 'consume':
+            return self.data[start:end]
+        elif self.mode == 'index':
+            return self.data[self.pos + start: self.pos + end]
+    
+    def __contains__(self, data):
+        if self.mode == 'consume':
+            return data in self.data
+        elif self.mode == 'index':
+            return self.data.find(data, self.pos) != -1
+    
+    def __eq__(self, data):
+        if self.mode == 'consume':
+            return self.data == data
+        elif self.mode == 'index':
+            return self.data[self.pos:] == data
+    
+    def __len__(self):
+        if self.mode == 'consume':
+            return len(self.data)
+        elif self.mode == 'index':
+            return len(self.data) - self.pos
+    
+    def __get_slice__(self, start, end):
+        return self.part(self, start, end)
+    
+    def __add__(self, add_data):
+        ''' Add the passed-in string to the buffer '''
+        if not isinstance(add_data, str):
+            raise TypeError
+        self.data += add_data
+        return self        
+    

daemon/orbited/config.py

+map = {
+    '[global]': {
+        'op.port': '9000',
+        'http.port': '8000',
+        'admin.enabled': '0',
+        'proxy.enabled': '1',
+        'proxy.keepalive': '1',
+        'proxy.keepalive_default_timeout': 300,
+        'log.enabled': '1',
+        'session.default_key': '0', # TODO: change to token.default_key
+        'session.timeout': 5,
+        'event.retry_limit': 1
+    },
+    '[transport]': {
+        'default': 'stream',
+        'xhr.timeout': '30',
+    },
+    '[admin]': {
+        'admin.port': '9001'
+    },
+    '[proxy]': [
+        # ('/', ('69.60.117.172', 80)),
+        # ('/', ('ORBITED', None)),
+    ],
+    '[log]': {
+        'type': 'basic',
+        'level': 'INFO',
+        # 'type': 'file',
+        # 'location': 'logging.conf'
+        # 'log.access': ('screen',),
+        # 'log.error': ('screen',),
+        # 'log.event': ('screen',)
+    },
+    'default_config': 1, # set to 0 later if we load a config file
+}
+
+def update(**kwargs):
+    map.update(kwargs)
+    return True
+
+def load(filename):
+    try:
+        f = open(filename)
+        lines = [line.strip() for line in f.readlines()]
+        map['default_config'] = 0
+    
+    except IOError:    
+        print filename, 'could not be found. Using default configuration'
+        return False
+        # lines = default.split('\n')
+    
+    section = None
+    for line in lines:
+        
+        # ignore comments
+        if '#' in line:
+            line, comment = line.split('#', 1)
+        if not line:
+            continue
+        
+        # start of new section; create a dictionary for it in map if one
+        # doesn't already exist
+        if line.startswith('[') and line.endswith(']'):
+            section = line
+            if section not in map:
+                map[section]  = {}
+            continue
+        
+        # assign each source in the proxy section to a target address and port
+        if section == '[proxy]':
+            source, target = [side.strip() for side in line.split('->')]
+            if target.startswith('http://'):
+                target = target[7:]
+            if ':' in target:
+                addr, port = target.split(':', 1)
+                port = int(port)
+            else:
+                addr, port = target, 80
+            map[section].append((source, (addr, port)))
+            continue
+        
+        # skip lines which do not assign a value to a key
+        if '=' not in line:
+            continue
+        
+        key, value = [side.strip() for side in line.split('=', 1)]
+        
+        # in log section, value should be a tuple of one or two values
+        if section == '[log]':
+            value = tuple([val.strip() for val in value.split(',', 1)])
+        
+        map[section][key] = value
+    
+    print 'CONFIG'
+    print map
+    return True

daemon/orbited/exceptions.py

+class HTTPProtocolError(Exception):
+    pass
+
+class OrbitedHTTPError(Exception):
+    pass
+
+class InvalidTransport(Exception):
+    pass
+
+class DuplicateTransportName(Exception):
+    pass

daemon/orbited/http/__init__.py

+from daemon import HTTPDaemon

daemon/orbited/http/content.py

+from orbited.log import getLogger
+logger = getLogger('HTTPContent')
+
+class HTTPContent(object):
+    id = 0    
+    def __init__(self, content, succ_cb=None, fail_cb=None):
+        HTTPContent.id += 1
+        self.id = HTTPContent.id
+        self._content = content
+        self.succ_cb = succ_cb
+        self.fail_cb = fail_cb
+    
+    def success(self, conn):
+        logger.debug('Successfully sent content [id: %s]' % self.id)
+        if self.succ_cb:
+            self.succ_cb(conn)
+    
+    def failure(self, conn, reason):
+        logger.debug('Failed to send content [id: %s], reason: %s' % (self.id, reason))
+        if self.fail_cb:
+            self.fail_cb(conn, reason)
+    
+    def content(self):
+        return self._content
+    
+
+class HTTPClose(object):
+    
+    def success(self):
+        pass
+    
+    def failed(self, reason):
+        pass
+    
+    def content(self):
+        return ''
+    
+
+class HTTPRequestComplete(object):
+    
+    def success(self):
+        pass
+    
+    def failed(self, reason):
+        pass
+    
+    def content(self):
+        return ''
+    

daemon/orbited/http/daemon.py

+import event
+from orbited.log import getLogger
+from orbited.config import map as config
+from orbited import io
+from http import HTTPConnection
+
+logger = getLogger('HTTPDaemon')
+access = getLogger('ACCESS')
+            
+class HTTPDaemon(object):
+    
+    def __init__(self, app):
+        self.app = app
+        self.port = int(config['[global]']['http.port'])
+        sock = io.server_socket(self.port)
+        self.listen = event.event(self.accept_connection, handle=sock, evtype=event.EV_READ | event.EV_PERSIST)
+        self.listen.add()
+    
+    def accept_connection(self, ev, sock, event_type, *arg):
+        logger.debug('Accept Connection, ev: %s, sock: %s, event_type: %s, *arg: %s' % (ev, sock, event_type, arg))
+        sock, addr = sock.accept()
+        HTTPConnection(self, sock, addr)
+    

daemon/orbited/http/errors.py

+from orbited.util import formatBlock
+
+unknown = formatBlock("""
+    HTTP/1.0 500 Error
+    Content-Type: text/html
+      
+    An Unknown Error occurred.
+    """)
+    
+protocol = formatBlock("""
+    HTTP/1.0 500 Protocol Error
+    Content-Type: text/html
+    
+    This server did not understand the request protocol.<br>
+    Error: <b>%s</b>
+    """)
+
+
+oribited = formatBlock("""
+    HTTP/1.0 500 Orbited Error
+    Content-Type: text/html
+    
+    Orbited Error:<b>%s</b>
+    """)

daemon/orbited/http/http.py

+import cgi
+import event
+from router import StaticDestination, ProxyDestination, OrbitedDestination, \
+                   PluginDestination, OrbitedEventRequest, router
+from proxy import Proxy, ProxyComplete
+from orbited.buffer import Buffer
+#from orbited.orbit import HTTPOrbitRequest
+from content import HTTPContent, HTTPClose, HTTPRequestComplete
+from orbited.log import getLogger
+from orbited.config import map as config
+from orbited import io
+from orbited.exceptions import HTTPProtocolError, OrbitedHTTPError
+import errors
+access = getLogger('ACCESS')                   
+
+
+class HTTPConnection(object):
+    logger = getLogger('HTTPConnection')
+    id = 0
+#    import logging
+#    logger.setLevel(logging.DEBUG)
+    def __init__(self, daemon, sock, addr):
+        HTTPConnection.id += 1
+        self.id = HTTPConnection.id
+        pass # self.logger.debug('Creating HTTPConnection with id %s' % self.id)
+        self.daemon = daemon
+        self.app = daemon.app
+        self.sock = sock
+        self.addr, self.local_port = addr
+        self.new_request()
+        self.proxy = None
+    
+    def new_request(self):
+        self.browser_conn = None
+        self.revent = event.read(self.sock, self.read_ready)
+        self.wevent = None
+        self.request = HTTPRequest(self)
+        self.write_buffer = Buffer(mode='consume')
+        self.buffer = Buffer()
+        self.response_queue = []
+        self.current_response = None
+        self.state = 'reading'
+    
+    def proxy_completed(self):
+        if self.proxy.keepalive:
+            self.proxy.completed()
+            self.new_request()
+        else:
+            self.proxy.close()
+            self.close()
+    
+    def read_ready(self):
+        try:
+            data = self.sock.recv(io.BUFFER_SIZE)
+            if not data:
+                pass # self.logger.debug('Read empty string... call self.close()')
+                # self.close()
+                return None
+            return self.read(data)
+        except io.socket.error:
+            self.close()
+            return None
+    
+    def read(self, data):
+        # pass # self.logger.debug('entered read, data: %s' % (data[:4] + '...'))
+        pass # self.logger.debug('state: %s' % self.state)
+        # TODO: is this really HTTP/1.1 keepalive compliant? I don't think so.
+        #       those return None's look dubious. Do we ever read again after
+        #       the first request?
+        
+        if self.state != 'reading':
+            pass # self.logger.debug('raising something...')
+            self.write(HTTPContent(errors.protocol))
+            self.write(HTTPClose())
+            self.logger.error("HTTPProtocolError", 'Unexpected data: %s' % data) 
+            self.revent = None
+            return None
+        self.buffer += data
+        try:
+            completed = self.request.process()
+        except HTTPProtocolError, msg:
+            self.logger.error("HTTPPrtocolError", msg)
+            self.write(HTTPContent(errors.protocol))
+            self.write(HTTPClose())
+            self.revent = None
+            return None
+        if self.request.process():
+            return self.dispatch()
+        # Continue reading...
+        return True
+
+    def dispatch(self):
+        action = router(self.request)
+        if isinstance(action, ProxyDestination):
+            access.info('ACCESS\tPROXY\t%s -> %s:%s [ %s ]' % (self.request.url, action.addr, action.port, self.addr))
+            self.state = 'proxy'
+            if not self.proxy:
+                self.proxy = Proxy(self)
+            # Give proxy the whole buffer
+            self.buffer.reset_position()
+            self.proxy.proxy_request(action.addr, action.port)
+            return None
+
+        elif isinstance(action, StaticDestination):
+            access.info('ACCESS\tSTATIC\t%s' % action.url)
+            self.state = 'static'
+            self.write(HTTPContent(action.content))
+            self.write(HTTPClose())
+            return None
+
+        elif isinstance(action, PluginDestination):
+            access.info('ACCESS\tPlugin (%s)\t%s' % (action.plugin.name, action.url))
+            self.state = "plugin"
+            action.plugin.dispatch(self.request)            
+            return None
+        elif isinstance(action, OrbitedDestination):
+            try:
+                conn = BrowserConnection(self.request)
+                self.browser_conn = conn
+                access.info('ACCESS\tORBITED\\%s\t%s' % (conn.transport, conn.recipient()))
+                self.state = 'orbited'
+                self.app.accept_browser_connection(conn)
+            except OrbitedHTTPError, message:
+                self.logger.error("OrbitedHTTPError", message)
+                self.write(HTTPContent(errors.orbited % message))
+                self.write(HTTPClose())
+                if config['default_config']:
+                    self.logger.warn('Perhaps this is because no orbited.cfg file is in use?')
+                self.revent = None
+                return None
+            # self.logger.warn('No router rule found for: %s' % action.url)
+            return None
+
+        elif isinstance(action, OrbitedEventRequest):
+            self.state = 'orbitrequest'
+            req = HTTPEventRequest(self.request)
+            access.info('ACCESS\tHTTP-OP\t\%s\t%s' % (req.id, req.addr))
+            self.app.accept_orbit_request(HTTPOrbitRequest(self.request))
+            # return True # TODO: allow OP over HTTP
+    
+    
+    def write(self, content):
+        self.response_queue.append(content)
+        if not self.wevent:
+            # TODO: creating a write event if the socket is closed causes a
+            #       big crash. fix it.
+            self.wevent = event.write(self.sock, self.write_ready)
+            self.current_response = self.response_queue.pop(0)
+            self.write_buffer.reset(self.current_response.content())
+    
+    def write_ready(self):
+        self.logger.debug('entered write_ready')
+        if self.write_buffer.empty():
+            self.logger.debug('send completed for %s' % self.current_response)
+            self.current_response.success(self)        
+            self.current_response = None
+            if not self.response_queue:
+                self.wevent = None
+                return None
+            self.current_response = self.response_queue.pop(0)
+            self.logger.debug('new response: %s' % self.current_response)
+            if isinstance(self.current_response, ProxyComplete):
+                self.logger.debug('proxy complete')
+                self.current_response = None
+                self.proxy_completed()
+                if not self.response_queue:
+                    return None
+                self.current_response = self.response_queue.pop(0)
+            if isinstance(self.current_response, HTTPRequestComplete):
+                self.logger.debug('request complete')
+                self.current_response = None
+                self.new_request()
+                return None
+            if isinstance(self.current_response, HTTPClose):
+                self.logger.debug('write_ready recieved HTTPClose')
+                self.current_response = None
+                self.close()
+                return None
+            
+            self.write_buffer.reset(self.current_response.content())
+        
+        # Do actual writing here
+        try:
+            bsent = self.sock.send(self.write_buffer.get_value())
+            # Bandwith hook
+            self.sent_amount(bsent)
+            self.logger.debug('SEND: %s' % self.write_buffer.part(0,bsent))
+            self.write_buffer.move(bsent)
+            self.logger.debug('write_buffer: return True')
+            # self.write_buffer = self.write_buffer[bsent:]
+            return True
+        except io.socket.error, msg:
+            self.logger.debug('io.socket.error: %s' % msg)
+            self.close(reason=msg)
+            return None
+    
+    # Bandwith hook
+    def sent_amount(self, amount):
+        pass
+    
+    def close(self, reason=''):
+        self.logger.debug('entered close')
+        if self.current_response:
+            self.current_response.failure(self, reason)
+        for response in self.response_queue:
+            response.failure(self, reason)
+        if self.wevent:
+            self.wevent.delete()
+            self.wevent = None
+        if self.revent:
+            self.revent.delete()
+            self.revent = None
+        self.sock.close()
+        if self.state == 'orbited':
+            self.app.expire_browser_connection(self.browser_conn)
+    
+
+class HTTPRequest(object):
+    logger = getLogger('HTTPRequest')
+    
+    def __init__(self, conn):
+        self.conn = conn
+        self.state = 'action'
+        self.headers = {}
+        self.complete = False
+        self.cookies = {}
+        self.form = {}
+    
+    def process(self):
+        return getattr(self, 'state_%s' % self.state)()        
+    
+    def state_action(self):
+        if '\r\n' not in self.conn.buffer:
+            return False
+        i = self.conn.buffer.find('\r\n')
+        self.action = self.conn.buffer.part(0,i)
+        try:
+            self.type, self.url, self.protocol = self.conn.buffer.part(0,i).split(' ')
+        except ValueError:
+            raise HTTPProtocolError, "Invalid HTTP status line"
+        self.type = self.type.lower()
+        self.protocol = self.protocol.lower()
+        self.conn.buffer.move(i+2)
+        self.state = 'headers'
+        return self.state_headers()
+    
+    def state_headers(self):
+#        if not hasattr(self, 'last_header_index'):
+#            self.last_header_index = 0
+#        index = self.conn.buffer.find('\r\n', self.last_header_index)
+#        if index == -1:
+#            self.last_header_index = len(self.conn.buffer)
+#            return False
+#        headers = dict([ line.split(': ') for line in self.conn.buffer.part(0, index).split('\r\n') ])
+#        self.conn.buffer.move(index+4)
+#        self.state = 'body'
+#        return self.state_body()
+#   Above method is no faster, apparently.
+        while True:
+            index = self.conn.buffer.find('\r\n')
+            if index == -1:
+                return False
+            if index == 0:
+                self.conn.buffer.move(2)
+                self.state='body'
+                return self.state_body()
+            try:
+                key, value = self.conn.buffer.part(0, index).split(': ')
+            except ValueError:
+                raise HTTPProtocolError, "Invalid HTTP header format"
+            self.headers[key.lower()] = value
+            self.conn.buffer.move(index+2)
+    
+    def state_body(self):
+        if self.type == 'get':
+            if '?' in self.url:
+                self.url, self.qs = self.url.split('?', 1)
+            else:
+                self.qs = ''            
+            self.state = 'completed'
+            return self.state_completed()
+        elif self.type == 'post':
+            if not hasattr(self, 'content_length'):
+                if 'content-length' not in self.headers:    
+                    raise HTTPProtocolError, 'No Content-Length header specified'
+                try:
+                    self.content_length = int(self.headers['content-length'])
+                except ValueError:
+                    raise HTTPProtocolError, 'Invalid Content-Length: %s' % self.headers[content-length]
+            if len(self.conn.buffer) < self.content_length:
+                return False
+            self.qs = self.conn.buffer.part(0, self.content_length)
+            self.conn.buffer.move(self.content_length)
+            self.state = 'completed'
+            return self.state_completed()
+    
+    def state_completed(self):
+        if 'cookie' in self.headers:
+            try:
+                for key, val in [ i.split('=', 1) for i in self.headers['cookie'].split(';') ]:
+                    self.cookies[key] = val
+            except ValueError:
+                raise HTTPProtocolError, "Invalid HTTP cookies format"
+        try:
+            for key, val in cgi.parse_qsl(self.qs):
+                self.form[key] = val
+        except ValueError:
+            raise HTTPProtocolError, "Invalid querystring format"
+            
+        return True
+    
+    def key(self):
+        return self.user_id, self.session_id, self.location
+    
+    def respond(self, response):
+        self.conn.write(response)
+
+    def close(self):
+        self.conn.close()        
+        
+class BrowserConnection(object):
+    id = 0
+    logger = getLogger('BrowserConnection')
+    def __init__(self, request):
+        BrowserConnection.id += 1
+        self.id = BrowserConnection.id
+        self.request = request
+        if 'user' not in request.form:
+            raise OrbitedHTTPError, 'user ID must be specified in the GET or POST request'
+        self.user_id = request.form['user']
+        self.location = request.url
+        # session key is either in form or cookies
+        self.session_key = request.form.get('session', None)
+        if not self.session_key:
+            pass # self.logger.debug('session not found in form')
+            self.session_key = request.cookies.get('session', None)
+            if not self.session_key:
+                pass # self.logger.debug('session not foun in cookie')
+                pass # self.logger.debug('choosing default session key')
+                self.session_key = config['[global]']['session.default_key']
+        else:
+            pass # self.logger.debug('session key found in form')
+        self.transport = request.form.get('transport', config['[transport]']['default'])
+        self.addr = self.request.conn.addr
+
+    def respond(self, response):
+        pass # self.logger.debug('entered respond()')
+        self.request.respond(response)
+
+    def close(self):
+        pass # self.logger.debug('entered close()')
+        self.request.close()
+
+    def key(self):
+        return self.user_id, self.session_key, self.location
+
+    def recipient(self):
+        return '"%s, %s, %s" [%s]' % (self.user_id, self.session_key, self.location, self.addr)
+

daemon/orbited/http/proxy.py

+import event
+
+from orbited import io
+from content import HTTPContent
+from orbited.config import map as config
+from orbited.log import getLogger
+from orbited.buffer import Buffer
+
+   
+class ProxyComplete(object):
+    pass
+
+class Proxy(object):
+    logger = getLogger('proxy')
+    # import logging; logger.setLevel(logging.DEBUG)
+    id = 0
+    def __init__(self, conn):
+        Proxy.id += 1
+        self.id = Proxy.id
+        self.logger.debug('Creating Proxy with id %s' % self.id)
+        self.conn = conn
+        self.server = None
+        self.state = 'waiting'
+        self.keepalive = bool(int(config['[global]']['proxy.keepalive']))
+        
+        if self.conn.request.protocol == 'http/1.0':
+            self.protocol = 1.0
+            if self.keepalive:
+                # Default for http/1.0 is to Close
+                k = self.conn.request.headers.get('connection', 'Close')
+                if k.lower() != 'keep-alive':
+                    self.keepalive = False
+        elif self.conn.request.protocol == 'http/1.1':
+            self.protocol = 1.1
+            if self.keepalive:
+                # Default for http/1.1 is to Keep Alive
+                k = self.conn.request.headers.get('connection', 'keep-alive')
+                if k.lower() != 'keep-alive':
+                    self.keepalive = False
+        else:
+            self.logger.debug('protocol is: %s' % self.conn.request.protocol)
+            raise ProtocolError
+    
+    def proxy_request(self, addr, port):
+        self.logger.debug('proxying request: %s, %s' % (addr, port))
+        self.logger.debug('url: %s' % self.conn.request.url)
+        if self.state != 'waiting':
+            raise ProtocolError
+        if self.server and self.server.location != (addr, port):
+            self.logger.debug('old server is invalid')
+            self.server.close()
+            self.server = None
+        if not self.server:
+            self.logger.debug('creating new server')
+            self.server = ServerConnection(self, addr, port)
+        # TODO: possible improvement... don't copy the buffer, just pass it
+        #       along and make a new buffer for the conn
+        self.state = 'proxying'
+        self.logger.debug('calling self.server.send_request')
+        self.server.send_request(self.conn.buffer.get_value())        
+    
+    def completed(self):
+        # TODO: check self.keepalive, and the server's connection header
+        #       to figure out if we should close the server connection or not
+        self.logger.debug('completed...')
+        self.state = 'waiting'
+    
+    def close(self):
+        if self.server:
+            self.server.close()
+            self.server = None
+        self.state = 'closed'
+    
+    def relay_response(self, data):
+        self.conn.write(data)
+    
+    def failure(self, err):
+        self.logger.debug('proxy failed: %s ' % err)
+        self.server.close()
+        # TODO: check if we already started responding
+        #       if we haven't, send an html error page
+        #       if we have, then just close the connection entirely
+        self.conn.close()
+    
+
+class ServerConnection(object):
+    id = 0
+    logger = getLogger('ServerConnection')
+    # import logging; logger.setLevel(logging.DEBUG)    
+    def __init__(self, proxy, addr, port):
+        ServerConnection.id += 1
+        self.id = ServerConnection.id
+        self.logger.debug('CREATING ServerConnection with id %s' % self.id)
+        self.proxy = proxy
+        self.location = addr, port
+        self.sock = io.client_socket(addr, port)
+        self.revent = None #event.read(self.read_ready, self.sock)
+        self.wevent = None
+        self.write_buffer = None
+        self.complete = False
+        self.state = 'connecting'
+    
+    def send_request(self, data):
+        self.logger.debug('entered send_request')
+        if self.state == 'writing' or self.state == 'reading':
+            raise ProtocolError
+        
+        self.buffer = ProxyBuffer()
+        self.write_buffer = Buffer(data, mode='consume')
+        self.wevent = event.write(self.sock, self.write_ready)
+        self.logger.debug('exiting send_request')
+    
+    def read_ready(self):
+        try:
+            data = self.sock.recv(io.BUFFER_SIZE)
+            self.buffer.recv(data)
+            if self.buffer.out:
+                content = HTTPContent(self.buffer.out.get_value())
+                self.buffer.out.exhaust()
+                self.proxy.relay_response(content)
+            if self.buffer.complete:
+                self.revent = None
+                self.complete = True
+                self.proxy.relay_response(HTTPContent(""))
+                self.proxy.relay_response(ProxyComplete())
+                self.state = 'waiting'
+                return None
+            return True
+        except:
+            raise
+    
+    def write_ready(self):
+        self.logger.debug('entered write_ready')
+        if self.state == 'connecting':
+            self.state = 'writing'
+        try:
+            bsent = self.sock.send(self.write_buffer.get_value())
+            self.write_buffer.move(bsent)
+            if self.write_buffer.empty():
+                self.logger.debug('Finished sending proxied request to server')
+                self.wevent = None
+                self.revent = event.read(self.sock, self.read_ready)
+                self.state = 'reading'
+                return None
+        except io.socket.error, err:
+            self.logger.debug('error')
+            raise
+            self.server_closed(err)
+    
+    def server_closed(self, err=None):
+        if self.complete:
+            pass
+        else:
+            self.proxy.failure(err)
+    
+    def close(self):
+        if self.revent:
+            self.revent.delete()
+            self.revent = None
+        if self.wevent: