# proxytools / superproxy.py
#!/usr/bin/python
if __name__ == '__main__':
    # Monkey-patch the stdlib with gevent's cooperative versions before any
    # other module is imported; only when run as a script, so importing this
    # module does not patch the importer's interpreter.
    from gevent import monkey; monkey.patch_all()

import httplib
import urlparse
import logging
import base64

import webob
import gevent
from gevent.pool import Pool
from gevent.queue import Queue, Empty

log = logging.getLogger('superproxy')

# Sentinel header value that asks for a randomly chosen User-Agent.
USER_AGENT_RANDOM_VALUE = 'random'
# Request control headers understood by the proxy (parsed in
# SuperProxyRequest.__init__; names after the 'X-Superproxy-' prefix map to
# the request options of the same name):
#X-Superproxy-Worker-Timeout: 10
#X-Superproxy-Workers-Count: 1
#X-Superproxy-Require-Status: 200; 301; 302
#X-Superproxy-Require-Content: base64(str1) and base64(str2) or base64(str1) and base64(str3)
#X-Superproxy-Priority: 1
#X-Superproxy-Parameters: 'method=best; ip=234.234.267.87'
#X-Superproxy-Client: 'google-positions'
#
# Headers added to the proxied response:
#X-Superproxy-Proxy
#X-Superproxy-Meta

import browser
from proxylist import ProxyList

class RequireContentError(browser.ProxyError):
    """Raised when a proxied response body lacks the required content."""

class RequireStatusError(browser.ProxyError):
    """Raised when a proxied response status is not an accepted one."""


class SuperProxyRequest(object):
    """A single proxied request.

    Fans out to several concurrent workers, each trying a different proxy
    from the proxy list, and returns the first response that passes the
    status/content validation.  Per-request options are taken from
    ``X-Superproxy-*`` headers; the class attributes below are the defaults.
    """
    #Request default options (class-level defaults; parsed header values
    #replace them per-instance via setattr, so the shared mutable defaults
    #are never mutated in place).
    timeout = 60                        # overall deadline for the request, seconds
    worker_timeout = 15                 # timeout of a single proxy attempt, seconds
    workers_count = 2                   # concurrent proxy attempts
    require_status = [200, 301, 302]    # acceptable upstream status codes
    require_content = []                # OR-list of AND-lists of required substrings
    priority = 1
    parameters = {}
    client = 'Unknown'

    def __init__(self, path, body, headers):
        """Parse ``X-Superproxy-*`` options out of *headers*.

        Option headers are consumed (removed from *headers*); the remaining
        headers are stored and later forwarded upstream.  Raises ValueError
        for an unknown ``X-Superproxy-*`` header.
        """
        self.path, self.body = path, body

        #Configuring options from headers.  Iterate over a snapshot because
        #consumed headers are deleted from the mapping as we go.
        for header, value in list(headers.items()):
            if header.lower().startswith('x-superproxy-'):
                # 'X-Superproxy-Worker-Timeout' -> 'worker_timeout', so the
                # header name maps onto the parse_<option> method name.
                opt = header[13:].lower().replace('-', '_')
                # getattr with a default instead of catching AttributeError:
                # a broad except here could mask AttributeError raised
                # *inside* a parser.
                parse = getattr(self, 'parse_%s' % opt, None)
                if parse is None:
                    raise ValueError('Option %s not defined.' % opt)
                setattr(self, opt, parse(value))
                del headers[header]
        self.headers = headers

    parse_worker_timeout = staticmethod(int)
    parse_workers_count = staticmethod(int)
    parse_priority = staticmethod(int)
    parse_client = staticmethod(str)
    # '200; 301; 302' -> [200, 301, 302]
    parse_require_status = staticmethod(lambda x: [int(v.strip()) for v
                                                   in x.split(';') if v.strip()])

    @staticmethod
    def parse_require_content(value):
        """Parse 'b64(a) and b64(b) or b64(c)' into [[a, b], [c]].

        The result is an OR-list of AND-lists of decoded byte strings
        (disjunctive normal form).
        """
        result = []
        condition = []
        for val in value.split():
            if val == 'or':
                result.append(condition)
                condition = []
            elif val != 'and':
                condition.append(base64.urlsafe_b64decode(val))
        if condition:
            result.append(condition)
        return result

    @staticmethod
    def parse_parameters(value):
        """Parse 'key1=val1; key2=val2' into a dict, ignoring empty items
        (e.g. a trailing semicolon)."""
        result = {}
        for value_ in value.strip().split(';'):
            if not value_.strip():
                continue
            key, val = value_.split('=')
            result[key.strip()] = val.strip()
        return result

    def validate_status(self, status):
        """Raise RequireStatusError unless *status* is acceptable."""
        if status not in self.require_status:
            raise RequireStatusError('Status %s not in %s'
                                      % (status, self.require_status))

    def validate_content(self, content):
        """Raise RequireContentError unless at least one AND-group of
        require_content is fully present in *content*.  No-op when no
        content requirement was given."""
        if self.require_content:
            for condition in self.require_content:
                for value in condition:
                    if value not in content:
                        break
                else:
                    # every value of this AND-group matched -> satisfied
                    return
            raise RequireContentError('Required content not found')

    def __call__(self, pool, proxylist):
        """Run the request: spawn workers_count workers on *pool* and return
        (status_line, headers, content) of the first validated response.

        Raises gevent.queue.Empty when no worker succeeds within timeout.
        """
        queue = Queue()
        workers = []
        try:
            for _ in range(self.workers_count):
                # Blocks if pool is full, even if first worker is spawned
                # and already got result. To cancel spawning next worker
                # if pool is full and we already got result use code below:
                #if workers and not pool.free_count() and not self.queue.empty():
                #        break
                workers.append(pool.spawn(self.worker, queue, proxylist))

            resp, content, proxy = queue.get(timeout=self.timeout)
            #this raises gevent.queue.Empty on timeout
        finally:
            # Always stop the remaining workers, win or lose.
            log.debug('Killing workers for %s' % self.path)
            for worker in workers:
                worker.kill()

        # The body was already decoded/read, so encoding/length headers of
        # the upstream response would be wrong for the relayed one.  Compare
        # case-insensitively: header capitalization is not guaranteed.
        drop_headers = ('content-encoding', 'transfer-encoding', 'content-length')
        headers = [(k, v) for k, v in resp.headers.items()
                   if k.lower() not in drop_headers]
        headers += [('X-Superproxy-Proxy', proxy.url),
                    ('X-Superproxy-Meta', '; '.join(['%s=%s' % (k,v) for k, v
                                                     in proxy.meta.items()])),
        ]
        return '%s %s' % (resp.code, resp.msg), headers, content

    def worker(self, queue, proxylist):
        """Keep trying proxies until one yields a validated response, then
        put (resp, content, proxy) on *queue* and exit.

        NOTE(review): if validation raises inside the ``with`` block the
        exception propagates into best_proxy_context(); presumably it marks
        the proxy as failed there -- confirm against proxylist.
        """
        while True:
            with proxylist.best_proxy_context() as proxy:
                log.debug('Trying %s using %s' % (self.path, proxy))
                resp = browser.urlopen(self.path, headers=dict(self.headers),
                                       proxy=proxy, gzip=True,
                                       timeout=self.worker_timeout)
                self.validate_status(resp.code)
                content = resp.read()
                self.validate_content(content)
                log.debug('Response received for %s by %s' % (self.path, proxy))
                queue.put((resp, content, proxy))
                return


class SuperProxy(object):
    """WSGI application that relays each incoming request through the best
    available proxies, using a shared pool of gevent workers."""

    def __init__(self, timeout=60, pool_size=50, **kwargs):
        self.timeout = timeout
        self.proxylist = ProxyList(**kwargs)
        self.workers_pool = Pool(pool_size)
        self.requests = []  # SuperProxyRequest objects currently in flight

    def __call__(self, env, start_response):
        """WSGI entry point: dispatch to the stats page or to proxying."""
        req = webob.Request(env)

        if req.path_info == '/':
            return self.statistics(req, start_response)

        # Only absolute plain-HTTP URLs are supported (no https/CONNECT).
        if not req.path_info.startswith('http://'):
            return self.deny_request(req, start_response, '501 Not Implemented')

        return self.request(req, start_response)

    def deny_request(self, req, start_response, response='503 Service Unavailable'):
        """Answer with *response* as both the status line and the body."""
        start_response(response, [])
        return [response]

    def request(self, req, start_response):
        """Proxy *req* upstream; answer 504 when no proxy succeeds in time."""
        path = req.path_info
        if req.query_string:
            path += '?' + req.query_string

        headers = req.headers
        log.debug('Request from %s to %s', req.remote_addr, path)

        r = SuperProxyRequest(path, req.body, headers)
        self.requests.append(r)
        try:
            status_line, headers, content = r(self.workers_pool, self.proxylist)
        except Empty:
            # No worker produced a validated response within the timeout.
            return self.deny_request(req, start_response, '504 Gateway Timeout')
        finally:
            self.requests.remove(r)

        start_response(status_line, headers)
        return [content]

    def statistics(self, req, start_response):
        """Render a crude HTML table of per-proxy statistics."""
        #TODO make this usable and use jinja2 template
        result = '<table><tr><td>url</td><td>in_use</td><td>failed</td><td>success_time</td><td>fail_time</td><td>meta</td></tr>'
        for p in self.proxylist.proxies:
            result += ('<tr><td>%s</td> <td>%s</td><td>%s</td><td>%s</td>'
                       '<td>%s</td><td>%s</td></tr>') % (
                p.url,
                '<b>%s</b>' % p.in_use if p.in_use else p.in_use,
                '<font color="red">%s</font>' % p.failed if p.failed > 0
                    else '<font color="green">%s</font>' % p.failed,
                p.success_time, p.fail_time, p.meta)
        result += '</table>'
        start_response('200 OK', [('Content-Type', 'text/html')])
        # Fixed: this used self.pool / self.pool.size, but the attribute
        # created in __init__ is workers_pool -- self.pool raised
        # AttributeError on every stats request.
        return ['<html>Cool statistics here: Free slots: %s/%s<br/>%s</html>' %
            (self.workers_pool.free_count(), self.workers_pool.size, result)
        ]


def main(interface='0.0.0.0', port=8088, **settings):
    """Serve the SuperProxy WSGI app on interface:port until interrupted.

    Extra keyword *settings* are passed through to SuperProxy(); the proxy
    status is saved however the server stops.
    """
    from gevent.pywsgi import WSGIServer

    application = SuperProxy(**settings)
    http_server = WSGIServer((interface, port), application)
    log.info('Serving on %s:%s...' % (interface, port))
    try:
        http_server.serve_forever()
    finally:
        application.proxylist.save_status()

if __name__ == '__main__':
    import sys
    # Standalone run: debug logging to stdout, with hard-coded proxy
    # fetchers and a local status file.
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
    main(fetchers={'hideme_ru': {}, 'xroxy_com': {'end_page': 10}}, status_filename='proxylist_status')