Anonymous avatar Anonymous committed a20d2ea

moving to the new name

Comments (0)

Files changed (19)

-Turtle is an HTTP proxy whose purpose is to throttle connections to
+Turtl is an HTTP proxy whose purpose is to throttle connections to
 specific hostnames to avoid breaking terms of usage of those API
 providers (like del.icio.us, technorati and so on).
 
 application or it can be started stand-alone using the provided twistd
 plugin.
 
-Here's an example to use turtle from inside another python application:
+Here's an example to use turtl from inside another python application:
 
 {{{
 from twisted.internet import reactor
-from turtle import proxy, config
+from turtl import proxy, config
 
 s = """\
 # defaults is pulled out of the end results
 """
 
 urlmap, rest, port = config.loadConfigFromString(s)
-f = proxy.TurtleHTTPFactory(urlmap, rest)
+f = proxy.TurtlHTTPFactory(urlmap, rest)
 reactor.listenTCP(port, f)
 reactor.run()
 }}}
 And here's an example of a command line to start it stand alone:
 
 {{{
-twistd --logfile=/var/log/turtle.log --pidfile=/var/run/turtle.pid turtle -c /some/path/config.yaml -p turtleOne
+twistd --logfile=/var/log/turtl.log --pidfile=/var/run/turtl.pid turtl -c /some/path/config.yaml -p turtlOne
 }}}
 

examples/basic.py

 from twisted.internet import reactor
-from turtle import proxy, config
+from turtl import proxy, config
 
 s = """\
 # defaults is pulled out of the end results
 """
 
 urlmap, rest, port = config.loadConfigFromString(s)
-f = proxy.TurtleHTTPFactory(urlmap, rest)
+f = proxy.TurtlHTTPFactory(urlmap, rest)
 reactor.listenTCP(port, f)
 reactor.run()
 # See LICENSE for details.
 
 """
-Distutils installer for turtle.
+Distutils installer for turtl.
 """
 
 try:
 # For great justice, take off every zig.
 import sys, os, pprint, traceback
 
-import turtle
-version = turtle.__version__
+import turtl
+version = turtl.__version__
 
 install_requires = ["Twisted>=8.0.1", "PyYAML>=3.0.8"]
 
 
 # Most of the following code is from Divmod Epsilon, (C) 2008 Divmod, Inc.
 # under MIT license, see http://divmod.org/trac/wiki/DivmodEpsilon
-# since I only need this to install turtle correctly with Twisted
+# since I only need this to install turtl correctly with Twisted
 # Plugins I'd rather copy this few lines than actually add another
 # dependency. Also this version uses the setuptools setup() function.
 
     return distobj
 
 description = """\
-Turtle is an HTTP proxy whose purpose is to throttle connections to
+Turtl is an HTTP proxy whose purpose is to throttle connections to
 specific hostnames to avoid breaking terms of usage of those API
 providers (like del.icio.us, technorati and so on).
 """
 
 autosetup(
-    name = "turtle",
+    name = "turtl",
     author = "Valentino Volonghi",
     author_email = "valentino@adroll.com",
     url = "http://adroll.com/labs",

turtl/__init__.py

+__version__ = '0.0.3-dev'
+from yaml import load
+
+def loadConfigFromFile(filepath):
+    """
+    Load the configuration options from a filepath
+
+    @param filepath: The filepath to a configuration file.
+    @type filepath: C{twisted.python.filepath.FilePath}
+    """
+    return loadConfigFromString(filepath.getContent())
+
+def loadConfigFromString(s):
+    """
+    Load the configuration options from a string.
+
+    @param s: a C{str} that contains the yaml formatted
+                    configuration
+    @type s: yaml formatted C{str}
+
+    @returns: a tuple of hostname and L{engine.ThrottlingDeferred}s
+                and the default behavior for unknown urls.
+    """
+    from turtl import engine
+
+    loaded = load(s)
+
+    # Remove the filtering param
+    rest = loaded.pop('filter-rest', True)
+    port = loaded.pop('port', 8080)
+    # Remove the defaults to use them as
+    # a base to work on.
+    defaults = loaded.pop('defaults', {})
+    # If nothing is in the config file, this is the default
+    defaults.update({'calls': 1, 'interval': 1, 'concurrency': 10})
+
+    urlmapping = {}
+    for host, kwargs in loaded.iteritems():
+        # Fill an empty dictionary with the defaults
+        # And then override the defaults with the one
+        # in the current block so that we use them
+        kw = {}
+        kw.update(defaults)
+        kw.update(kwargs)
+        urlmapping[host] = engine.ThrottlingDeferred(**kw)
+
+    return urlmapping, rest, port
+
+from twisted.internet import defer, task
+
+class _ConcurrencyPrimitive(object):
+    _execute = defer.maybeDeferred
+
+    def __init__(self):
+        self.waiting = []
+
+    def _releaseAndReturn(self, r):
+        self.release()
+        return r
+
+    def run(*args, **kwargs):
+        """Acquire, run, release.
+
+        This function takes a callable as its first argument and any
+        number of other positional and keyword arguments.  When the
+        lock or semaphore is acquired, the callable will be invoked
+        with those arguments.
+
+        The callable may return a Deferred; if it does, the lock or
+        semaphore won't be released until that Deferred fires.
+
+        @return: Deferred of function result.
+        """
+        if len(args) < 2:
+            if not args:
+                raise TypeError("run() takes at least 2 arguments, none given.")
+            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
+                args[0].__class__.__name__,))
+        self, f = args[:2]
+        args = args[2:]
+
+        def execute(ignoredResult):
+            d = self._execute(f, *args, **kwargs)
+            d.addBoth(self._releaseAndReturn)
+            return d
+
+        d = self.acquire(kwargs.pop('_hpriority', False))
+        d.addCallback(execute)
+        return d
+
+    def runasap(*args, **kwargs):
+        """Acquire, run, release and put in the front of the waiting queue
+
+        @return: Deferred of function result.
+        """
+        kwargs['_hpriority'] = True
+        return _ConcurrencyPrimitive.run(*args, **kwargs)
+
+class ThrottlingDeferred(_ConcurrencyPrimitive):
+    def __init__(self, concurrency, calls, interval):
+        """
+        Throttling deferred that considers both the concurrency
+        requirements and the frequency, over time, of calls that
+        you are allowed to make. It's clear however that if the
+        rate of calls is higher than the tokens there will be
+        a queue, and the queue can grow indefinitely if calls don't
+        return quickly enough. More specifically: if T(f) is the
+        time it takes to execute a call, and this time is formed
+        by Ts(f) and Tp(f) [serial time and parallelizable time]:
+
+            Ts(f)*calls + Tp(f)*(calls/tokens) <= interval
+
+        If this is not true then the ingress could be too high
+        and causing an ever-increasing queue.
+
+        @param concurrency: The maximum number of concurrent
+                            calls.
+        @type concurrency: C{int}
+
+        @param calls: Represents the number of calls that
+                can be made every C{interval}
+        @type calls: C{int}
+
+        @param interval: Represents the time between a
+                C{calls} number of calls
+
+        NOTE: Currently it's not a requirement but if distributed
+                usage of this deferred was a necessity, the points
+                and current concurrency levels should be stored
+                somewhere else and updated every time they are
+                checked (there would also be race conditions and
+                so on).
+        """
+        _ConcurrencyPrimitive.__init__(self)
+
+        self._sem = defer.DeferredSemaphore(concurrency)
+        self._execute = self._sem.run
+
+        self.calls = calls
+        self.interval = interval
+        self.points = calls
+
+        self._resetLoop = task.LoopingCall(self._reset)
+        self._resetLoop.start(interval, now=False)
+
+    def _reset(self):
+        self.points = self.calls
+        self.release()
+
+    def acquire(self, priority=False):
+        """Attempt to acquire the token.
+
+        @param priority: Defines an high priority call that should
+                            either be executed immediately or scheduled
+                            as the immediate next one.
+        @type priority: C{bool}
+
+        @return: a Deferred which fires on token acquisition.
+        """
+        assert self.points >= 0, "Internal inconsistency??  points should never be negative"
+
+        d = defer.Deferred()
+        if not self.points:
+            if priority:
+                # Think about a better data structure for this
+                self.waiting.insert(0, d)
+            else:
+                self.waiting.append(d)
+        else:
+            self.points = self.points - 1
+            d.callback(self)
+        return d
+
+    def release(self):
+        """Release the token.
+
+        Should be called by whoever did the acquire() when the shared
+        resource is free.
+        """
+        if self.points > 0 and self.waiting:
+            self.points = self.points - 1
+            d = self.waiting.pop(0)
+            d.callback(self)
+
+import urlparse
+
+from twisted.internet import reactor, defer, error
+from twisted.web import proxy, http
+from twisted.python import log
+
+class TurtlProxyClientFactory(proxy.ProxyClientFactory):
+    """
+    Redefine the L{proxy.ProxyClientFactory} in order to trigger
+    the L{TurtlProxyRequest.completed} deferred once the connection
+    has been closed.
+    """
+    def clientConnectionLost(self, connector, reason):
+        proxy.ProxyClientFactory.clientConnectionLost(self, connector, reason)
+        if reason.trap(error.ConnectionDone):
+            self.father.completed.callback(None)
+        else:
+            self.father.completed.errback(reason)
+
+class TurtlProxyRequest(proxy.ProxyRequest):
+    """
+    Redefine L{proxy.ProxyRequest} to add support for upcalling the
+    factory to decide whether to forward or filter a specific request.
+    If the request is blocked then return an error page to the client.
+
+    Also introduce a deferred that is fired once the forwarded host
+    request has completed, so that we can unlock the caller that could
+    be waiting for that to be over.
+
+    @ivar completed: a L{defer.Deferred} that is triggered once the
+                        outgoing request has completed.
+    """
+
+    protocols = {'http': TurtlProxyClientFactory}
+
+    def __init__(self, channel, queued, reactor=reactor):
+        proxy.ProxyRequest.__init__(self, channel, queued, reactor)
+        self.completed = defer.Deferred()
+
+    def process(self):
+        parsed = urlparse.urlparse(self.uri)
+        protocol = parsed[0]
+        host = parsed[1]
+        port = self.ports[protocol]
+        if ':' in host:
+            host, port = host.split(':')
+            port = int(port)
+        rest = urlparse.urlunparse(('', '') + parsed[2:])
+        if not rest:
+            rest = rest + '/'
+        class_ = self.protocols[protocol]
+        headers = self.getAllHeaders().copy()
+        priority = headers.pop('x-priority', False)
+
+        if 'host' not in headers:
+            headers['host'] = host
+        self.content.seek(0, 0)
+        s = self.content.read()
+        clientFactory = class_(self.method, rest, self.clientproto,
+                               headers, s, self)
+
+        processed = self.channel.factory.process(host, port, clientFactory,
+                                                 priority, self.completed)
+        if not processed:
+            self.sendError('''<H1>%s domain is filtered</H1>''' % (host,))
+
+        else:
+            processed.addErrback(self.sendError)
+
+    def sendError(self, line):
+        self.transport.write("HTTP/1.0 501 Gateway error\r\n")
+        self.transport.write("Content-Type: text/html\r\n")
+        self.transport.write("\r\n")
+        self.transport.write(str(line))
+        self.transport.loseConnection()
+
+
+class TurtlProxy(proxy.Proxy):
+    requestFactory = TurtlProxyRequest
+
+class TurtlHTTPFactory(http.HTTPFactory):
+    """
+    Override the default HTTPFactory to add support for a shared (among
+    all incoming requests) mapping of urls to request throttlers.
+
+    There is one factory for all incoming connections so shared state has
+    to stay here.
+
+    @ivar urlmapping: A C{dict} that maps hostnames to instances of
+                            L{turtl.engine.ThrottlingDeferred}.
+
+    @ivar filter_rest: Define the behavior of unknown urls. If C{True}
+                        block those requests.
+    """
+
+    protocol = TurtlProxy
+
+    def __init__(self, urlmapping={}, filter_rest=False, reactor=reactor):
+        http.HTTPFactory.__init__(self)
+        self.urlmapping = urlmapping
+        self.filter_rest = filter_rest
+        self.reactor = reactor
+
+    def process(self, host, port, clientFactory, priority, completed):
+        """
+        Process a request from the Proxy object. Check if it's in the
+        mapping for urls so that we can use the limits specified in the
+        configuration file. If there is not then check if we can forward
+        unknown hosts, and if we cannot block them. If priority is high
+        this request is scheduled in front of the queue of waiting requests
+        and will use the next slot available.
+        """
+        if host in self.urlmapping:
+            throttler = self.urlmapping[host]
+            if priority:
+                fun = throttler.runasap
+            else:
+                fun = throttler.run
+            return fun(self.makeRequest, host, port, clientFactory, completed)
+
+        else:
+            if self.filter_rest:
+                log.msg("Filtering %s:%s..." % (host, port))
+                return False
+            else:
+                return self.makeRequest(host, port, clientFactory, completed)
+
+    def makeRequest(self, host, port, clientFactory, completed):
+        """
+        Abstract the function away so that we can use it both directly
+        for unfiltered calls and through the throttling engine or other
+        similar calls.
+        """
+        log.msg("Proxying %s:%s..." % (host, port))
+        self.reactor.connectTCP(host, port, clientFactory)
+        return completed
+from twisted.application import service
+from twisted.python import log
+
+try:
+    # windows doesn't support syslog; so ok to pass
+    import syslog
+
+    class SyslogObserver:
+        def __init__(self, prefix):
+            self.prefix = prefix
+            syslog.openlog(prefix, 0, syslog.LOG_LOCAL1)
+
+        def emit(self, eventDict):
+            edm = eventDict['message']
+            if not edm:
+                if eventDict['isError'] and eventDict.has_key('failure'):
+                    text = eventDict['failure'].getTraceback()
+                elif eventDict.has_key('format'):
+                    text = eventDict['format'] % eventDict
+                else:
+                    # we don't know how to log this
+                    return
+            else:
+                text = ' '.join(map(str, edm))
+
+            lines = text.split('\n')
+            while lines[-1:] == ['']:
+                lines.pop()
+
+            firstLine = 1
+            for line in lines:
+                if firstLine:
+                    firstLine=0
+                else:
+                    line = '\t%s' % line
+                syslog.syslog(syslog.LOG_INFO, '[%s] %s' % (self.prefix, line))
+
+    def startLogging(prefix='Twisted', setStdout=1):
+        obs = SyslogObserver(prefix)
+        log.startLoggingWithObserver(obs.emit, setStdout=setStdout)
+except:
+    syslog = None
+
+
+
+def makeService(options):
+    """
+    Create the service for the application
+    """
+    return TurtlService(options)
+
+
+
+class TurtlService(service.Service):
+    """
+    I am the service responsible for starting up a TurtlProxy
+    instance with a command line given configuration file.
+    """
+    def __init__(self, options):
+        self.config = options['config']
+        self.syslog_prefix = options['with_syslog_prefix']
+
+    def startService(self):
+        """
+        Before reactor.run() is called we setup the system.
+        """
+        service.Service.startService(self)
+        try:
+            if self.syslog_prefix and syslog:
+                startLogging(self.syslog_prefix)
+
+            from turtl import proxy, config
+            urlmap, filter_rest, port = config.loadConfigFromFile(self.config)
+            log.msg('Initializing turtl...')
+            log.msg("Domains: %s" % (", ".join(urlmap.keys())))
+            desc = "Allowing"
+            if filter_rest:
+                desc = "Filtering"
+            log.msg("%s unknown domains" % (desc,))
+            f = proxy.TurtlHTTPFactory(urlmap, filter_rest)
+            from twisted.internet import reactor
+            reactor.listenTCP(port, f)
+        except:
+            import traceback
+            print traceback.format_exc()
+            raise

turtl/test/test_engine.py

+from twisted.internet import defer
+from twisted.trial import unittest
+from turtl import engine
+
+class TestEngine(unittest.TestCase):
+
+    def _incr(self, result, by=1):
+        self.counter += by
+
+    def setUp(self):
+        self.counter = 0
+
+    def testThrottler(self):
+        N = 13
+        thr = engine.ThrottlingDeferred(3*N, N, 200)
+        self.assert_(thr._resetLoop.running)
+        thr._resetLoop.stop()
+        self.assert_(not thr._resetLoop.running)
+
+        controlDeferred = defer.Deferred()
+        def helper(self, arg):
+            self.arg = arg
+            return controlDeferred
+
+        results = []
+        uniqueObject = object()
+        resultDeferred = thr.run(helper, self=self, arg=uniqueObject)
+        resultDeferred.addCallback(results.append)
+        resultDeferred.addCallback(self._incr)
+        self.assertEquals(results, [])
+        self.assertEquals(self.arg, uniqueObject)
+        controlDeferred.callback(None)
+        self.assertEquals(results.pop(), None)
+        self.assertEquals(self.counter, 1)
+
+        thr._reset()
+        self.counter = 0
+        for i in range(1, 1 + N):
+            thr.acquire().addCallback(self._incr)
+            self.assertEquals(self.counter, i)
+
+        thr.acquire().addCallback(self._incr)
+        self.assertEquals(self.counter, N)
+        self.assertEquals(thr.points, 0)
+
+        for i in range(1, N):
+            thr.acquire().addCallback(self._incr)
+            self.assertEquals(self.counter, N)
+
+        # Scheduled a prioritized call for last and
+        # later we'll check that this runs through
+        # before everything else.
+        thr.acquire(True).addCallback(self._incr, 10)
+
+        # Even if I release a lot, nothing happens
+        # until I have enough points to unlock the
+        # situation in the throttler
+        for i in thr.waiting:
+            self.assertEquals(thr.points, 0)
+            thr.release()
+        self.assertEquals(self.counter, N)
+        self.assertEquals(thr.points, 0)
+
+        # At this point I have: 14 waiting calls (N + 1)
+        # and 0 points. After I reset I'll have 13 waiting
+        # calls and 12 points.
+        thr._reset()
+        self.assertEquals(self.counter, N+10)
+        self.assertEquals(thr.points, 12)
+
+        # As I said 13 waiting and 12 points:
+        for i in range(len(thr.waiting)):
+            thr.release()
+
+        self.assertEquals(thr.points, 0)
+        self.assertEquals(self.counter, N*2+9)
+
+        # Now there's one waiting and 0 points
+        thr._reset()
+
+        # The last reset also called a release
+        # 0 waiting and 12 points
+        self.assertEquals(self.counter, 2*N+10)
+        self.assertEquals(thr.points, N-1)

turtl/test/test_proxy.py

+from twisted.test.proto_helpers import StringTransportWithDisconnection, LineSendingProtocol
+
+from twisted.python import failure
+from twisted.internet import defer, error
+from twisted.trial import unittest
+from turtl import proxy
+
+from twisted.web.test.test_proxy import FakeReactor, DummyParent, DummyChannel
+
+class TestEngine(unittest.TestCase):
+
+    def setUp(self):
+        self.counter = 0
+
+    def _incr(self, result, by=1):
+        self.counter += by
+        return result
+
+    def trap(self, failure, what):
+        failure.trap(what)
+
+    def testConnectionDone(self):
+        """
+        C{TurtlProxyClientFactory} calls the father's completed
+        deferred after clientConnectionLost is called.
+        """
+        class Father(object):
+            def __init__(self):
+                self.completed = defer.Deferred()
+
+        f = Father()
+        f.completed.addCallback(self._incr)
+        t = proxy.TurtlProxyClientFactory("GET", "/", "1.0", {}, "", f)
+        t.clientConnectionLost(object(), failure.Failure(error.ConnectionDone()))
+        self.assertEquals(self.counter, 1)
+
+        ## f = Father()
+        ## t = proxy.TurtlProxyClientFactory("GET", "/", "1.0", {}, "", f)
+        ## t.clientConnectionLost(object(), failure.Failure(error.ConnectionLost()))
+        ## self.assertFailure(f.completed, error.ConnectionLost)
+        ## self.assertEquals(self.counter, 1)
+
+# This test cases were taken almost verbatim from Twisted Matrix
+# http://www.twistedmatrix.com
+class TurtlProxyRequestTestCase(unittest.TestCase):
+    """
+    Tests for L{TurtlProxyRequest}.
+    """
+
+    def _testProcess(self, uri, expectedURI, method="GET", data=""):
+        """
+        Build a request pointing at C{uri}, and check that a proxied request
+        is created, pointing a C{expectedURI}.
+        """
+        transport = StringTransportWithDisconnection()
+        channel = DummyChannel(transport)
+        reactor = FakeReactor()
+        proxy_factory = proxy.TurtlHTTPFactory({}, False, reactor)
+        channel.factory = proxy_factory
+        request = proxy.TurtlProxyRequest(channel, False, reactor)
+        request.gotLength(len(data))
+        request.handleContentChunk(data)
+        request.requestReceived(method, 'http://example.com%s' % (uri,),
+                                'HTTP/1.0')
+
+        self.assertEquals(len(reactor.connect), 1)
+        self.assertEquals(reactor.connect[0][0], "example.com")
+        self.assertEquals(reactor.connect[0][1], 80)
+
+        factory = reactor.connect[0][2]
+        self.assertIsInstance(factory, proxy.TurtlProxyClientFactory)
+        self.assertEquals(factory.command, method)
+        self.assertEquals(factory.version, 'HTTP/1.0')
+        self.assertEquals(factory.headers, {'host': 'example.com'})
+        self.assertEquals(factory.data, data)
+        self.assertEquals(factory.rest, expectedURI)
+        self.assertEquals(factory.father, request)
+
+
+    def test_process(self):
+        """
+        L{TurtlProxyRequest.process} should create a connection to the given server,
+        with a L{TurtlProxyClientFactory} as connection factory, with the correct
+        parameters:
+            - forward comment, version and data values
+            - update headers with the B{host} value
+            - remove the host from the URL
+            - pass the request as parent request
+        """
+        return self._testProcess("/foo/bar", "/foo/bar")
+
+
+    def test_processWithoutTrailingSlash(self):
+        """
+        If the incoming request doesn't contain a slash,
+        L{TurtlProxyRequest.process} should add one when instantiating
+        L{TurtlProxyClientFactory}.
+        """
+        return self._testProcess("", "/")
+
+
+    def test_processWithData(self):
+        """
+        L{TurtlProxyRequest.process} should be able to retrieve request body and
+        to forward it.
+        """
+        return self._testProcess(
+            "/foo/bar", "/foo/bar", "POST", "Some content")
+
+
+    def test_processWithPort(self):
+        """
+        Check that L{TurtlProxyRequest.process} correctly parse port in the incoming
+        URL, and create a outgoing connection with this port.
+        """
+        transport = StringTransportWithDisconnection()
+        channel = DummyChannel(transport)
+        reactor = FakeReactor()
+        proxy_factory = proxy.TurtlHTTPFactory({}, False, reactor)
+        channel.factory = proxy_factory
+
+        request = proxy.TurtlProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestReceived('GET', 'http://example.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 1)
+        self.assertEquals(reactor.connect[0][0], "example.com")
+        self.assertEquals(reactor.connect[0][1], 1234)
+
+
+    def test_filtering(self):
+        """
+        Check that L{TurtlProxyRequest.process} filters urls that
+        it doesn't know.
+        """
+        class FakeThrottler(object):
+            called_run = False
+            called_run_asap = False
+            def run(self, fun, *args, **kwargs):
+                self.called_run = True
+                return defer.maybeDeferred(fun, *args, **kwargs)
+
+            def runasap(self, fun, *args, **kwargs):
+                self.called_run_asap = True
+                return defer.maybeDeferred(fun, *args, **kwargs)
+
+
+        transport = StringTransportWithDisconnection()
+        transport.protocol = LineSendingProtocol([], False)
+        channel = DummyChannel(transport)
+        reactor = FakeReactor()
+
+        throttler = FakeThrottler()
+        urlmap = {'delicious.com': throttler}
+        proxy_factory = proxy.TurtlHTTPFactory(urlmap, True, reactor)
+        channel.factory = proxy_factory
+
+        request = proxy.TurtlProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestReceived('GET', 'http://example.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 0)
+
+
+        request = proxy.TurtlProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestReceived('GET', 'http://delicious.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 1)
+        self.assertEquals(reactor.connect[-1][0], "delicious.com")
+        self.assertEquals(reactor.connect[-1][1], 1234)
+        self.assertEquals(throttler.called_run, True)
+        self.assertEquals(throttler.called_run_asap, False)
+
+
+
+
+        request = proxy.TurtlProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestHeaders.addRawHeader('x-priority', 'interactive')
+        request.requestReceived('GET', 'http://delicious.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 2)
+        self.assertEquals(reactor.connect[-1][0], "delicious.com")
+        self.assertEquals(reactor.connect[-1][1], 1234)
+        self.assertEquals(throttler.called_run_asap, True)
+

turtle/__init__.py

-__version__ = '0.0.3-dev'
Add a comment to this file

turtle/__init__.pyc

Binary file removed.

turtle/config.py

-from yaml import load
-
-def loadConfigFromFile(filepath):
-    """
-    Load the configuration options from a filepath
-
-    @param filepath: The filepath to a configuration file.
-    @type filepath: C{twisted.python.filepath.FilePath}
-    """
-    return loadConfigFromString(filepath.getContent())
-
-def loadConfigFromString(s):
-    """
-    Load the configuration options from a string.
-
-    @param s: a C{str} that contains the yaml formatted
-                    configuration
-    @type s: yaml formatted C{str}
-
-    @returns: a tuple of hostname and L{engine.ThrottlingDeferred}s
-                and the default behavior for unknown urls.
-    """
-    from turtle import engine
-
-    loaded = load(s)
-
-    # Remove the filtering param
-    rest = loaded.pop('filter-rest', True)
-    port = loaded.pop('port', 8080)
-    # Remove the defaults to use them as
-    # a base to work on.
-    defaults = loaded.pop('defaults', {})
-    # If nothing is in the config file, this is the default
-    defaults.update({'calls': 1, 'interval': 1, 'concurrency': 10})
-
-    urlmapping = {}
-    for host, kwargs in loaded.iteritems():
-        # Fill an empty dictionary with the defaults
-        # And then override the defaults with the one
-        # in the current block so that we use them
-        kw = {}
-        kw.update(defaults)
-        kw.update(kwargs)
-        urlmapping[host] = engine.ThrottlingDeferred(**kw)
-
-    return urlmapping, rest, port
-

turtle/engine.py

-from twisted.internet import defer, task
-
-class _ConcurrencyPrimitive(object):
-    _execute = defer.maybeDeferred
-
-    def __init__(self):
-        self.waiting = []
-
-    def _releaseAndReturn(self, r):
-        self.release()
-        return r
-
-    def run(*args, **kwargs):
-        """Acquire, run, release.
-
-        This function takes a callable as its first argument and any
-        number of other positional and keyword arguments.  When the
-        lock or semaphore is acquired, the callable will be invoked
-        with those arguments.
-
-        The callable may return a Deferred; if it does, the lock or
-        semaphore won't be released until that Deferred fires.
-
-        @return: Deferred of function result.
-        """
-        if len(args) < 2:
-            if not args:
-                raise TypeError("run() takes at least 2 arguments, none given.")
-            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
-                args[0].__class__.__name__,))
-        self, f = args[:2]
-        args = args[2:]
-
-        def execute(ignoredResult):
-            d = self._execute(f, *args, **kwargs)
-            d.addBoth(self._releaseAndReturn)
-            return d
-
-        d = self.acquire(kwargs.pop('_hpriority', False))
-        d.addCallback(execute)
-        return d
-
-    def runasap(*args, **kwargs):
-        """Acquire, run, release and put in the front of the waiting queue
-
-        @return: Deferred of function result.
-        """
-        kwargs['_hpriority'] = True
-        return _ConcurrencyPrimitive.run(*args, **kwargs)
-
-class ThrottlingDeferred(_ConcurrencyPrimitive):
-    def __init__(self, concurrency, calls, interval):
-        """
-        Throttling deferred that considers both the concurrency
-        requirements and the frequency, over time, of calls that
-        you are allowed to make. It's clear however that if the
-        rate of calls is higher than the tokens there will be
-        a queue, and the queue can grow indefinitely if calls don't
-        return quickly enough. More specifically: if T(f) is the
-        time it takes to execute a call, and this time is formed
-        by Ts(f) and Tp(f) [serial time and parallelizable time]:
-
-            Ts(f)*calls + Tp(f)*(calls/tokens) <= interval
-
-        If this is not true then the ingress could be too high
-        and causing an ever-increasing queue.
-
-        @param concurrency: The maximum number of concurrent
-                            calls.
-        @type concurrency: C{int}
-
-        @param calls: Represents the number of calls that
-                can be made every C{interval}
-        @type calls: C{int}
-
-        @param interval: Represents the time between a
-                C{calls} number of calls
-
-        NOTE: Currently it's not a requirement but if distributed
-                usage of this deferred was a necessity, the points
-                and current concurrency levels should be stored
-                somewhere else and updated every time they are
-                checked (there would also be race conditions and
-                so on).
-        """
-        _ConcurrencyPrimitive.__init__(self)
-
-        self._sem = defer.DeferredSemaphore(concurrency)
-        self._execute = self._sem.run
-
-        self.calls = calls
-        self.interval = interval
-        self.points = calls
-
-        self._resetLoop = task.LoopingCall(self._reset)
-        self._resetLoop.start(interval, now=False)
-
-    def _reset(self):
-        self.points = self.calls
-        self.release()
-
-    def acquire(self, priority=False):
-        """Attempt to acquire the token.
-
-        @param priority: Defines an high priority call that should
-                            either be executed immediately or scheduled
-                            as the immediate next one.
-        @type priority: C{bool}
-
-        @return: a Deferred which fires on token acquisition.
-        """
-        assert self.points >= 0, "Internal inconsistency??  points should never be negative"
-
-        d = defer.Deferred()
-        if not self.points:
-            if priority:
-                # Think about a better data structure for this
-                self.waiting.insert(0, d)
-            else:
-                self.waiting.append(d)
-        else:
-            self.points = self.points - 1
-            d.callback(self)
-        return d
-
-    def release(self):
-        """Release the token.
-
-        Should be called by whoever did the acquire() when the shared
-        resource is free.
-        """
-        if self.points > 0 and self.waiting:
-            self.points = self.points - 1
-            d = self.waiting.pop(0)
-            d.callback(self)
-

turtle/proxy.py

-import urlparse
-
-from twisted.internet import reactor, defer, error
-from twisted.web import proxy, http
-from twisted.python import log
-
-class TurtleProxyClientFactory(proxy.ProxyClientFactory):
-    """
-    Redefine the L{proxy.ProxyClientFactory} in order to trigger
-    the L{TurtleProxyRequest.completed} deferred once the connection
-    has been closed.
-    """
-    def clientConnectionLost(self, connector, reason):
-        proxy.ProxyClientFactory.clientConnectionLost(self, connector, reason)
-        if reason.trap(error.ConnectionDone):
-            self.father.completed.callback(None)
-        else:
-            self.father.completed.errback(reason)
-
-class TurtleProxyRequest(proxy.ProxyRequest):
-    """
-    Redefine L{proxy.ProxyRequest} to add support for upcalling the
-    factory to decide whether to forward or filter a specific request.
-    If the request is blocked then return an error page to the client.
-
-    Also introduce a deferred that is fired once the forwarded host
-    request has completed, so that we can unlock the caller that could
-    be waiting for that to be over.
-
-    @ivar completed: a L{defer.Deferred} that is triggered once the
-                        outgoing request has completed.
-    """
-
-    protocols = {'http': TurtleProxyClientFactory}
-
-    def __init__(self, channel, queued, reactor=reactor):
-        proxy.ProxyRequest.__init__(self, channel, queued, reactor)
-        self.completed = defer.Deferred()
-
-    def process(self):
-        parsed = urlparse.urlparse(self.uri)
-        protocol = parsed[0]
-        host = parsed[1]
-        port = self.ports[protocol]
-        if ':' in host:
-            host, port = host.split(':')
-            port = int(port)
-        rest = urlparse.urlunparse(('', '') + parsed[2:])
-        if not rest:
-            rest = rest + '/'
-        class_ = self.protocols[protocol]
-        headers = self.getAllHeaders().copy()
-        priority = headers.pop('x-priority', False)
-
-        if 'host' not in headers:
-            headers['host'] = host
-        self.content.seek(0, 0)
-        s = self.content.read()
-        clientFactory = class_(self.method, rest, self.clientproto,
-                               headers, s, self)
-
-        processed = self.channel.factory.process(host, port, clientFactory,
-                                                 priority, self.completed)
-        if not processed:
-            self.sendError('''<H1>%s domain is filtered</H1>''' % (host,))
-
-        else:
-            processed.addErrback(self.sendError)
-
-    def sendError(self, line):
-        self.transport.write("HTTP/1.0 501 Gateway error\r\n")
-        self.transport.write("Content-Type: text/html\r\n")
-        self.transport.write("\r\n")
-        self.transport.write(str(line))
-        self.transport.loseConnection()
-
-
-class TurtleProxy(proxy.Proxy):
-    requestFactory = TurtleProxyRequest
-
-class TurtleHTTPFactory(http.HTTPFactory):
-    """
-    Override the default HTTPFactory to add support for a shared (among
-    all incoming requests) mapping of urls to request throttlers.
-
-    There is one factory for all incoming connections so shared state has
-    to stay here.
-
-    @ivar urlmapping: A C{dict} that maps hostnames to instances of
-                            L{turtle.engine.ThrottlingDeferred}.
-
-    @ivar filter_rest: Define the behavior of unknown urls. If C{True}
-                        block those requests.
-    """
-
-    protocol = TurtleProxy
-
-    def __init__(self, urlmapping={}, filter_rest=False, reactor=reactor):
-        http.HTTPFactory.__init__(self)
-        self.urlmapping = urlmapping
-        self.filter_rest = filter_rest
-        self.reactor = reactor
-
-    def process(self, host, port, clientFactory, priority, completed):
-        """
-        Process a request from the Proxy object. Check if it's in the
-        mapping for urls so that we can use the limits specified in the
-        configuration file. If there is not then check if we can forward
-        unknown hosts, and if we cannot block them. If priority is high
-        this request is scheduled in front of the queue of waiting requests
-        and will use the next slot available.
-        """
-        if host in self.urlmapping:
-            throttler = self.urlmapping[host]
-            if priority:
-                fun = throttler.runasap
-            else:
-                fun = throttler.run
-            return fun(self.makeRequest, host, port, clientFactory, completed)
-
-        else:
-            if self.filter_rest:
-                log.msg("Filtering %s:%s..." % (host, port))
-                return False
-            else:
-                return self.makeRequest(host, port, clientFactory, completed)
-
-    def makeRequest(self, host, port, clientFactory, completed):
-        """
-        Abstract the function away so that we can use it both directly
-        for unfiltered calls and through the throttling engine or other
-        similar calls.
-        """
-        log.msg("Proxying %s:%s..." % (host, port))
-        self.reactor.connectTCP(host, port, clientFactory)
-        return completed

turtle/service.py

-from twisted.application import service
-from twisted.python import log
-
-try:
-    # windows doesn't support syslog; so ok to pass
-    import syslog
-
-    class SyslogObserver:
-        def __init__(self, prefix):
-            self.prefix = prefix
-            syslog.openlog(prefix, 0, syslog.LOG_LOCAL1)
-
-        def emit(self, eventDict):
-            edm = eventDict['message']
-            if not edm:
-                if eventDict['isError'] and eventDict.has_key('failure'):
-                    text = eventDict['failure'].getTraceback()
-                elif eventDict.has_key('format'):
-                    text = eventDict['format'] % eventDict
-                else:
-                    # we don't know how to log this
-                    return
-            else:
-                text = ' '.join(map(str, edm))
-
-            lines = text.split('\n')
-            while lines[-1:] == ['']:
-                lines.pop()
-
-            firstLine = 1
-            for line in lines:
-                if firstLine:
-                    firstLine=0
-                else:
-                    line = '\t%s' % line
-                syslog.syslog(syslog.LOG_INFO, '[%s] %s' % (self.prefix, line))
-
-    def startLogging(prefix='Twisted', setStdout=1):
-        obs = SyslogObserver(prefix)
-        log.startLoggingWithObserver(obs.emit, setStdout=setStdout)
-except:
-    syslog = None
-
-
-
-def makeService(options):
-    """
-    Create the service for the application
-    """
-    return TurtleService(options)
-
-
-
-class TurtleService(service.Service):
-    """
-    I am the service responsible for starting up a TurtleProxy
-    instance with a command line given configuration file.
-    """
-    def __init__(self, options):
-        self.config = options['config']
-        self.syslog_prefix = options['with_syslog_prefix']
-
-    def startService(self):
-        """
-        Before reactor.run() is called we setup the system.
-        """
-        service.Service.startService(self)
-        try:
-            if self.syslog_prefix and syslog:
-                startLogging(self.syslog_prefix)
-
-            from turtle import proxy, config
-            urlmap, filter_rest, port = config.loadConfigFromFile(self.config)
-            log.msg('Initializing turtle...')
-            log.msg("Domains: %s" % (", ".join(urlmap.keys())))
-            desc = "Allowing"
-            if filter_rest:
-                desc = "Filtering"
-            log.msg("%s unknown domains" % (desc,))
-            f = proxy.TurtleHTTPFactory(urlmap, filter_rest)
-            from twisted.internet import reactor
-            reactor.listenTCP(port, f)
-        except:
-            import traceback
-            print traceback.format_exc()
-            raise
Add a comment to this file

turtle/test/__init__.py

Empty file removed.

turtle/test/test_engine.py

-from twisted.internet import defer
-from twisted.trial import unittest
-from turtle import engine
-
-class TestEngine(unittest.TestCase):
-
-    def _incr(self, result, by=1):
-        self.counter += by
-
-    def setUp(self):
-        self.counter = 0
-
-    def testThrottler(self):
-        N = 13
-        thr = engine.ThrottlingDeferred(3*N, N, 200)
-        self.assert_(thr._resetLoop.running)
-        thr._resetLoop.stop()
-        self.assert_(not thr._resetLoop.running)
-
-        controlDeferred = defer.Deferred()
-        def helper(self, arg):
-            self.arg = arg
-            return controlDeferred
-
-        results = []
-        uniqueObject = object()
-        resultDeferred = thr.run(helper, self=self, arg=uniqueObject)
-        resultDeferred.addCallback(results.append)
-        resultDeferred.addCallback(self._incr)
-        self.assertEquals(results, [])
-        self.assertEquals(self.arg, uniqueObject)
-        controlDeferred.callback(None)
-        self.assertEquals(results.pop(), None)
-        self.assertEquals(self.counter, 1)
-
-        thr._reset()
-        self.counter = 0
-        for i in range(1, 1 + N):
-            thr.acquire().addCallback(self._incr)
-            self.assertEquals(self.counter, i)
-
-        thr.acquire().addCallback(self._incr)
-        self.assertEquals(self.counter, N)
-        self.assertEquals(thr.points, 0)
-
-        for i in range(1, N):
-            thr.acquire().addCallback(self._incr)
-            self.assertEquals(self.counter, N)
-
-        # Scheduled a prioritized call for last and
-        # later we'll check that this runs through
-        # before everything else.
-        thr.acquire(True).addCallback(self._incr, 10)
-
-        # Even if I release a lot, nothing happens
-        # until I have enough points to unlock the
-        # situation in the throttler
-        for i in thr.waiting:
-            self.assertEquals(thr.points, 0)
-            thr.release()
-        self.assertEquals(self.counter, N)
-        self.assertEquals(thr.points, 0)
-
-        # At this point I have: 14 waiting calls (N + 1)
-        # and 0 points. After I reset I'll have 13 waiting
-        # calls and 12 points.
-        thr._reset()
-        self.assertEquals(self.counter, N+10)
-        self.assertEquals(thr.points, 12)
-
-        # As I said 13 waiting and 12 points:
-        for i in range(len(thr.waiting)):
-            thr.release()
-
-        self.assertEquals(thr.points, 0)
-        self.assertEquals(self.counter, N*2+9)
-
-        # Now there's one waiting and 0 points
-        thr._reset()
-
-        # The last reset also called a release
-        # 0 waiting and 12 points
-        self.assertEquals(self.counter, 2*N+10)
-        self.assertEquals(thr.points, N-1)

turtle/test/test_proxy.py

-from twisted.test.proto_helpers import StringTransportWithDisconnection, LineSendingProtocol
-
-from twisted.python import failure
-from twisted.internet import defer, error
-from twisted.trial import unittest
-from turtle import proxy
-
-from twisted.web.test.test_proxy import FakeReactor, DummyParent, DummyChannel
-
-class TestEngine(unittest.TestCase):
-
-    def setUp(self):
-        self.counter = 0
-
-    def _incr(self, result, by=1):
-        self.counter += by
-        return result
-
-    def trap(self, failure, what):
-        failure.trap(what)
-
-    def testConnectionDone(self):
-        """
-        C{TurtleProxyClientFactory} calls the father's completed
-        deferred after clientConnectionLost is called.
-        """
-        class Father(object):
-            def __init__(self):
-                self.completed = defer.Deferred()
-
-        f = Father()
-        f.completed.addCallback(self._incr)
-        t = proxy.TurtleProxyClientFactory("GET", "/", "1.0", {}, "", f)
-        t.clientConnectionLost(object(), failure.Failure(error.ConnectionDone()))
-        self.assertEquals(self.counter, 1)
-
-        ## f = Father()
-        ## t = proxy.TurtleProxyClientFactory("GET", "/", "1.0", {}, "", f)
-        ## t.clientConnectionLost(object(), failure.Failure(error.ConnectionLost()))
-        ## self.assertFailure(f.completed, error.ConnectionLost)
-        ## self.assertEquals(self.counter, 1)
-
-# This test cases were taken almost verbatim from Twisted Matrix
-# http://www.twistedmatrix.com
-class TurtleProxyRequestTestCase(unittest.TestCase):
-    """
-    Tests for L{TurtleProxyRequest}.
-    """
-
-    def _testProcess(self, uri, expectedURI, method="GET", data=""):
-        """
-        Build a request pointing at C{uri}, and check that a proxied request
-        is created, pointing a C{expectedURI}.
-        """
-        transport = StringTransportWithDisconnection()
-        channel = DummyChannel(transport)
-        reactor = FakeReactor()
-        proxy_factory = proxy.TurtleHTTPFactory({}, False, reactor)
-        channel.factory = proxy_factory
-        request = proxy.TurtleProxyRequest(channel, False, reactor)
-        request.gotLength(len(data))
-        request.handleContentChunk(data)
-        request.requestReceived(method, 'http://example.com%s' % (uri,),
-                                'HTTP/1.0')
-
-        self.assertEquals(len(reactor.connect), 1)
-        self.assertEquals(reactor.connect[0][0], "example.com")
-        self.assertEquals(reactor.connect[0][1], 80)
-
-        factory = reactor.connect[0][2]
-        self.assertIsInstance(factory, proxy.TurtleProxyClientFactory)
-        self.assertEquals(factory.command, method)
-        self.assertEquals(factory.version, 'HTTP/1.0')
-        self.assertEquals(factory.headers, {'host': 'example.com'})
-        self.assertEquals(factory.data, data)
-        self.assertEquals(factory.rest, expectedURI)
-        self.assertEquals(factory.father, request)
-
-
-    def test_process(self):
-        """
-        L{TurtleProxyRequest.process} should create a connection to the given server,
-        with a L{TurtleProxyClientFactory} as connection factory, with the correct
-        parameters:
-            - forward comment, version and data values
-            - update headers with the B{host} value
-            - remove the host from the URL
-            - pass the request as parent request
-        """
-        return self._testProcess("/foo/bar", "/foo/bar")
-
-
-    def test_processWithoutTrailingSlash(self):
-        """
-        If the incoming request doesn't contain a slash,
-        L{TurtleProxyRequest.process} should add one when instantiating
-        L{TurtleProxyClientFactory}.
-        """
-        return self._testProcess("", "/")
-
-
-    def test_processWithData(self):
-        """
-        L{TurtleProxyRequest.process} should be able to retrieve request body and
-        to forward it.
-        """
-        return self._testProcess(
-            "/foo/bar", "/foo/bar", "POST", "Some content")
-
-
-    def test_processWithPort(self):
-        """
-        Check that L{TurtleProxyRequest.process} correctly parse port in the incoming
-        URL, and create a outgoing connection with this port.
-        """
-        transport = StringTransportWithDisconnection()
-        channel = DummyChannel(transport)
-        reactor = FakeReactor()
-        proxy_factory = proxy.TurtleHTTPFactory({}, False, reactor)
-        channel.factory = proxy_factory
-
-        request = proxy.TurtleProxyRequest(channel, False, reactor)
-        request.gotLength(0)
-        request.requestReceived('GET', 'http://example.com:1234/foo/bar',
-                                'HTTP/1.0')
-
-        # That should create one connection, with the port parsed from the URL
-        self.assertEquals(len(reactor.connect), 1)
-        self.assertEquals(reactor.connect[0][0], "example.com")
-        self.assertEquals(reactor.connect[0][1], 1234)
-
-
-    def test_filtering(self):
-        """
-        Check that L{TurtleProxyRequest.process} filters urls that
-        it doesn't know.
-        """
-        class FakeThrottler(object):
-            called_run = False
-            called_run_asap = False
-            def run(self, fun, *args, **kwargs):
-                self.called_run = True
-                return defer.maybeDeferred(fun, *args, **kwargs)
-
-            def runasap(self, fun, *args, **kwargs):
-                self.called_run_asap = True
-                return defer.maybeDeferred(fun, *args, **kwargs)
-
-
-        transport = StringTransportWithDisconnection()
-        transport.protocol = LineSendingProtocol([], False)
-        channel = DummyChannel(transport)
-        reactor = FakeReactor()
-
-        throttler = FakeThrottler()
-        urlmap = {'delicious.com': throttler}
-        proxy_factory = proxy.TurtleHTTPFactory(urlmap, True, reactor)
-        channel.factory = proxy_factory
-
-        request = proxy.TurtleProxyRequest(channel, False, reactor)
-        request.gotLength(0)
-        request.requestReceived('GET', 'http://example.com:1234/foo/bar',
-                                'HTTP/1.0')
-
-        # That should create one connection, with the port parsed from the URL
-        self.assertEquals(len(reactor.connect), 0)
-
-
-        request = proxy.TurtleProxyRequest(channel, False, reactor)
-        request.gotLength(0)
-        request.requestReceived('GET', 'http://delicious.com:1234/foo/bar',
-                                'HTTP/1.0')
-
-        # That should create one connection, with the port parsed from the URL
-        self.assertEquals(len(reactor.connect), 1)
-        self.assertEquals(reactor.connect[-1][0], "delicious.com")
-        self.assertEquals(reactor.connect[-1][1], 1234)
-        self.assertEquals(throttler.called_run, True)
-        self.assertEquals(throttler.called_run_asap, False)
-
-
-
-
-        request = proxy.TurtleProxyRequest(channel, False, reactor)
-        request.gotLength(0)
-        request.requestHeaders.addRawHeader('x-priority', 'interactive')
-        request.requestReceived('GET', 'http://delicious.com:1234/foo/bar',
-                                'HTTP/1.0')
-
-        # That should create one connection, with the port parsed from the URL
-        self.assertEquals(len(reactor.connect), 2)
-        self.assertEquals(reactor.connect[-1][0], "delicious.com")
-        self.assertEquals(reactor.connect[-1][1], 1234)
-        self.assertEquals(throttler.called_run_asap, True)
-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.