Anonymous avatar Anonymous committed 646a063

importing from turtle because turtle is already a taken name for a module... sigh

Comments (0)

Files changed (18)

+Copyright (c) 2009
+adroll.com
+Valentino Volonghi
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+include *.txt
+include LICENSE
+include README
+include NEWS
+include examples/*
+include twisted/plugins/*
+Turtle is an HTTP proxy whose purpose is to throttle connections to
+specific hostnames to avoid breaking terms of usage of those API
+providers (like del.icio.us, technorati and so on).
+
+The proxy can be used either as a library embedded in another
+application or it can be started stand-alone using the provided twistd
+plugin.
+
+Here's an example to use turtle from inside another python application:
+
+{{{
+from twisted.internet import reactor
+from turtle import proxy, config
+
+s = """\
+# defaults is pulled out of the end results
+# and used to fill hostnames with missing
+# parameters
+defaults: &defaults
+    # calls that can be made inside the interval of time
+    calls: 1
+
+    # interval is specified in seconds
+    interval: 1
+
+    # the number of concurrent calls that we are allowed to
+    # have running at any given time.
+    concurrency: 10
+
+delicious.com:
+    <<: [*defaults]
+
+www.google.com:
+    calls: 5
+    interval: 1
+
+filter-rest: True
+port: 8080
+"""
+
+urlmap, rest, port = config.loadConfigFromString(s)
+f = proxy.TurtleHTTPFactory(urlmap, rest)
+reactor.listenTCP(port, f)
+reactor.run()
+}}}
+
+And here's an example of a command line to start it stand alone:
+
+{{{
+twistd --logfile=/var/log/turtle.log --pidfile=/var/run/turtle.pid turtle -c /some/path/config.yaml -p turtleOne
+}}}
+

examples/basic.py

+from twisted.internet import reactor
+from turtle import proxy, config
+
+s = """\
+# defaults is pulled out of the end results
+# and used to fill hostnames with missing
+# parameters
+defaults: &defaults
+    # calls that can be made inside the interval of time
+    calls: 1
+
+    # interval is specified in seconds
+    interval: 1
+
+    # the number of concurrent calls that we are allowed to
+    # have running at any given time.
+    concurrency: 10
+
+delicious.com:
+    <<: [*defaults]
+
+www.google.com:
+    calls: 5
+    interval: 1
+
+filter-rest: True
+port: 8080
+"""
+
+urlmap, rest, port = config.loadConfigFromString(s)
+f = proxy.TurtleHTTPFactory(urlmap, rest)
+reactor.listenTCP(port, f)
+reactor.run()

examples/basic_client.py

+import urllib2
+
+proxy_handler = urllib2.ProxyHandler({'http': 'http://localhost:8080/'})
+
+hp_opener = urllib2.build_opener(proxy_handler)
+hp_opener.addheaders = [('x-priority', 'interactive')]
+
+lp_opener = urllib2.build_opener(proxy_handler)
+
+def request_asap(url):
+    return hp_opener.open(url)
+
+def request(url):
+    return lp_opener.open(url)
+
+if __name__ == "__main__":
+    import sys
+    priority = 0
+    if len(sys.argv) == 3:
+        # There's a priority specified:
+        priority = int(sys.argv[2])
+
+    url = sys.argv[1]
+
+    if priority:
+        request_asap(url)
+    else:
+        request(url)

examples/default.yaml

+# defaults is pulled out of the end results
+# and used to fill hostnames with missing
+# parameters
+defaults: &defaults
+    # calls that can be made inside the interval of time
+    calls: 1
+
+    # interval is specified in seconds
+    interval: 1
+
+    # the number of concurrent calls that we are allowed to
+    # have running at any given time.
+    concurrency: 10
+
+delicious.com:
+    <<: [*defaults]
+
+www.google.com:
+    calls: 5
+    interval: 1
+
+filter-rest: True
+port: 8080
+[install]
+optimize = 1
+
+[aliases]
+release = egg_info -RDb ''
+#!/usr/bin/env python
+
+# Copyright (c) 2008-2009 Adroll.com, Valentino Volonghi.
+# See LICENSE for details.
+
+"""
+Distutils installer for turtle.
+"""
+
+try:
+    # Load setuptools, to build a specific source package
+    import setuptools
+except ImportError:
+    pass
+
+# For great justice, take off every zig.
+import sys, os, pprint, traceback
+
+import turtle
+version = turtle.__version__
+
+install_requires = ["Twisted>=8.0.1", "PyYAML>=3.0.8"]
+
+setup = setuptools.setup
+find_packages = setuptools.find_packages
+
+# Most of the following code is from Divmod Epsilon, (C) 2008 Divmod, Inc.
+# under MIT license, see http://divmod.org/trac/wiki/DivmodEpsilon
+# since I only need this to install turtle correctly with Twisted
+# Plugins I'd rather copy this few lines than actually add another
+# dependency. Also this version uses the setuptools setup() function.
+
+def pluginModules(moduleNames):
+    from twisted.python.reflect import namedAny
+    for moduleName in moduleNames:
+        try:
+            yield namedAny(moduleName)
+        except ImportError:
+            pass
+        except ValueError, ve:
+            if ve.args[0] != 'Empty module name':
+                traceback.print_exc()
+        except:
+            traceback.print_exc()
+
+def _regeneratePluginCache(pluginPackages):
+    ## print 'Regenerating cache with path: ',
+    ## pprint.pprint(sys.path)
+    from twisted import plugin
+    for pluginModule in pluginModules([
+        p + ".plugins" for p in pluginPackages]):
+        # Not just *some* zigs, mind you - *every* zig:
+        #print 'Full plugin list for %r: ' % (pluginModule.__name__)
+        list(plugin.getPlugins(plugin.IPlugin, pluginModule))
+
+def regeneratePluginCache(dist, pluginPackages):
+    if 'install' in dist.commands:
+        sys.path.insert(0, os.path.abspath(dist.command_obj['install'].install_lib))
+        _regeneratePluginCache(pluginPackages)
+
+def autosetup(**kw):
+    packages = []
+    datafiles = {}
+    pluginPackages = []
+
+    for (dirpath, dirnames, filenames) in os.walk(os.curdir):
+        dirnames[:] = [p for p in dirnames if not p.startswith('.')]
+        pkgName = dirpath[2:].replace('/', '.')
+        if '__init__.py' in filenames:
+            # The current directory is a Python package
+            packages.append(pkgName)
+        elif 'plugins' in dirnames:
+            # The current directory is for the Twisted plugin system
+            pluginPackages.append(pkgName)
+            packages.append(pkgName)
+
+    for package in packages:
+        if '.' in package:
+            continue
+        D = datafiles[package] = []
+        #print 'Files in package %r:' % (package,)
+        #pprint.pprint(os.listdir(package))
+        for (dirpath, dirnames, filenames) in os.walk(package):
+            dirnames[:] = [p for p in dirnames if not p.startswith('.')]
+            for filename in filenames:
+                if filename == 'dropin.cache':
+                    continue
+                if (os.path.splitext(filename)[1] not in ('.py', '.pyc', '.pyo')
+                    or '__init__.py' not in filenames):
+                    D.append(os.path.join(dirpath[len(package)+1:], filename))
+    autoresult = {
+        'packages': packages,
+        'package_data': datafiles,
+        }
+    #pprint.pprint(autoresult, indent=4)
+    assert 'packages' not in kw
+    assert 'package_data' not in kw
+    kw.update(autoresult)
+    distobj = setup(**kw)
+    regeneratePluginCache(distobj, pluginPackages)
+    return distobj
+
+description = """\
+Turtle is an HTTP proxy whose purpose is to throttle connections to
+specific hostnames to avoid breaking terms of usage of those API
+providers (like del.icio.us, technorati and so on).
+"""
+
+autosetup(
+    name = "turtle",
+    author = "Valentino Volonghi",
+    author_email = "valentino@adroll.com",
+    url = "http://adroll.com/labs",
+    description = description,
+    license = "MIT License",
+    version=version,
+    install_requires=install_requires,
+    classifiers = [
+        'Development Status :: 4 - Beta',
+        'Environment :: Web Environment',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: MIT License',
+        'Natural Language :: English',
+        'Programming Language :: Python',
+        'Topic :: Internet',
+    ],
+    include_package_data=True,
+    zip_safe=False
+)

turtle/__init__.py

+__version__ = '0.0.3-dev'

Binary file added.

+from yaml import load
+
+def loadConfigFromFile(filepath):
+    """
+    Load the configuration options from a filepath
+
+    @param filepath: The filepath to a configuration file.
+    @type filepath: C{twisted.python.filepath.FilePath}
+    """
+    return loadConfigFromString(filepath.getContent())
+
+def loadConfigFromString(s):
+    """
+    Load the configuration options from a string.
+
+    @param s: a C{str} that contains the yaml formatted
+                    configuration
+    @type s: yaml formatted C{str}
+
+    @returns: a tuple of hostname and L{engine.ThrottlingDeferred}s
+                and the default behavior for unknown urls.
+    """
+    from turtle import engine
+
+    loaded = load(s)
+
+    # Remove the filtering param
+    rest = loaded.pop('filter-rest', True)
+    port = loaded.pop('port', 8080)
+    # Remove the defaults to use them as
+    # a base to work on.
+    defaults = loaded.pop('defaults', {})
+    # If nothing is in the config file, this is the default
+    defaults.update({'calls': 1, 'interval': 1, 'concurrency': 10})
+
+    urlmapping = {}
+    for host, kwargs in loaded.iteritems():
+        # Fill an empty dictionary with the defaults
+        # And then override the defaults with the one
+        # in the current block so that we use them
+        kw = {}
+        kw.update(defaults)
+        kw.update(kwargs)
+        urlmapping[host] = engine.ThrottlingDeferred(**kw)
+
+    return urlmapping, rest, port
+
+from twisted.internet import defer, task
+
+class _ConcurrencyPrimitive(object):
+    _execute = defer.maybeDeferred
+
+    def __init__(self):
+        self.waiting = []
+
+    def _releaseAndReturn(self, r):
+        self.release()
+        return r
+
+    def run(*args, **kwargs):
+        """Acquire, run, release.
+
+        This function takes a callable as its first argument and any
+        number of other positional and keyword arguments.  When the
+        lock or semaphore is acquired, the callable will be invoked
+        with those arguments.
+
+        The callable may return a Deferred; if it does, the lock or
+        semaphore won't be released until that Deferred fires.
+
+        @return: Deferred of function result.
+        """
+        if len(args) < 2:
+            if not args:
+                raise TypeError("run() takes at least 2 arguments, none given.")
+            raise TypeError("%s.run() takes at least 2 arguments, 1 given" % (
+                args[0].__class__.__name__,))
+        self, f = args[:2]
+        args = args[2:]
+
+        def execute(ignoredResult):
+            d = self._execute(f, *args, **kwargs)
+            d.addBoth(self._releaseAndReturn)
+            return d
+
+        d = self.acquire(kwargs.pop('_hpriority', False))
+        d.addCallback(execute)
+        return d
+
+    def runasap(*args, **kwargs):
+        """Acquire, run, release and put in the front of the waiting queue
+
+        @return: Deferred of function result.
+        """
+        kwargs['_hpriority'] = True
+        return _ConcurrencyPrimitive.run(*args, **kwargs)
+
+class ThrottlingDeferred(_ConcurrencyPrimitive):
+    def __init__(self, concurrency, calls, interval):
+        """
+        Throttling deferred that considers both the concurrency
+        requirements and the frequency, over time, of calls that
+        you are allowed to make. It's clear however that if the
+        rate of calls is higher than the tokens there will be
+        a queue, and the queue can grow indefinitely if calls don't
+        return quickly enough. More specifically: if T(f) is the
+        time it takes to execute a call, and this time is formed
+        by Ts(f) and Tp(f) [serial time and parallelizable time]:
+
+            Ts(f)*calls + Tp(f)*(calls/tokens) <= interval
+
+        If this is not true then the ingress could be too high
+        and causing an ever-increasing queue.
+
+        @param concurrency: The maximum number of concurrent
+                            calls.
+        @type concurrency: C{int}
+
+        @param calls: Represents the number of calls that
+                can be made every C{interval}
+        @type calls: C{int}
+
+        @param interval: Represents the time between a
+                C{calls} number of calls
+
+        NOTE: Currently it's not a requirement but if distributed
+                usage of this deferred was a necessity, the points
+                and current concurrency levels should be stored
+                somewhere else and updated every time they are
+                checked (there would also be race conditions and
+                so on).
+        """
+        _ConcurrencyPrimitive.__init__(self)
+
+        self._sem = defer.DeferredSemaphore(concurrency)
+        self._execute = self._sem.run
+
+        self.calls = calls
+        self.interval = interval
+        self.points = calls
+
+        self._resetLoop = task.LoopingCall(self._reset)
+        self._resetLoop.start(interval, now=False)
+
+    def _reset(self):
+        self.points = self.calls
+        self.release()
+
+    def acquire(self, priority=False):
+        """Attempt to acquire the token.
+
+        @param priority: Defines an high priority call that should
+                            either be executed immediately or scheduled
+                            as the immediate next one.
+        @type priority: C{bool}
+
+        @return: a Deferred which fires on token acquisition.
+        """
+        assert self.points >= 0, "Internal inconsistency??  points should never be negative"
+
+        d = defer.Deferred()
+        if not self.points:
+            if priority:
+                # Think about a better data structure for this
+                self.waiting.insert(0, d)
+            else:
+                self.waiting.append(d)
+        else:
+            self.points = self.points - 1
+            d.callback(self)
+        return d
+
+    def release(self):
+        """Release the token.
+
+        Should be called by whoever did the acquire() when the shared
+        resource is free.
+        """
+        if self.points > 0 and self.waiting:
+            self.points = self.points - 1
+            d = self.waiting.pop(0)
+            d.callback(self)
+
+import urlparse
+
+from twisted.internet import reactor, defer, error
+from twisted.web import proxy, http
+from twisted.python import log
+
+class TurtleProxyClientFactory(proxy.ProxyClientFactory):
+    """
+    Redefine the L{proxy.ProxyClientFactory} in order to trigger
+    the L{TurtleProxyRequest.completed} deferred once the connection
+    has been closed.
+    """
+    def clientConnectionLost(self, connector, reason):
+        proxy.ProxyClientFactory.clientConnectionLost(self, connector, reason)
+        if reason.trap(error.ConnectionDone):
+            self.father.completed.callback(None)
+        else:
+            self.father.completed.errback(reason)
+
+class TurtleProxyRequest(proxy.ProxyRequest):
+    """
+    Redefine L{proxy.ProxyRequest} to add support for upcalling the
+    factory to decide whether to forward or filter a specific request.
+    If the request is blocked then return an error page to the client.
+
+    Also introduce a deferred that is fired once the forwarded host
+    request has completed, so that we can unlock the caller that could
+    be waiting for that to be over.
+
+    @ivar completed: a L{defer.Deferred} that is triggered once the
+                        outgoing request has completed.
+    """
+
+    protocols = {'http': TurtleProxyClientFactory}
+
+    def __init__(self, channel, queued, reactor=reactor):
+        proxy.ProxyRequest.__init__(self, channel, queued, reactor)
+        self.completed = defer.Deferred()
+
+    def process(self):
+        parsed = urlparse.urlparse(self.uri)
+        protocol = parsed[0]
+        host = parsed[1]
+        port = self.ports[protocol]
+        if ':' in host:
+            host, port = host.split(':')
+            port = int(port)
+        rest = urlparse.urlunparse(('', '') + parsed[2:])
+        if not rest:
+            rest = rest + '/'
+        class_ = self.protocols[protocol]
+        headers = self.getAllHeaders().copy()
+        priority = headers.pop('x-priority', False)
+
+        if 'host' not in headers:
+            headers['host'] = host
+        self.content.seek(0, 0)
+        s = self.content.read()
+        clientFactory = class_(self.method, rest, self.clientproto,
+                               headers, s, self)
+
+        processed = self.channel.factory.process(host, port, clientFactory,
+                                                 priority, self.completed)
+        if not processed:
+            self.sendError('''<H1>%s domain is filtered</H1>''' % (host,))
+
+        else:
+            processed.addErrback(self.sendError)
+
+    def sendError(self, line):
+        self.transport.write("HTTP/1.0 501 Gateway error\r\n")
+        self.transport.write("Content-Type: text/html\r\n")
+        self.transport.write("\r\n")
+        self.transport.write(str(line))
+        self.transport.loseConnection()
+
+
+class TurtleProxy(proxy.Proxy):
+    requestFactory = TurtleProxyRequest
+
+class TurtleHTTPFactory(http.HTTPFactory):
+    """
+    Override the default HTTPFactory to add support for a shared (among
+    all incoming requests) mapping of urls to request throttlers.
+
+    There is one factory for all incoming connections so shared state has
+    to stay here.
+
+    @ivar urlmapping: A C{dict} that maps hostnames to instances of
+                            L{turtle.engine.ThrottlingDeferred}.
+
+    @ivar filter_rest: Define the behavior of unknown urls. If C{True}
+                        block those requests.
+    """
+
+    protocol = TurtleProxy
+
+    def __init__(self, urlmapping={}, filter_rest=False, reactor=reactor):
+        http.HTTPFactory.__init__(self)
+        self.urlmapping = urlmapping
+        self.filter_rest = filter_rest
+        self.reactor = reactor
+
+    def process(self, host, port, clientFactory, priority, completed):
+        """
+        Process a request from the Proxy object. Check if it's in the
+        mapping for urls so that we can use the limits specified in the
+        configuration file. If there is not then check if we can forward
+        unknown hosts, and if we cannot block them. If priority is high
+        this request is scheduled in front of the queue of waiting requests
+        and will use the next slot available.
+        """
+        if host in self.urlmapping:
+            throttler = self.urlmapping[host]
+            if priority:
+                fun = throttler.runasap
+            else:
+                fun = throttler.run
+            return fun(self.makeRequest, host, port, clientFactory, completed)
+
+        else:
+            if self.filter_rest:
+                log.msg("Filtering %s:%s..." % (host, port))
+                return False
+            else:
+                return self.makeRequest(host, port, clientFactory, completed)
+
+    def makeRequest(self, host, port, clientFactory, completed):
+        """
+        Abstract the function away so that we can use it both directly
+        for unfiltered calls and through the throttling engine or other
+        similar calls.
+        """
+        log.msg("Proxying %s:%s..." % (host, port))
+        self.reactor.connectTCP(host, port, clientFactory)
+        return completed

turtle/service.py

+from twisted.application import service
+from twisted.python import log
+
+try:
+    # windows doesn't support syslog; so ok to pass
+    import syslog
+
+    class SyslogObserver:
+        def __init__(self, prefix):
+            self.prefix = prefix
+            syslog.openlog(prefix, 0, syslog.LOG_LOCAL1)
+
+        def emit(self, eventDict):
+            edm = eventDict['message']
+            if not edm:
+                if eventDict['isError'] and eventDict.has_key('failure'):
+                    text = eventDict['failure'].getTraceback()
+                elif eventDict.has_key('format'):
+                    text = eventDict['format'] % eventDict
+                else:
+                    # we don't know how to log this
+                    return
+            else:
+                text = ' '.join(map(str, edm))
+
+            lines = text.split('\n')
+            while lines[-1:] == ['']:
+                lines.pop()
+
+            firstLine = 1
+            for line in lines:
+                if firstLine:
+                    firstLine=0
+                else:
+                    line = '\t%s' % line
+                syslog.syslog(syslog.LOG_INFO, '[%s] %s' % (self.prefix, line))
+
+    def startLogging(prefix='Twisted', setStdout=1):
+        obs = SyslogObserver(prefix)
+        log.startLoggingWithObserver(obs.emit, setStdout=setStdout)
+except:
+    syslog = None
+
+
+
+def makeService(options):
+    """
+    Create the service for the application
+    """
+    return TurtleService(options)
+
+
+
+class TurtleService(service.Service):
+    """
+    I am the service responsible for starting up a TurtleProxy
+    instance with a command line given configuration file.
+    """
+    def __init__(self, options):
+        self.config = options['config']
+        self.syslog_prefix = options['with_syslog_prefix']
+
+    def startService(self):
+        """
+        Before reactor.run() is called we setup the system.
+        """
+        service.Service.startService(self)
+        try:
+            if self.syslog_prefix and syslog:
+                startLogging(self.syslog_prefix)
+
+            from turtle import proxy, config
+            urlmap, filter_rest, port = config.loadConfigFromFile(self.config)
+            log.msg('Initializing turtle...')
+            log.msg("Domains: %s" % (", ".join(urlmap.keys())))
+            desc = "Allowing"
+            if filter_rest:
+                desc = "Filtering"
+            log.msg("%s unknown domains" % (desc,))
+            f = proxy.TurtleHTTPFactory(urlmap, filter_rest)
+            from twisted.internet import reactor
+            reactor.listenTCP(port, f)
+        except:
+            import traceback
+            print traceback.format_exc()
+            raise
Add a comment to this file

turtle/test/__init__.py

Empty file added.

turtle/test/test_engine.py

+from twisted.internet import defer
+from twisted.trial import unittest
+from turtle import engine
+
+class TestEngine(unittest.TestCase):
+
+    def _incr(self, result, by=1):
+        self.counter += by
+
+    def setUp(self):
+        self.counter = 0
+
+    def testThrottler(self):
+        N = 13
+        thr = engine.ThrottlingDeferred(3*N, N, 200)
+        self.assert_(thr._resetLoop.running)
+        thr._resetLoop.stop()
+        self.assert_(not thr._resetLoop.running)
+
+        controlDeferred = defer.Deferred()
+        def helper(self, arg):
+            self.arg = arg
+            return controlDeferred
+
+        results = []
+        uniqueObject = object()
+        resultDeferred = thr.run(helper, self=self, arg=uniqueObject)
+        resultDeferred.addCallback(results.append)
+        resultDeferred.addCallback(self._incr)
+        self.assertEquals(results, [])
+        self.assertEquals(self.arg, uniqueObject)
+        controlDeferred.callback(None)
+        self.assertEquals(results.pop(), None)
+        self.assertEquals(self.counter, 1)
+
+        thr._reset()
+        self.counter = 0
+        for i in range(1, 1 + N):
+            thr.acquire().addCallback(self._incr)
+            self.assertEquals(self.counter, i)
+
+        thr.acquire().addCallback(self._incr)
+        self.assertEquals(self.counter, N)
+        self.assertEquals(thr.points, 0)
+
+        for i in range(1, N):
+            thr.acquire().addCallback(self._incr)
+            self.assertEquals(self.counter, N)
+
+        # Scheduled a prioritized call for last and
+        # later we'll check that this runs through
+        # before everything else.
+        thr.acquire(True).addCallback(self._incr, 10)
+
+        # Even if I release a lot, nothing happens
+        # until I have enough points to unlock the
+        # situation in the throttler
+        for i in thr.waiting:
+            self.assertEquals(thr.points, 0)
+            thr.release()
+        self.assertEquals(self.counter, N)
+        self.assertEquals(thr.points, 0)
+
+        # At this point I have: 14 waiting calls (N + 1)
+        # and 0 points. After I reset I'll have 13 waiting
+        # calls and 12 points.
+        thr._reset()
+        self.assertEquals(self.counter, N+10)
+        self.assertEquals(thr.points, 12)
+
+        # As I said 13 waiting and 12 points:
+        for i in range(len(thr.waiting)):
+            thr.release()
+
+        self.assertEquals(thr.points, 0)
+        self.assertEquals(self.counter, N*2+9)
+
+        # Now there's one waiting and 0 points
+        thr._reset()
+
+        # The last reset also called a release
+        # 0 waiting and 12 points
+        self.assertEquals(self.counter, 2*N+10)
+        self.assertEquals(thr.points, N-1)

turtle/test/test_proxy.py

+from twisted.test.proto_helpers import StringTransportWithDisconnection, LineSendingProtocol
+
+from twisted.python import failure
+from twisted.internet import defer, error
+from twisted.trial import unittest
+from turtle import proxy
+
+from twisted.web.test.test_proxy import FakeReactor, DummyParent, DummyChannel
+
+class TestEngine(unittest.TestCase):
+
+    def setUp(self):
+        self.counter = 0
+
+    def _incr(self, result, by=1):
+        self.counter += by
+        return result
+
+    def trap(self, failure, what):
+        failure.trap(what)
+
+    def testConnectionDone(self):
+        """
+        C{TurtleProxyClientFactory} calls the father's completed
+        deferred after clientConnectionLost is called.
+        """
+        class Father(object):
+            def __init__(self):
+                self.completed = defer.Deferred()
+
+        f = Father()
+        f.completed.addCallback(self._incr)
+        t = proxy.TurtleProxyClientFactory("GET", "/", "1.0", {}, "", f)
+        t.clientConnectionLost(object(), failure.Failure(error.ConnectionDone()))
+        self.assertEquals(self.counter, 1)
+
+        ## f = Father()
+        ## t = proxy.TurtleProxyClientFactory("GET", "/", "1.0", {}, "", f)
+        ## t.clientConnectionLost(object(), failure.Failure(error.ConnectionLost()))
+        ## self.assertFailure(f.completed, error.ConnectionLost)
+        ## self.assertEquals(self.counter, 1)
+
+# This test cases were taken almost verbatim from Twisted Matrix
+# http://www.twistedmatrix.com
+class TurtleProxyRequestTestCase(unittest.TestCase):
+    """
+    Tests for L{TurtleProxyRequest}.
+    """
+
+    def _testProcess(self, uri, expectedURI, method="GET", data=""):
+        """
+        Build a request pointing at C{uri}, and check that a proxied request
+        is created, pointing a C{expectedURI}.
+        """
+        transport = StringTransportWithDisconnection()
+        channel = DummyChannel(transport)
+        reactor = FakeReactor()
+        proxy_factory = proxy.TurtleHTTPFactory({}, False, reactor)
+        channel.factory = proxy_factory
+        request = proxy.TurtleProxyRequest(channel, False, reactor)
+        request.gotLength(len(data))
+        request.handleContentChunk(data)
+        request.requestReceived(method, 'http://example.com%s' % (uri,),
+                                'HTTP/1.0')
+
+        self.assertEquals(len(reactor.connect), 1)
+        self.assertEquals(reactor.connect[0][0], "example.com")
+        self.assertEquals(reactor.connect[0][1], 80)
+
+        factory = reactor.connect[0][2]
+        self.assertIsInstance(factory, proxy.TurtleProxyClientFactory)
+        self.assertEquals(factory.command, method)
+        self.assertEquals(factory.version, 'HTTP/1.0')
+        self.assertEquals(factory.headers, {'host': 'example.com'})
+        self.assertEquals(factory.data, data)
+        self.assertEquals(factory.rest, expectedURI)
+        self.assertEquals(factory.father, request)
+
+
+    def test_process(self):
+        """
+        L{TurtleProxyRequest.process} should create a connection to the given server,
+        with a L{TurtleProxyClientFactory} as connection factory, with the correct
+        parameters:
+            - forward comment, version and data values
+            - update headers with the B{host} value
+            - remove the host from the URL
+            - pass the request as parent request
+        """
+        return self._testProcess("/foo/bar", "/foo/bar")
+
+
+    def test_processWithoutTrailingSlash(self):
+        """
+        If the incoming request doesn't contain a slash,
+        L{TurtleProxyRequest.process} should add one when instantiating
+        L{TurtleProxyClientFactory}.
+        """
+        return self._testProcess("", "/")
+
+
+    def test_processWithData(self):
+        """
+        L{TurtleProxyRequest.process} should be able to retrieve request body and
+        to forward it.
+        """
+        return self._testProcess(
+            "/foo/bar", "/foo/bar", "POST", "Some content")
+
+
+    def test_processWithPort(self):
+        """
+        Check that L{TurtleProxyRequest.process} correctly parse port in the incoming
+        URL, and create a outgoing connection with this port.
+        """
+        transport = StringTransportWithDisconnection()
+        channel = DummyChannel(transport)
+        reactor = FakeReactor()
+        proxy_factory = proxy.TurtleHTTPFactory({}, False, reactor)
+        channel.factory = proxy_factory
+
+        request = proxy.TurtleProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestReceived('GET', 'http://example.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 1)
+        self.assertEquals(reactor.connect[0][0], "example.com")
+        self.assertEquals(reactor.connect[0][1], 1234)
+
+
+    def test_filtering(self):
+        """
+        Check that L{TurtleProxyRequest.process} filters urls that
+        it doesn't know.
+        """
+        class FakeThrottler(object):
+            called_run = False
+            called_run_asap = False
+            def run(self, fun, *args, **kwargs):
+                self.called_run = True
+                return defer.maybeDeferred(fun, *args, **kwargs)
+
+            def runasap(self, fun, *args, **kwargs):
+                self.called_run_asap = True
+                return defer.maybeDeferred(fun, *args, **kwargs)
+
+
+        transport = StringTransportWithDisconnection()
+        transport.protocol = LineSendingProtocol([], False)
+        channel = DummyChannel(transport)
+        reactor = FakeReactor()
+
+        throttler = FakeThrottler()
+        urlmap = {'delicious.com': throttler}
+        proxy_factory = proxy.TurtleHTTPFactory(urlmap, True, reactor)
+        channel.factory = proxy_factory
+
+        request = proxy.TurtleProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestReceived('GET', 'http://example.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 0)
+
+
+        request = proxy.TurtleProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestReceived('GET', 'http://delicious.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 1)
+        self.assertEquals(reactor.connect[-1][0], "delicious.com")
+        self.assertEquals(reactor.connect[-1][1], 1234)
+        self.assertEquals(throttler.called_run, True)
+        self.assertEquals(throttler.called_run_asap, False)
+
+
+
+
+        request = proxy.TurtleProxyRequest(channel, False, reactor)
+        request.gotLength(0)
+        request.requestHeaders.addRawHeader('x-priority', 'interactive')
+        request.requestReceived('GET', 'http://delicious.com:1234/foo/bar',
+                                'HTTP/1.0')
+
+        # That should create one connection, with the port parsed from the URL
+        self.assertEquals(len(reactor.connect), 2)
+        self.assertEquals(reactor.connect[-1][0], "delicious.com")
+        self.assertEquals(reactor.connect[-1][1], 1234)
+        self.assertEquals(throttler.called_run_asap, True)
+

twisted/plugins/turtle_plugin.py

+"""
+Turtle plugin for Twisted.
+"""
+from zope.interface import classProvides
+from twisted.plugin import IPlugin
+from twisted.python.usage import Options, UsageError
+from twisted.python import filepath
+from twisted.application.service import IServiceMaker
+
+class TurtlePlugin(object):
+    """
+    This plugin provides ways to throttle web request through
+    a proxy server that can be configured on a hostname basis.
+    """
+    classProvides(IPlugin, IServiceMaker)
+
+    tapname = "turtle"
+    description = "Throttling proxy a given set of destination hostnames"
+
+    class options(Options):
+        optParameters = [
+            ["with_syslog_prefix", "p", None, "Activate syslog and give it the specified prefix (default will not use syslog)"],
+            ["config", "c", None, "The yaml configuration file that should be used"],
+        ]
+
+        def postOptions(self):
+            """
+            Check and finalize the value of the arguments.
+            """
+            if self['config'] is None:
+                raise UsageError("Must specify a config file")
+            fp = filepath.FilePath(self['config'])
+            if not fp.exists():
+                raise UsageError("%s doesn't exist." % (fp.path,))
+
+            self['config'] = fp
+
+    @classmethod
+    def makeService(cls, options):
+        """
+        Create an L{IService} for the parameters and return it
+        """
+        from turtle import service
+        return service.makeService(options)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.