Victor Gavro / proxytools

Commits

Victor Gavro committed 16cb9c0

created package for easy use with pip/virtualenv

  • Parent commits 8290f5c
  • Branches default
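
With everything moved under a proxytools package (and, presumably, packaging metadata such as a setup.py among the 29 changed files, not all of which are shown here), the modules can be installed into a virtualenv with pip and imported by package name. A minimal sketch of what the new layout makes possible; the setup.py is an assumption, the names below come from the files in this commit:

    # after installing the package into a virtualenv (assumes the commit ships a setup.py)
    from proxytools import ProxyList, ProxyFetcher
    from proxytools import browser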


Files changed (29)

File __init__.py

-from proxytools.proxylist import ProxyList

File browser.py

-import urllib, urllib2, urlparse, httplib
-import socket
-import os.path
-import random
-from datetime import datetime
-try:
-    from cStringIO import StringIO
-except ImportError:
-    from StringIO import StringIO
-import gzip
-
-
-HTTPError, URLError, SocketError, BadStatusLine, IncompleteRead = urllib2.HTTPError, urllib2.URLError, socket.error, httplib.BadStatusLine, httplib.IncompleteRead
-
-class ProxyError(Exception):
-    pass
-
-exceptions = (HTTPError, URLError, SocketError, BadStatusLine, IncompleteRead, ProxyError)
-
-user_agents = [row.strip() for row in open(os.path.join(os.path.dirname(__file__), 'user-agents.txt'), 'r').read().split('\n') if row.strip()]
-
-def get_random_user_agent():
-    return random.choice(user_agents)
-
-DEFAULT_HEADERS = {
-    'User-Agent': get_random_user_agent,
-    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
-    'Accept-Language': 'en-us,en;q=0.5',
-}
-
-def urlencode(data):
-    '''
-    this is small fix for dictionaries that have tuples as their values,
-    to make things more simple
-    '''
-    if hasattr(data, 'items'):
-        query = []
-        for key, value in data.items():
-            if hasattr(value, '__iter__'):
-                for val in value:
-                    query.append((key, val))
-            else:
-                query.append((key, value))
-        return urllib.urlencode(query)
-    return urllib.urlencode(data)
-
-#from http://bugs.python.org/issue9500
-from gzip import GzipFile
-from StringIO import StringIO
-import zlib
-class ContentEncodingHandler(urllib2.BaseHandler):
-  """A handler to add gzip capabilities to urllib2 requests """
-
-  def http_request(self, req):
-    req.add_header("Accept-Encoding", "gzip, deflate")
-    return req
-
-  def http_response(self, req, resp):
-    old_resp = resp
-    if resp.headers.get("content-encoding") == "gzip":
-        gz = GzipFile(
-                    fileobj=StringIO(resp.read()),
-                    mode="r"
-                  )
-        resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
-        resp.msg = old_resp.msg
-    if resp.headers.get("content-encoding") == "deflate":
-        content = resp.read()
-        gz = StringIO( self.deflate(resp.read()) )
-        resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
-        resp.msg = old_resp.msg
-    return resp
-
-    @staticmethod
-    def deflate(data):
-    # zlib only provides the zlib compress format, not the deflate format;
-        try:               # so on top of all there's this workaround:
-            return zlib.decompress(data, -zlib.MAX_WBITS)
-        except zlib.error:
-            return zlib.decompress(data)
-
-def urlopen(url, data=None, headers={}, proxy=None, gzip=False, handlers=[], timeout=20, http_debuglevel=0):
-    '''
-    Helper function to browse the web using python :-)
-    Note that using gzip=True all page internally would be readed.
-    '''
-    handlers = handlers[:]
-    if http_debuglevel:
-        handlers.insert(0, urllib2.HTTPHandler(debuglevel=http_debuglevel))
-    headers = headers.copy()
-    if proxy:
-        parsed_proxy = urlparse.urlsplit(str(proxy))
-        handlers.append(urllib2.ProxyHandler({parsed_proxy.scheme: parsed_proxy.netloc}))
-    if gzip:
-        handlers.append(ContentEncodingHandler)
-    opener = urllib2.build_opener(*handlers)
-
-    if data:
-        data = urlencode(data)
-
-    for header, default in DEFAULT_HEADERS.items():
-        if not headers.has_key(header):
-            if callable(default):
-                default = default()
-            elif hasattr(default, '__iter__'):
-                default = random.choice(default)
-            headers[header] = default
-
-    request = urllib2.Request(url, data, headers)
-    return opener.open(request, timeout=timeout)
-
-if __name__ == '__main__':
-    print 'content size: %s' % len(urlopen('http://google.com/', gzip=True).read())

File fetchers/__init__.py

Empty file removed.

File fetchers/hideme_ru.py

-from utils import async_popen_communicate
-import re
-from lxml import html
-
-#note - you'll need gocr and imagemagick packages
-
-URL = 'http://hideme.ru/proxy-list/'
-
-def get_country_data(doc):
-    for o in doc.cssselect('select#country option'):
-        yield o.get('value'), int(re.search('(\d+)', o.text_content()).groups()[0])
-
-def get_proxy_params(doc):
-    for tr in doc.cssselect('div#tgr table.pl tr'):
-        tds = tr.cssselect('td')
-        if not tds:
-            #this is header
-            continue
-        ip = tds[0].text_content()
-        port_img_url = tds[1].cssselect('img')[0].get('src')
-        protocols = [p.strip().lower() for p in tds[5].text_content().split(',') if p.strip()]
-        yield ip, port_img_url, protocols
-
-def get_workers(fetcher, **kwargs):
-    doc = html.document_fromstring(fetcher.urlopen(URL))
-    for proxy_params in get_proxy_params(doc):
-        yield worker, (fetcher, proxy_params)
-    for code, count in get_country_data(doc):
-        if count >= 10:
-            #not wasting time
-            #print 'parsing %s, %s' % (code, count)
-            doc = html.document_fromstring(fetcher.urlopen(URL + '?country=%s' % code))
-            for proxy_params in get_proxy_params(doc):
-                yield worker, (fetcher, proxy_params)
-
-def worker(fetcher, proxy_params):
-    ip, image_url, protocols = proxy_params
-    port = async_popen_communicate('convert - pbm:- | gocr -C 0-9 -', fetcher.urlopen('http://hideme.ru%s' % image_url), shell=True).strip().replace(' ', '')
-    if not re.match('^\d+$', port):
-        return
-    yield ('%s://%s:%s' % (protocols[0], ip, port), {})

File fetchers/hidemyass_com.py

-from utils import IP_REGEXP
-import re
-
-def get_workers(fetcher, start_page=1, end_page=33):
-    for page_num in range(start_page, end_page+1):
-        yield (worker, (fetcher, page_num))
-
-def worker(fetcher, page_num):
-    for ip, port, country in re.findall('>(%s)<\/span><\/td>\s*<td>\s*(\d+)<\/td>\s*<td rel=\"(\w{2})\">' % IP_REGEXP, fetcher.urlopen('http://hidemyass.com/proxy-list/%d' % page_num), re.DOTALL):
-        yield ('http://%s:%s' % (ip, port), {'country': country.upper()})

File fetchers/proxyhttp_net.py

-from lxml import html
-
-def get_workers(fetcher, start_page=1, end_page=9):
-    if 'http' in fetcher.allowed_protocols:
-        for page_num in range(start_page, end_page+1):
-            yield (worker, (fetcher, page_num))
-
-def worker(fetcher, page_num):
-    doc = html.fromstring(fetcher.urlread('http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/%d' % page_num))
-    code = doc.cssselect('div#proxylist')[0].getnext().getnext().text.split('\n')[2]
-    #content = 
-    #port_modifier = int(findall('String\.fromCharCode\(([\-\+]{1}\d+)\+parseInt\(a\[i\]\)\)',content)[0])
-    #for ip, port_cripted in findall('<td class=\"t_ip\">(\d+\.\d+\.\d+\.\d+)<\/td>\s*<td class=\"t_port\">([\d\,]+)<\/td>',content):
-    #    port = ''.join([chr(int(x) + port_modifier) for x in port_cripted.split(',')])
-    #    yield ('http://%s:%s' % (ip, port), {})

File fetchers/samair_ru.py

-from utils import IP_REGEXP
-import re
-
-REGEXP = '<tr><td>(%s)<script type="text\/javascript">document\.write\(":"((?:\+\w)+)\)' % IP_REGEXP
-
-def _get_digits_map(page):
-    result = {}
-    for x in re.findall('(\w=\d;)', page):
-        if result.has_key(x[0]):
-            raise ValueError
-        result[x[0]] = x[2]
-    return result
-
-def get_workers(fetcher, start_page=1, end_page=25):
-    if 'http' in fetcher.allowed_protocols:
-        for page_num in range(start_page, end_page+1):
-            yield (worker, (fetcher, page_num))
-
-def worker(fetcher, page_num):
-    page = fetcher.urlopen('http://www.samair.ru/proxy/proxy-%02d.htm' % page_num)
-    digits_map = _get_digits_map(page)
-    for ip, port_coded in re.findall(REGEXP, page):
-        port = ''.join(map(lambda x: digits_map[x], port_coded.strip('+').split('+')))
-        yield ('http://%s:%s' % (ip, port), {})

File fetchers/sockslist_net.py

-from re import findall, DOTALL
-
-def get_workers(fetcher, start_page=1, end_page=3):
-    if 'socks' in fetcher.allowed_protocols:
-        for page_num in range(start_page, end_page+1):
-            yield (worker, (fetcher, page_num))
-
-def worker(fetcher, page_num):
-    content = fetcher.urlopen('http://sockslist.net/proxy/server-socks-hide-ip-address/%d' % page_num)
-    port_modifier = int(findall('String\.fromCharCode\(([\-\+]{1}\d+)\+parseInt\(a\[i\]\)\)',content)[0])
-    for ip, port_cripted, country, types in findall('<td class=\"t_ip\">(\d+\.\d+\.\d+\.\d+)<\/td>\s*<td class=\"t_port\">([\d\,]+)<\/td>\s*<td class=\"t_country\">\s*<img src=\"\/images\/flags\/(\w{2})\.png\".*?<td class=\"t_type\">\s*([\d\/]+)',content, DOTALL):
-        port = ''.join([chr(int(x) + port_modifier) for x in port_cripted.split(',')])
-        yield ('socks://%s:%s' % (ip, port), {'types': map(int, types.split('/')), 'country': country.upper()})

File fetchers/xroxy_com.py

-from re import findall
-
-URL = 'http://www.xroxy.com/proxylist.php?pnum=%d'
-REGEXP = '&host=(\d+\.\d+\.\d+\.\d+)&port=(\d+)'
-
-def get_workers(fetcher, start_page=1, end_page=9):
-    if 'http' in fetcher.allowed_protocols:
-        for page_num in range(start_page-1, end_page):
-            yield (worker, (fetcher, page_num))
-
-def worker(fetcher, page_num):
-    content = fetcher.urlopen(URL % page_num)
-    for ip, port in findall(REGEXP, content):
-        yield ('http://%s:%s' % (ip, port), {})

File proxyfetcher.py

-#!/usr/bin/python
-import glob
-import os
-import logging
-import datetime
-
-import gevent
-from gevent.queue import Queue
-from gevent.pool import Pool
-if __name__ == '__main__':
-    from gevent import monkey; monkey.patch_all()
-
-import browser
-from utils import IP_REGEXP, IP_PORT_REGEXP
-
-log = logging.getLogger('proxyfetcher')
-
-class ProxyFetcher(object):
-    fetchers = {}
-    fetched_time = None
-
-    def __init__(self, urlopen_timeout=5, urlopen_tries=3, fetchers={},
-                 allowed_protocols=('http',), workers_pool_size=30, worker_timeout=30,
-                 autopopulate_fetchers=False, fetching_delay=5*60):
-
-        self.urlopen_timeout = urlopen_timeout
-        self.urlopen_tries = urlopen_tries
-        self.allowed_protocols = allowed_protocols
-        self.fetching_delay = fetching_delay
-
-        if autopopulate_fetchers:
-            self.fetchers = self.get_fetchers()
-        else:
-            self.fetchers = self.get_fetchers(fetchers.keys())
-        self.fetchers_config = fetchers
-
-
-        self.worker_timeout = worker_timeout
-
-        self.pool = Pool(workers_pool_size)
-        self.queue = Queue()
-
-    @property
-    def is_working(self):
-        return self.pool.free_count() != self.pool.size
-
-    def from_text(self, text, protocol='http'):
-        return ['%s:%s' % (protocol, proxy) for proxy in re.findall(IP_PORT_REGEXP, text)]
-
-    def from_url(self, url, **kwargs):
-        return self.from_text(self.urlread(url, **kwargs))
-
-    def from_file(self, filename, **kwargs):
-        return self.from_text(open(filename, 'r').read())
-
-    def get_fetchers(self, names=None):
-        result = {}
-        for path in glob.iglob(os.path.join(os.path.dirname(__file__), 'fetchers', '*.py')):
-            name = os.path.splitext(os.path.basename(path))[0]
-            if name != '__init__' and ((names == None) or name in names):
-                result[name] = __import__('fetchers.%s' % name, fromlist=['fetchers'])
-        if not len(result):
-            raise ValueError('No fetchers configured. You should set autopopulate_fetchers or specify fetchers.')
-        return result
-
-    def urlread(self, url, data=None, timeout=None, tries=None):
-        tries = self.urlopen_tries
-        while True:
-            try:
-                log.debug('Opening %s' % url)
-                return browser.urlopen(url, timeout=self.urlopen_timeout, gzip=True).read()
-            except browser.exceptions, e:
-                log.debug('Error opening %s: %s' % (url, e))
-                tries -= 1
-                if tries <= 0:
-                    raise
-
-    #temporarily for compability in fetchers
-    def urlopen(self, *args, **kwargs):
-        return self.urlread(*args, **kwargs)
-
-    def get_workers(self):
-        for name, module in self.fetchers.items():
-            try:
-                for worker, args in module.get_workers(self, **self.fetchers_config.get(name, {})):
-                    #yield worker, args
-                    yield gevent.with_timeout, (self.worker_timeout, self._process_worker, name, worker, args)
-            except Exception, e:
-                log.warning('Failed module %s: %s: %s' % (name, e.__class__.__name__, e))
-
-    def _process_worker(self, source_name, worker, args):
-        import sys
-        for proxy, meta in worker(*args):
-            meta['source'] = source_name
-            self.queue.put((proxy, meta))
-
-    def get_proxies(self):
-        if self.fetching_delay and self.fetched_time:
-            past_seconds = (datetime.datetime.now() - self.fetched_time).seconds
-            if past_seconds < self.fetching_delay:
-                sleep_seconds = self.fetching_delay - past_seconds
-                log.info('Sleeping %s seconds till next proxy fetching' % sleep_seconds)
-                sleep(sleep_seconds)
-
-        #pool = Pool(self.workers_pool_size)
-        pool = self.pool
-        log.debug('Spawning new proxy workers')
-        workers_counter = 0
-        proxies_counter = 0
-        for worker, args in self.get_workers():
-            pool.spawn(worker, *args)
-            workers_counter += 1
-            while not self.queue.empty():
-                yield self.queue.next()
-                proxies_counter += 1
-        log.debug('Spawned %d proxy workers' % workers_counter)
-        pool.join()
-        while not self.queue.empty():
-            yield self.queue.next()
-            proxies_counter += 1
-        log.debug('Fetched %d proxies' % proxies_counter)
-
-        self.fetched_time = datetime.datetime.now()
-
-if __name__ == '__main__':
-    import sys
-    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
-    #fetcher = ProxyFetcher(fetchers={'proxyhttp_net': {}})
-    fetcher = ProxyFetcher(fetchers={'xroxy_com': {'end_page': 37}})
-
-    for proxy, meta in fetcher.get_proxies():
-        logging.debug('%s %s' % (proxy, meta))

File proxylist.py

-#!/usr/bin/python
-import logging
-import urlparse
-from datetime import datetime
-import os.path
-import random
-
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
-import gevent
-from gevent.coros import Semaphore
-if __name__ == '__main__':
-    from gevent import monkey; monkey.patch_all()
-
-from proxyfetcher import ProxyFetcher
-from browser import exceptions as proxy_fail_exceptions
-
-__all__ = ['Proxy', 'InsufficientProxiesError', 'ProxyList']
-
-log = logging.getLogger('proxylist')
-
-
-class InsufficientProxiesError(RuntimeError):
-    pass
-
-
-class Proxy(object):
-    url = None
-    in_use = 0
-    failed = 0
-    success_time = None
-    fail_time = None
-    meta = {}
-
-    def split(self):
-        return urlparse.urlsplit(self.url)
-
-    def __init__(self, url, meta={}):
-        self.url, self.meta = url, meta.copy()
-
-    def __str__(self):
-        return self.url
-
-    def __hash__(self):
-        return hash(self.url)
-
-    def __cmp__(self, other):
-        return cmp(hash(self.url), hash(other.url))
-
-    #Below simple compability with urlparse.SplitResult
-    def geturl(self):
-        return self.url
-
-    @property
-    def netloc(self):
-        return self.split().netloc
-
-    @property
-    def scheme(self):
-        return self.split().scheme
-
-
-class ProxyList(object):
-    proxies = []
-    blacklist = []
-
-    proxy_fail_exceptions = proxy_fail_exceptions
-
-    _update_initiated = False
-
-    def __init__(self, min_size=30, max_fail=3, max_simultaneous=1,
-                 clear_blacklist_on_update=True, raise_on_insufficient=False,
-                 status_filename=None, debug_error_responses=False,
-                 **fetcher_kwargs):
-        self.min_size = min_size
-        self.max_fail = max_fail
-        if min_size < 1 or max_simultaneous < 1:
-            raise ValueError
-        self.max_simultaneous = max_simultaneous
-        self.clear_blacklist_on_update = clear_blacklist_on_update
-        self.debug_error_responses = debug_error_responses
-        self.raise_on_insufficient = raise_on_insufficient
-
-        self.fetcher = ProxyFetcher(**fetcher_kwargs)
-        self.ready = Semaphore()
-
-        self.status_filename = status_filename
-        if status_filename and os.path.exists(status_filename):
-                try:
-                    self.load_status_from_file(status_filename)
-                except Exception, e:
-                    log.error('Error loading status: %s' % e)
-
-    #this cause problems because garbage collector deletes objects without links
-    #before we could save Proxy instance to pickle..
-    #def __del__(self):
-    #    if self.status_filename:
-    #        self.save_status(self.status_filename)
-
-    #@property
-    #def is_fetching(self):
-    #    return self.fetcher.pool.free_count() != self.fetcher.pool.size
-
-    def load_status_from_file(self, filename):
-        self.proxies, self.blacklist = pickle.loads(open(filename).read())
-        for proxy in self.proxies:
-            proxy.in_use = 0
-        for proxy in self.blacklist:
-            proxy.in_use = 0
-        log.info('Loaded proxies: good: %s, blacklist: %s' % (len(self.proxies), len(self.blacklist)))
-
-    def save_status(self):
-        self.save_status_to_file(self.status_filename)
-
-    def save_status_to_file(self, filename):
-        open(filename, 'w').write(pickle.dumps([self.proxies, self.blacklist], pickle.HIGHEST_PROTOCOL))
-        log.debug('Saved proxies: good: %s, blacklist: %s' % (len(self.proxies), len(self.blacklist)))
-
-
-    def _check_ready(self):
-        if self.need_update and not self._update_initiated:
-            self.update()
-        if self.ready.locked():
-            self.ready.wait()
-
-    @property
-    def need_update(self):
-        return (len(self.proxies) < self.min_size)
-
-    def _update(self):
-        if self.fetcher.is_working:
-            log.warning('Fetcher already working, initiating update again')
-        if self.clear_blacklist_on_update:
-            self.blacklist = []
-        for proxy, meta in self.fetcher.get_proxies():
-            self.add_proxy(proxy, meta)
-        self._update_initiated = False
-        if len(self.proxies) < self.min_size:
-            raise RuntimeError('Insufficient proxies after fetching')
-
-    def update(self):
-        self._update_initiated = True
-        gevent.spawn(self._update)
-
-    def add_proxy(self, proxy, meta={}):
-        if not isinstance(proxy, Proxy):
-            proxy = Proxy(proxy, meta)
-
-        if proxy not in self.proxies and proxy not in self.blacklist:
-            self.proxies.append(proxy)
-            if self.ready.locked():
-                self.ready.release()
-
-    def get_random_proxy(self, **kwargs):
-        self._check_ready()
-        tries = len(self.proxies)
-        if not tries:
-            return self.get_best_proxy(**kwargs)
-        while True:
-            proxy = random.choice(self.proxies)
-            if proxy.in_use < self.max_simultaneous:
-                break
-            tries -= 1
-            if tries <= 0:
-                return self.get_best_proxy(**kwargs)
-        proxy.in_use += 1
-        return proxy
-
-    def get_best_proxy(self, **kwargs):
-        self._check_ready()
-        min_failed = self.max_fail
-        best_proxy = None
-
-        for proxy in self.proxies:
-            if proxy.in_use < self.max_simultaneous:
-                min_failed = proxy.failed
-                best_proxy = proxy
-
-        if not best_proxy:
-            if self.raise_on_insufficient:
-                raise InsufficientProxiesError()
-            self.ready.acquire(blocking=False)
-            # if not self.fetcher.is_working:
-            #    this could happens on initial
-            #    raise RuntimeError('Fetcher is not working, while there are no proxies left')
-            return self.get_best_proxy(**kwargs)
-        best_proxy.in_use += 1
-        return best_proxy
-
-    def get_proxy_by_url(self, url):
-        for proxy in self.proxies:
-            if proxy.url == url:
-                return proxy
-        for proxy in self.blacklist:
-            if proxy.url == url:
-                return proxy
-        raise ValueError('Proxy with url %s not found' % url)
-
-    def failed_proxy(self, proxy):
-        if not isinstance(proxy, Proxy):
-            proxy = self.get_proxy_by_url(proxy)
-        proxy.failed += 1
-        proxy.fail_time = datetime.now()
-
-        if proxy.failed >= self.max_fail:
-            proxy.in_use -= 1
-            self.remove_proxy(proxy)
-        else:
-            self.release_proxy(proxy)
-
-    def succeed_proxy(self, proxy):
-        if not isinstance(proxy, Proxy):
-            proxy = self.get_proxy_by_url(proxy)
-        proxy.failed -= 1
-        proxy.success_time = datetime.now()
-        self.release_proxy(proxy)
-
-    success_proxy = succeed_proxy #for backwards compability
-
-    def remove_proxy(self, proxy):
-        try:
-            if not isinstance(proxy, Proxy):
-                proxy = self.get_proxy_by_url(proxy)
-            self.proxies.remove(proxy)
-        except ValueError:
-            # proxy already not in list
-            pass
-        else:
-            self.blacklist.append(proxy)
-        log.debug('Removed proxy %s, proxies left: %s' % (proxy, len(self.proxies)))
-
-    def release_proxy(self, proxy):
-        if not isinstance(proxy, Proxy):
-            proxy = self.get_proxy_by_url(proxy)
-        proxy.in_use -= 1
-        if self.ready.locked() and proxy in self.proxies:
-            self.ready.release()
-
-    def random_proxy_context(self, *args, **kwargs):
-        return ProxyContextManager(self, self.get_random_proxy, *args, **kwargs)
-
-    def best_proxy_context(self, *args, **kwargs):
-        return ProxyContextManager(self, self.get_best_proxy, *args, **kwargs)
-
-
-class ProxyContextManager:
-    def __init__(self, proxylist, function, *args, **kwargs):
-        (self.proxylist, self.function, self.args, self.kwargs) = \
-        (proxylist, function, args, kwargs)
-
-    def __enter__(self):
-        proxy = self.function(*self.args, **self.kwargs)
-        self.proxy = proxy
-        return proxy
-
-    def __exit__(self, type, value, traceback):
-        if type == None:
-            self.proxylist.succeed_proxy(self.proxy)
-        elif type in self.proxylist.proxy_fail_exceptions:
-            self.proxylist.failed_proxy(self.proxy)
-            log.debug('Proxy failed %s: %s: %s' % (self.proxy, type.__name__, value))
-            if self.proxylist.debug_error_responses:
-                open('%s-%s' % (value.code, self.proxy.replace('/','')), 'w').write(value.read())
-            return True
-        else:
-            self.proxylist.release_proxy(self.proxy)
-
-
-if __name__ == '__main__':
-    import sys
-    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
-    proxylist = ProxyList(min_size=5, clear_blacklist_on_update=False, fetchers={'xroxy_com': {'end_page': 30}}, workers_pool_size=10)
-    def test1():
-        proxy = proxylist.get_best_proxy()
-        logging.debug(proxy)
-        proxylist.failed_proxy(proxy)
-    def test2():
-        proxy = proxylist.get_random_proxy()
-        logging.debug(proxy)
-        proxylist.succeed_proxy(proxy)
-    gevent.joinall([gevent.spawn(x) for x in [test1, test2]*3])
-    proxylist.save_status()

File proxytools/__init__.py

+from proxytools.proxylist import *
+from proxytools.proxyfetcher import *

File proxytools/browser.py

+import urllib, urllib2, urlparse, httplib
+import socket
+import os.path
+import random
+from datetime import datetime
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from StringIO import StringIO
+import gzip
+
+
+HTTPError, URLError, SocketError, BadStatusLine, IncompleteRead = urllib2.HTTPError, urllib2.URLError, socket.error, httplib.BadStatusLine, httplib.IncompleteRead
+
+class ProxyError(Exception):
+    pass
+
+exceptions = (HTTPError, URLError, SocketError, BadStatusLine, IncompleteRead, ProxyError)
+
+user_agents = [row.strip() for row in open(os.path.join(os.path.dirname(__file__), 'user-agents.txt'), 'r').read().split('\n') if row.strip()]
+
+def get_random_user_agent():
+    return random.choice(user_agents)
+
+DEFAULT_HEADERS = {
+    'User-Agent': get_random_user_agent,
+    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
+    'Accept-Language': 'en-us,en;q=0.5',
+}
+
+def urlencode(data):
+    '''
+    Small fix for dictionaries that have iterables as their values,
+    to keep things simple.
+    '''
+    if hasattr(data, 'items'):
+        query = []
+        for key, value in data.items():
+            if hasattr(value, '__iter__'):
+                for val in value:
+                    query.append((key, val))
+            else:
+                query.append((key, value))
+        return urllib.urlencode(query)
+    return urllib.urlencode(data)
+
+#from http://bugs.python.org/issue9500
+from gzip import GzipFile
+from StringIO import StringIO
+import zlib
+class ContentEncodingHandler(urllib2.BaseHandler):
+  """A handler to add gzip capabilities to urllib2 requests """
+
+  def http_request(self, req):
+    req.add_header("Accept-Encoding", "gzip, deflate")
+    return req
+
+  def http_response(self, req, resp):
+    old_resp = resp
+    if resp.headers.get("content-encoding") == "gzip":
+        gz = GzipFile(
+                    fileobj=StringIO(resp.read()),
+                    mode="r"
+                  )
+        resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
+        resp.msg = old_resp.msg
+    if resp.headers.get("content-encoding") == "deflate":
+        content = resp.read()
+        gz = StringIO( self.deflate(content) )
+        resp = urllib2.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
+        resp.msg = old_resp.msg
+    return resp
+
+  @staticmethod
+  def deflate(data):
+    # zlib only provides the zlib compress format, not the deflate format;
+    # so on top of all there's this workaround:
+    try:
+        return zlib.decompress(data, -zlib.MAX_WBITS)
+    except zlib.error:
+        return zlib.decompress(data)
+
+def urlopen(url, data=None, headers={}, proxy=None, gzip=False, handlers=[], timeout=20, http_debuglevel=0):
+    '''
+    Helper function to browse the web using python :-)
+    Note that with gzip=True the whole response body is read internally.
+    '''
+    handlers = handlers[:]
+    if http_debuglevel:
+        handlers.insert(0, urllib2.HTTPHandler(debuglevel=http_debuglevel))
+    headers = headers.copy()
+    if proxy:
+        parsed_proxy = urlparse.urlsplit(str(proxy))
+        handlers.append(urllib2.ProxyHandler({parsed_proxy.scheme: parsed_proxy.netloc}))
+    if gzip:
+        handlers.append(ContentEncodingHandler)
+    opener = urllib2.build_opener(*handlers)
+
+    if data:
+        data = urlencode(data)
+
+    for header, default in DEFAULT_HEADERS.items():
+        if not headers.has_key(header):
+            if callable(default):
+                default = default()
+            elif hasattr(default, '__iter__'):
+                default = random.choice(default)
+            headers[header] = default
+
+    request = urllib2.Request(url, data, headers)
+    return opener.open(request, timeout=timeout)
+
+if __name__ == '__main__':
+    print 'content size: %s' % len(urlopen('http://google.com/', gzip=True).read())
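
A brief usage sketch for the urlopen helper above; the proxy address and target URL are illustrative:

    from proxytools import browser

    # fetch a page through an (illustrative) HTTP proxy, with gzip decoding and
    # the random User-Agent default applied automatically
    resp = browser.urlopen('http://example.com/', proxy='http://127.0.0.1:3128',
                           gzip=True, timeout=10)
    print resp.code, len(resp.read())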

File proxytools/fetchers/__init__.py

Empty file added.

File proxytools/fetchers/hideme_ru.py

+#NOTE: you'll need gocr and imagemagick packages
+import re
+from lxml import html
+
+from proxytools.utils import async_popen_communicate
+
+URL = 'http://hideme.ru/proxy-list/'
+
+def get_country_data(doc):
+    for o in doc.cssselect('select#country option'):
+        yield o.get('value'), int(re.search('(\d+)', o.text_content()).groups()[0])
+
+def get_proxy_params(doc):
+    for tr in doc.cssselect('div#tgr table.pl tr'):
+        tds = tr.cssselect('td')
+        if not tds:
+            #this is header
+            continue
+        ip = tds[0].text_content()
+        port_img_url = tds[1].cssselect('img')[0].get('src')
+        protocols = [p.strip().lower() for p in tds[5].text_content().split(',') if p.strip()]
+        yield ip, port_img_url, protocols
+
+def get_workers(fetcher, **kwargs):
+    doc = html.document_fromstring(fetcher.urlopen(URL))
+    for proxy_params in get_proxy_params(doc):
+        yield worker, (fetcher, proxy_params)
+    for code, count in get_country_data(doc):
+        if count >= 10:
+            #not wasting time
+            #print 'parsing %s, %s' % (code, count)
+            doc = html.document_fromstring(fetcher.urlopen(URL + '?country=%s' % code))
+            for proxy_params in get_proxy_params(doc):
+                yield worker, (fetcher, proxy_params)
+
+def worker(fetcher, proxy_params):
+    ip, image_url, protocols = proxy_params
+    port = async_popen_communicate('convert - pbm:- | gocr -C 0-9 -', fetcher.urlopen('http://hideme.ru%s' % image_url), shell=True).strip().replace(' ', '')
+    if not re.match('^\d+$', port):
+        return
+    yield ('%s://%s:%s' % (protocols[0], ip, port), {})

File proxytools/fetchers/hidemyass_com.py

+from proxytools.utils import IP_REGEXP
+import re
+
+def get_workers(fetcher, start_page=1, end_page=33):
+    for page_num in range(start_page, end_page+1):
+        yield (worker, (fetcher, page_num))
+
+def worker(fetcher, page_num):
+    for ip, port, country in re.findall('>(%s)<\/span><\/td>\s*<td>\s*(\d+)<\/td>\s*<td rel=\"(\w{2})\">' % IP_REGEXP, fetcher.urlopen('http://hidemyass.com/proxy-list/%d' % page_num), re.DOTALL):
+        yield ('http://%s:%s' % (ip, port), {'country': country.upper()})

File proxytools/fetchers/proxyhttp_net.py

+from lxml import html
+
+def get_workers(fetcher, start_page=1, end_page=9):
+    if 'http' in fetcher.allowed_protocols:
+        for page_num in range(start_page, end_page+1):
+            yield (worker, (fetcher, page_num))
+
+def worker(fetcher, page_num):
+    doc = html.fromstring(fetcher.urlread('http://proxyhttp.net/free-list/anonymous-server-hide-ip-address/%d' % page_num))
+    code = doc.cssselect('div#proxylist')[0].getnext().getnext().text.split('\n')[2]
+    #content = 
+    #port_modifier = int(findall('String\.fromCharCode\(([\-\+]{1}\d+)\+parseInt\(a\[i\]\)\)',content)[0])
+    #for ip, port_cripted in findall('<td class=\"t_ip\">(\d+\.\d+\.\d+\.\d+)<\/td>\s*<td class=\"t_port\">([\d\,]+)<\/td>',content):
+    #    port = ''.join([chr(int(x) + port_modifier) for x in port_cripted.split(',')])
+    #    yield ('http://%s:%s' % (ip, port), {})

File proxytools/fetchers/samair_ru.py

+from proxytools.utils import IP_REGEXP
+import re
+
+REGEXP = '<tr><td>(%s)<script type="text\/javascript">document\.write\(":"((?:\+\w)+)\)' % IP_REGEXP
+
+def _get_digits_map(page):
+    result = {}
+    for x in re.findall('(\w=\d;)', page):
+        if result.has_key(x[0]):
+            raise ValueError
+        result[x[0]] = x[2]
+    return result
+
+def get_workers(fetcher, start_page=1, end_page=25):
+    if 'http' in fetcher.allowed_protocols:
+        for page_num in range(start_page, end_page+1):
+            yield (worker, (fetcher, page_num))
+
+def worker(fetcher, page_num):
+    page = fetcher.urlopen('http://www.samair.ru/proxy/proxy-%02d.htm' % page_num)
+    digits_map = _get_digits_map(page)
+    for ip, port_coded in re.findall(REGEXP, page):
+        port = ''.join(map(lambda x: digits_map[x], port_coded.strip('+').split('+')))
+        yield ('http://%s:%s' % (ip, port), {})

File proxytools/fetchers/sockslist_net.py

+from re import findall, DOTALL
+
+def get_workers(fetcher, start_page=1, end_page=3):
+    if 'socks' in fetcher.allowed_protocols:
+        for page_num in range(start_page, end_page+1):
+            yield (worker, (fetcher, page_num))
+
+def worker(fetcher, page_num):
+    content = fetcher.urlopen('http://sockslist.net/proxy/server-socks-hide-ip-address/%d' % page_num)
+    port_modifier = int(findall('String\.fromCharCode\(([\-\+]{1}\d+)\+parseInt\(a\[i\]\)\)',content)[0])
+    for ip, port_cripted, country, types in findall('<td class=\"t_ip\">(\d+\.\d+\.\d+\.\d+)<\/td>\s*<td class=\"t_port\">([\d\,]+)<\/td>\s*<td class=\"t_country\">\s*<img src=\"\/images\/flags\/(\w{2})\.png\".*?<td class=\"t_type\">\s*([\d\/]+)',content, DOTALL):
+        port = ''.join([chr(int(x) + port_modifier) for x in port_cripted.split(',')])
+        yield ('socks://%s:%s' % (ip, port), {'types': map(int, types.split('/')), 'country': country.upper()})

File proxytools/fetchers/xroxy_com.py

+from re import findall
+
+URL = 'http://www.xroxy.com/proxylist.php?pnum=%d'
+REGEXP = '&host=(\d+\.\d+\.\d+\.\d+)&port=(\d+)'
+
+def get_workers(fetcher, start_page=1, end_page=9):
+    if 'http' in fetcher.allowed_protocols:
+        for page_num in range(start_page-1, end_page):
+            yield (worker, (fetcher, page_num))
+
+def worker(fetcher, page_num):
+    content = fetcher.urlopen(URL % page_num)
+    for ip, port in findall(REGEXP, content):
+        yield ('http://%s:%s' % (ip, port), {})

File proxytools/proxyfetcher.py

+#!/usr/bin/env python
+import glob
+import os
+import re
+import logging
+import datetime
+
+import gevent
+from gevent.queue import Queue
+from gevent.pool import Pool
+if __name__ == '__main__':
+    from gevent import monkey; monkey.patch_all()
+
+from proxytools import browser
+from proxytools.utils import IP_REGEXP, IP_PORT_REGEXP
+
+__all__ = ['ProxyFetcher']
+
+log = logging.getLogger('proxytools.proxyfetcher')
+
+
+class ProxyFetcher(object):
+    fetchers = {}
+    fetched_time = None
+
+    def __init__(self, urlopen_timeout=5, urlopen_tries=3, fetchers={},
+                 allowed_protocols=('http',), workers_pool_size=30, worker_timeout=30,
+                 autopopulate_fetchers=False, fetching_delay=5*60):
+
+        self.urlopen_timeout = urlopen_timeout
+        self.urlopen_tries = urlopen_tries
+        self.allowed_protocols = allowed_protocols
+        self.fetching_delay = fetching_delay
+
+        if autopopulate_fetchers:
+            self.fetchers = self.get_fetchers()
+        else:
+            self.fetchers = self.get_fetchers(fetchers.keys())
+        self.fetchers_config = fetchers
+
+
+        self.worker_timeout = worker_timeout
+
+        self.pool = Pool(workers_pool_size)
+        self.queue = Queue()
+
+    @property
+    def is_working(self):
+        return self.pool.free_count() != self.pool.size
+
+    def from_text(self, text, protocol='http'):
+        return ['%s://%s' % (protocol, proxy) for proxy in re.findall(IP_PORT_REGEXP, text)]
+
+    def from_url(self, url, **kwargs):
+        return self.from_text(self.urlread(url, **kwargs))
+
+    def from_file(self, filename, **kwargs):
+        return self.from_text(open(filename, 'r').read())
+
+    def get_fetchers(self, names=None):
+        result = {}
+        from proxytools import fetchers
+        for path in glob.iglob(os.path.join(os.path.dirname(fetchers.__file__), '*.py')):
+            name = os.path.splitext(os.path.basename(path))[0]
+            if name != '__init__' and ((names == None) or name in names):
+                result[name] = __import__('proxytools.fetchers.%s' % name, fromlist=['proxytools', 'fetchers'])
+        if not len(result):
+            raise ValueError('No fetchers configured. You should set autopopulate_fetchers or specify fetchers.')
+        return result
+
+    def urlread(self, url, data=None, timeout=None, tries=None):
+        tries = tries or self.urlopen_tries
+        while True:
+            try:
+                log.debug('Opening %s' % url)
+                return browser.urlopen(url, data=data, timeout=timeout or self.urlopen_timeout, gzip=True).read()
+            except browser.exceptions, e:
+                log.debug('Error opening %s: %s' % (url, e))
+                tries -= 1
+                if tries <= 0:
+                    raise
+
+    #temporarily kept for compatibility with fetchers
+    def urlopen(self, *args, **kwargs):
+        return self.urlread(*args, **kwargs)
+
+    def get_workers(self):
+        for name, module in self.fetchers.items():
+            try:
+                for worker, args in module.get_workers(self, **self.fetchers_config.get(name, {})):
+                    #yield worker, args
+                    yield gevent.with_timeout, (self.worker_timeout, self._process_worker, name, worker, args)
+            except Exception, e:
+                log.warning('Failed module %s: %s: %s' % (name, e.__class__.__name__, e))
+
+    def _process_worker(self, source_name, worker, args):
+        for proxy, meta in worker(*args):
+            meta['source'] = source_name
+            self.queue.put((proxy, meta))
+
+    def get_proxies(self):
+        if self.fetching_delay and self.fetched_time:
+            past_seconds = (datetime.datetime.now() - self.fetched_time).seconds
+            if past_seconds < self.fetching_delay:
+                sleep_seconds = self.fetching_delay - past_seconds
+                log.info('Sleeping %s seconds till next proxy fetching' % sleep_seconds)
+                gevent.sleep(sleep_seconds)
+
+        #pool = Pool(self.workers_pool_size)
+        pool = self.pool
+        log.debug('Spawning new proxy workers')
+        workers_counter = 0
+        proxies_counter = 0
+        for worker, args in self.get_workers():
+            pool.spawn(worker, *args)
+            workers_counter += 1
+            while not self.queue.empty():
+                yield self.queue.next()
+                proxies_counter += 1
+        log.debug('Spawned %d proxy workers' % workers_counter)
+        pool.join()
+        while not self.queue.empty():
+            yield self.queue.next()
+            proxies_counter += 1
+        log.debug('Fetched %d proxies' % proxies_counter)
+
+        self.fetched_time = datetime.datetime.now()
+
+
+if __name__ == '__main__':
+    import sys
+    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
+    #fetcher = ProxyFetcher(fetchers={'proxyhttp_net': {}})
+    fetcher = ProxyFetcher(fetchers={'xroxy_com': {'end_page': 37}})
+
+    for proxy, meta in fetcher.get_proxies():
+        logging.debug('%s %s' % (proxy, meta))
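
Every module under proxytools/fetchers follows the same small contract that ProxyFetcher.get_workers relies on: a get_workers(fetcher, **config) generator that yields (worker, args) pairs, and worker functions that yield (proxy_url, meta) tuples. A sketch of a hypothetical fetcher; the module name, URL and options are made up for illustration:

    # proxytools/fetchers/example_com.py (hypothetical)
    import re

    from proxytools.utils import IP_PORT_REGEXP

    def get_workers(fetcher, start_page=1, end_page=1):
        if 'http' in fetcher.allowed_protocols:
            for page_num in range(start_page, end_page + 1):
                yield (worker, (fetcher, page_num))

    def worker(fetcher, page_num):
        content = fetcher.urlread('http://proxylist.example.com/page/%d' % page_num)
        for address in re.findall('(%s)' % IP_PORT_REGEXP, content):
            yield ('http://%s' % address, {})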

File proxytools/proxylist.py

+#!/usr/bin/env python
+import logging
+import urlparse
+from datetime import datetime
+import os.path
+import random
+
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+import gevent
+from gevent.coros import Semaphore
+if __name__ == '__main__':
+    from gevent import monkey; monkey.patch_all()
+
+from proxytools.proxyfetcher import ProxyFetcher
+from proxytools.browser import exceptions as proxy_fail_exceptions
+
+__all__ = ['Proxy', 'InsufficientProxiesError', 'ProxyList']
+
+log = logging.getLogger('proxytools.proxylist')
+
+
+class InsufficientProxiesError(RuntimeError):
+    pass
+
+
+class Proxy(object):
+    url = None
+    in_use = 0
+    failed = 0
+    success_time = None
+    fail_time = None
+    meta = {}
+
+    def split(self):
+        return urlparse.urlsplit(self.url)
+
+    def __init__(self, url, meta={}):
+        self.url, self.meta = url, meta.copy()
+
+    def __str__(self):
+        return self.url
+
+    def __hash__(self):
+        return hash(self.url)
+
+    def __cmp__(self, other):
+        return cmp(hash(self.url), hash(other.url))
+
+    #Below: simple compatibility with urlparse.SplitResult
+    def geturl(self):
+        return self.url
+
+    @property
+    def netloc(self):
+        return self.split().netloc
+
+    @property
+    def scheme(self):
+        return self.split().scheme
+
+
+class ProxyList(object):
+    proxies = []
+    blacklist = []
+
+    proxy_fail_exceptions = proxy_fail_exceptions
+
+    _update_initiated = False
+
+    def __init__(self, min_size=30, max_fail=3, max_simultaneous=1,
+                 clear_blacklist_on_update=True, raise_on_insufficient=False,
+                 status_filename=None, debug_error_responses=False,
+                 **fetcher_kwargs):
+        self.min_size = min_size
+        self.max_fail = max_fail
+        if min_size < 1 or max_simultaneous < 1:
+            raise ValueError
+        self.max_simultaneous = max_simultaneous
+        self.clear_blacklist_on_update = clear_blacklist_on_update
+        self.debug_error_responses = debug_error_responses
+        self.raise_on_insufficient = raise_on_insufficient
+
+        self.fetcher = ProxyFetcher(**fetcher_kwargs)
+        self.ready = Semaphore()
+
+        self.status_filename = status_filename
+        if status_filename and os.path.exists(status_filename):
+                try:
+                    self.load_status_from_file(status_filename)
+                except Exception, e:
+                    log.error('Error loading status: %s' % e)
+
+    #this cause problems because garbage collector deletes objects without links
+    #before we could save Proxy instance to pickle..
+    #def __del__(self):
+    #    if self.status_filename:
+    #        self.save_status(self.status_filename)
+
+    #@property
+    #def is_fetching(self):
+    #    return self.fetcher.pool.free_count() != self.fetcher.pool.size
+
+    def load_status_from_file(self, filename):
+        self.proxies, self.blacklist = pickle.loads(open(filename).read())
+        for proxy in self.proxies:
+            proxy.in_use = 0
+        for proxy in self.blacklist:
+            proxy.in_use = 0
+        log.info('Loaded proxies: good: %s, blacklist: %s' % (len(self.proxies), len(self.blacklist)))
+
+    def save_status(self):
+        self.save_status_to_file(self.status_filename)
+
+    def save_status_to_file(self, filename):
+        open(filename, 'w').write(pickle.dumps([self.proxies, self.blacklist], pickle.HIGHEST_PROTOCOL))
+        log.debug('Saved proxies: good: %s, blacklist: %s' % (len(self.proxies), len(self.blacklist)))
+
+
+    def _check_ready(self):
+        if self.need_update and not self._update_initiated:
+            self.update()
+        if self.ready.locked():
+            self.ready.wait()
+
+    @property
+    def need_update(self):
+        return (len(self.proxies) < self.min_size)
+
+    def _update(self):
+        if self.fetcher.is_working:
+            log.warning('Fetcher already working, initiating update again')
+        if self.clear_blacklist_on_update:
+            self.blacklist = []
+        for proxy, meta in self.fetcher.get_proxies():
+            self.add_proxy(proxy, meta)
+        self._update_initiated = False
+        if len(self.proxies) < self.min_size:
+            raise RuntimeError('Insufficient proxies after fetching')
+
+    def update(self):
+        self._update_initiated = True
+        gevent.spawn(self._update)
+
+    def add_proxy(self, proxy, meta={}):
+        if not isinstance(proxy, Proxy):
+            proxy = Proxy(proxy, meta)
+
+        if proxy not in self.proxies and proxy not in self.blacklist:
+            self.proxies.append(proxy)
+            if self.ready.locked():
+                self.ready.release()
+
+    def get_random_proxy(self, **kwargs):
+        self._check_ready()
+        tries = len(self.proxies)
+        if not tries:
+            return self.get_best_proxy(**kwargs)
+        while True:
+            proxy = random.choice(self.proxies)
+            if proxy.in_use < self.max_simultaneous:
+                break
+            tries -= 1
+            if tries <= 0:
+                return self.get_best_proxy(**kwargs)
+        proxy.in_use += 1
+        return proxy
+
+    def get_best_proxy(self, **kwargs):
+        self._check_ready()
+        min_failed = self.max_fail
+        best_proxy = None
+
+        for proxy in self.proxies:
+            if proxy.in_use < self.max_simultaneous and proxy.failed < min_failed:
+                min_failed = proxy.failed
+                best_proxy = proxy
+
+        if not best_proxy:
+            if self.raise_on_insufficient:
+                raise InsufficientProxiesError()
+            self.ready.acquire(blocking=False)
+            # if not self.fetcher.is_working:
+            #    this could happen on initial fetch
+            #    raise RuntimeError('Fetcher is not working, while there are no proxies left')
+            return self.get_best_proxy(**kwargs)
+        best_proxy.in_use += 1
+        return best_proxy
+
+    def get_proxy_by_url(self, url):
+        for proxy in self.proxies:
+            if proxy.url == url:
+                return proxy
+        for proxy in self.blacklist:
+            if proxy.url == url:
+                return proxy
+        raise ValueError('Proxy with url %s not found' % url)
+
+    def failed_proxy(self, proxy):
+        if not isinstance(proxy, Proxy):
+            proxy = self.get_proxy_by_url(proxy)
+        proxy.failed += 1
+        proxy.fail_time = datetime.now()
+
+        if proxy.failed >= self.max_fail:
+            proxy.in_use -= 1
+            self.remove_proxy(proxy)
+        else:
+            self.release_proxy(proxy)
+
+    def succeed_proxy(self, proxy):
+        if not isinstance(proxy, Proxy):
+            proxy = self.get_proxy_by_url(proxy)
+        proxy.failed -= 1
+        proxy.success_time = datetime.now()
+        self.release_proxy(proxy)
+
+    success_proxy = succeed_proxy #for backwards compatibility
+
+    def remove_proxy(self, proxy):
+        try:
+            if not isinstance(proxy, Proxy):
+                proxy = self.get_proxy_by_url(proxy)
+            self.proxies.remove(proxy)
+        except ValueError:
+            # proxy already not in list
+            pass
+        else:
+            self.blacklist.append(proxy)
+        log.debug('Removed proxy %s, proxies left: %s' % (proxy, len(self.proxies)))
+
+    def release_proxy(self, proxy):
+        if not isinstance(proxy, Proxy):
+            proxy = self.get_proxy_by_url(proxy)
+        proxy.in_use -= 1
+        if self.ready.locked() and proxy in self.proxies:
+            self.ready.release()
+
+    def random_proxy_context(self, *args, **kwargs):
+        return ProxyContextManager(self, self.get_random_proxy, *args, **kwargs)
+
+    def best_proxy_context(self, *args, **kwargs):
+        return ProxyContextManager(self, self.get_best_proxy, *args, **kwargs)
+
+
+class ProxyContextManager:
+    def __init__(self, proxylist, function, *args, **kwargs):
+        (self.proxylist, self.function, self.args, self.kwargs) = \
+        (proxylist, function, args, kwargs)
+
+    def __enter__(self):
+        proxy = self.function(*self.args, **self.kwargs)
+        self.proxy = proxy
+        return proxy
+
+    def __exit__(self, type, value, traceback):
+        if type == None:
+            self.proxylist.succeed_proxy(self.proxy)
+        elif type in self.proxylist.proxy_fail_exceptions:
+            self.proxylist.failed_proxy(self.proxy)
+            log.debug('Proxy failed %s: %s: %s' % (self.proxy, type.__name__, value))
+            if self.proxylist.debug_error_responses:
+                open('%s-%s' % (value.code, self.proxy.url.replace('/','')), 'w').write(value.read())
+            return True
+        else:
+            self.proxylist.release_proxy(self.proxy)
+
+
+if __name__ == '__main__':
+    import sys
+    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
+    proxylist = ProxyList(min_size=5, clear_blacklist_on_update=False, fetchers={'xroxy_com': {'end_page': 30}}, workers_pool_size=10)
+    def test1():
+        proxy = proxylist.get_best_proxy()
+        logging.debug(proxy)
+        proxylist.failed_proxy(proxy)
+    def test2():
+        proxy = proxylist.get_random_proxy()
+        logging.debug(proxy)
+        proxylist.succeed_proxy(proxy)
+    gevent.joinall([gevent.spawn(x) for x in [test1, test2]*3])
+    proxylist.save_status()
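
The best_proxy_context / random_proxy_context managers above are the intended way to combine ProxyList with browser.urlopen: __exit__ reports success, proxy failure or release back to the list automatically. A hedged sketch, assuming a configured fetcher and network access; the target URL is illustrative:

    from gevent import monkey; monkey.patch_all()

    from proxytools import browser
    from proxytools.proxylist import ProxyList

    proxylist = ProxyList(min_size=5, fetchers={'xroxy_com': {'end_page': 10}})
    with proxylist.best_proxy_context() as proxy:
        # an exception from proxy_fail_exceptions here marks the proxy as failed
        # and is swallowed by the context manager
        content = browser.urlopen('http://example.com/', proxy=proxy,
                                  gzip=True, timeout=15).read()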

File proxytools/superproxy.py

+#!/usr/bin/env python
+if __name__ == '__main__':
+    from gevent import monkey; monkey.patch_all()
+
+import httplib
+import urlparse
+import logging
+import base64
+
+import webob
+import gevent
+from gevent.pool import Pool
+from gevent.queue import Queue, Empty
+
+log = logging.getLogger('proxytools.superproxy')
+
+USER_AGENT_RANDOM_VALUE = 'random'
+#X-Superproxy-Worker-Timeout: 10
+#X-Superproxy-Workers-Count: 1
+#X-Superproxy-Require-Status: 200; 301; 302
+#X-Superproxy-Require-Content: base64(str1) and base64(str2) or base64(str1) and base64(str3)
+#X-Superproxy-Priority: 1
+#X-Superproxy-Parameters: 'method=best; ip=234.234.267.87'
+#X-Superproxy-Client: 'google-positions'
+#
+#response
+#X-Superproxy-Proxy
+#X-Superproxy-Meta
+
+from proxytools import browser
+from proxytools.proxylist import ProxyList
+
+class RequireContentError(browser.ProxyError):
+    pass
+
+class RequireStatusError(browser.ProxyError):
+    pass
+
+
+class SuperProxyRequest(object):
+    #Request default options
+    timeout = 60
+    worker_timeout = 15
+    workers_count = 2
+    require_status = [200, 301, 302]
+    require_content = []
+    priority = 1
+    parameters = {}
+    client = 'Unknown'
+
+    def __init__(self, path, body, headers):
+
+        self.path, self.body = path, body
+
+        #Configuring options from headers
+        for header, value in headers.items():
+            if header.lower().startswith('x-superproxy-'):
+                opt = header[13:].lower().replace('-', '_')
+                try:
+                    setattr(self, opt, getattr(self, 'parse_%s' % opt)(value))
+                except AttributeError:
+                    raise ValueError('Option %s not defined.' % opt)
+                del headers[header]
+        self.headers = headers
+
+    parse_worker_timeout = staticmethod(int)
+    parse_workers_count = staticmethod(int)
+    parse_priority = staticmethod(int)
+    parse_client = staticmethod(str)
+    parse_require_status = staticmethod(lambda x: [int(v.strip()) for v
+                                                   in x.split(';') if v.strip()])
+
+    @staticmethod
+    def parse_require_content(value):
+        result = []
+        condition = []
+        for val in value.split():
+            if val == 'or':
+                result.append(condition)
+                condition = []
+            elif val != 'and':
+                condition.append(base64.urlsafe_b64decode(val))
+        if condition:
+            result.append(condition)
+        return result
+
+    @staticmethod
+    def parse_parameters(value):
+        result = {}
+        for value_ in value.strip().split(';'):
+            key, val = value_.split('=')
+            result[key.strip()] = val.strip()
+        return result
+
+    def validate_status(self, status):
+        if not status in self.require_status:
+            raise RequireStatusError('Status %s not in %s'
+                                      % (status, self.require_status))
+
+    def validate_content(self, content):
+        if self.require_content:
+            for condition in self.require_content:
+                for value in condition:
+                    if not value in content:
+                        break
+                else:
+                    return
+            raise RequireContentError('Required content not found')
+
+    def __call__(self, pool, proxylist):
+        queue = Queue()
+        workers = []
+        try:
+            for _ in range(self.workers_count):
+                # Blocks if pool is full, even if first worker is spawned
+                # and already got result. To cancel spawning next worker
+                # if pool is full and we already got result use code below:
+                #if workers and not pool.free_count() and not self.queue.empty():
+                #        break
+                workers.append(pool.spawn(self.worker, queue, proxylist))
+
+            resp, content, proxy = queue.get(timeout=self.timeout)
+            #this raises gevent.queue.Empty on timeout
+        finally:
+            log.debug('Killing workers for %s' % self.path)
+            for worker in workers:
+                worker.kill()
+
+        drop_headers = ('content-encoding', 'transfer-encoding', 'content-length')
+        headers = [(k, v) for k, v in resp.headers.items() if k.lower() not in drop_headers]
+        headers += [('X-Superproxy-Proxy', proxy.url),
+                    ('X-Superproxy-Meta', '; '.join(['%s=%s' % (k,v) for k, v
+                                                     in proxy.meta.items()])),
+        ]
+        return '%s %s' % (resp.code, resp.msg), headers, content
+
+    def worker(self, queue, proxylist):
+        while True:
+            with proxylist.best_proxy_context() as proxy:
+                log.debug('Trying %s using %s' % (self.path, proxy))
+                resp = browser.urlopen(self.path, headers=dict(self.headers),
+                                       proxy=proxy, gzip=True,
+                                       timeout=self.worker_timeout)
+                self.validate_status(resp.code)
+                content = resp.read()
+                self.validate_content(content)
+                #print '---------'
+                #print content
+                #print '---------'
+                log.debug('Response received for %s by %s' % (self.path, proxy))
+                queue.put((resp, content, proxy))
+                return
+
+
+class SuperProxy(object):
+    def __init__(self, timeout=60, pool_size=50, **kwargs):
+        self.timeout = timeout
+        self.proxylist = ProxyList(**kwargs)
+        self.workers_pool = Pool(pool_size)
+        self.requests = []
+
+    def __call__(self, env, start_response):
+        req = webob.Request(env)
+
+        if req.path_info == '/':
+            return self.statistics(req, start_response)
+
+        if not req.path_info.startswith('http://'):
+            return self.deny_request(req, start_response, '501 Not Implemented')
+
+        return self.request(req, start_response)
+
+    def deny_request(self, req, start_response, response='503 Service Unavailable'):
+        start_response(response, [])
+        return [response]
+
+    def request(self, req, start_response):
+        path = req.path_info
+        if req.query_string:
+            path += '?' + req.query_string
+
+        headers = req.headers
+        log.debug('Request from %s to %s', req.remote_addr, path)
+
+        r = SuperProxyRequest(path, req.body, headers)
+        self.requests.append(r)
+        try:
+            status_line, headers, content = r(self.workers_pool, self.proxylist)
+        except Empty:
+            return self.deny_request(req, start_response, '504 Gateway Timeout')
+        finally:
+            self.requests.remove(r)
+
+        start_response(status_line, headers)
+        return [content]
+
+    def statistics(self, req, start_response):
+        #TODO make this usable and use jinja2 template
+        result = '<table><tr><td>url</td><td>in_use</td><td>failed</td><td>success_time</td><td>fail_time</td><td>meta</td></tr>'
+        for p in self.proxylist.proxies:
+            result += '<tr><td>%s</td> <td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s<td/></tr>' % (p.url,
+                '<b>%s</b>' % p.in_use if p.in_use else p.in_use,
+                '<font color="red">%s</font>' % p.failed if p.failed > 0 else '<font color="green">%s</font>' % p.failed, p.success_time, p.fail_time, p.meta)
+        result += '</table>'
+        start_response('200 OK', [('Content-Type', 'text/html')])
+        return ['<html>Cool statistics here: Free slots: %s/%s<br/>%s</html>' %
+            (self.workers_pool.free_count(), self.workers_pool.size, result)
+        ]
+
+
+def main(interface='0.0.0.0', port=8088, **settings):
+    from gevent.pywsgi import WSGIServer
+    app = SuperProxy(**settings)
+    server = WSGIServer(
+        (interface, port),
+        app,
+    )
+    log.info('Serving on %s:%s...' % (interface, port))
+    try:
+        server.serve_forever()
+    finally:
+        app.proxylist.save_status()
+
+if __name__ == '__main__':
+    import sys
+    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
+    main(fetchers={'hideme_ru': {}, 'xroxy_com': {'end_page': 10}}, status_filename='proxylist_status')
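The request options above are driven entirely by the X-Superproxy-* request headers (worker timeout and count, required status codes, base64-encoded required content strings), and the winning proxy is reported back in X-Superproxy-Proxy and X-Superproxy-Meta. Below is a minimal, hedged client sketch in Python 2 (matching the codebase): it configures the superproxy as an ordinary HTTP proxy so the absolute target URL ends up in PATH_INFO, which is what the app checks for. The local address assumes the default port 8088 from main(); the target URL and required string are placeholders, not part of this commit.

    # Hedged client sketch (Python 2). Assumes the server from main() is
    # listening on 127.0.0.1:8088; target URL and required string are
    # placeholders only.
    import base64
    import urllib2

    opener = urllib2.build_opener(
        urllib2.ProxyHandler({'http': 'http://127.0.0.1:8088'}))

    request = urllib2.Request('http://example.com/')
    request.add_header('X-Superproxy-Workers-Count', '2')
    request.add_header('X-Superproxy-Require-Status', '200; 302')
    request.add_header('X-Superproxy-Require-Content',
                       base64.urlsafe_b64encode('Example Domain'))

    response = opener.open(request)
    print response.headers.get('X-Superproxy-Proxy')  # proxy that served the request
    print response.headers.get('X-Superproxy-Meta')   # proxy metadata as key=value pairs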

File proxytools/user-agents.txt

View file
+Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6
+Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5; en-US; rv:1.9.0.6) Gecko/2009011912 Firefox/3.0.6
+Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6 (.NET CLR 3.5.30729)
+Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.6) Gecko/2009020911 Ubuntu/8.10 (intrepid) Firefox/3.0.6
+Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6
+Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6 (.NET CLR 3.5.30729)
+Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/525.19 (KHTML, like Gecko) Chrome/1.0.154.48 Safari/525.19
+Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648)
+Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.0.6) Gecko/2009020911 Ubuntu/8.10 (intrepid) Firefox/3.0.6
+Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.5) Gecko/2008121621 Ubuntu/8.04 (hardy) Firefox/3.0.5
+Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_5_6; en-us) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1
+Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)
+Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727)
+Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)

File proxytools/utils.py

View file
+import gevent
+from gevent import socket
+
+import subprocess
+import errno
+import sys
+import os
+import fcntl
+import re
+
+IP_REGEXP = r'(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
+IP_PORT_REGEXP = r'%s:\d+' % IP_REGEXP
+ip_regexp = re.compile('(%s)' % IP_REGEXP)
+ip_port_regexp = re.compile('(%s)' % IP_PORT_REGEXP)
+
+def async_popen_communicate(args, data='', shell=False):
+    """Communicate with a subprocess without blocking the gevent hub.
+
+    Based on an example written by Marcus Cavanaugh; see
+    http://groups.google.com/group/gevent/browse_thread/thread/7fca7230db0509f6
+    where it was first posted.
+    """
+    p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=shell)
+    fcntl.fcntl(p.stdin, fcntl.F_SETFL, os.O_NONBLOCK)  # make the file nonblocking
+    fcntl.fcntl(p.stdout, fcntl.F_SETFL, os.O_NONBLOCK)  # make the file nonblocking
+
+    bytes_total = len(data)
+    bytes_written = 0
+    while bytes_written < bytes_total:
+        try:
+            # p.stdin.write() doesn't return anything, so use os.write.
+            bytes_written += os.write(p.stdin.fileno(), data[bytes_written:])
+        except IOError, ex:
+            if ex[0] != errno.EAGAIN:
+                raise
+            sys.exc_clear()
+        socket.wait_write(p.stdin.fileno())
+
+    p.stdin.close()
+
+    chunks = []
+
+    while True:
+        try:
+            chunk = p.stdout.read(4096)
+            if not chunk:
+                break
+            chunks.append(chunk)
+        except IOError, ex:
+            if ex[0] != errno.EAGAIN:
+                raise
+            sys.exc_clear()
+        socket.wait_read(p.stdout.fileno())
+
+    p.stdout.close()
+    return ''.join(chunks)
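async_popen_communicate stays cooperative by waiting on the pipe file descriptors with gevent's socket.wait_write/wait_read instead of blocking reads. A minimal usage sketch follows; the command and input are arbitrary examples, and the regexp call just demonstrates the helpers defined above.

    # Minimal usage sketch (Python 2): echo data through `cat` without
    # blocking other greenlets; command and input are examples only.
    from proxytools.utils import async_popen_communicate, ip_port_regexp

    out = async_popen_communicate(['cat'], data='proxy at 10.0.0.1:8080\n')
    print repr(out)                    # -> 'proxy at 10.0.0.1:8080\n'
    print ip_port_regexp.findall(out)  # -> ['10.0.0.1:8080']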

File setup.py

View file
+# setuptools is needed for install_requires; plain distutils would ignore it
+from setuptools import setup
+
+setup(
+    name='proxytools',
+    version='0.01',
+    url='http://bitbucket.org/vgavro/proxytools/',
+    author='Victor Gavro',
+    author_email='vgavro@gmail.com',
+    description='Utilities for working with free proxy servers (ProxyFetcher, '
+                'ProxyList, WSGI proxy server distributor), based on gevent.',
+    packages=['proxytools', 'proxytools.fetchers'],
+    package_data={'proxytools': ['user-agents.txt']},
+    install_requires=['gevent >= 0.13'],
+    scripts=['proxytools/superproxy.py', 'proxytools/proxyfetcher.py']
+)
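With the setuptools import fixed above, installing the package should pull in gevent and expose the two scripts. A rough post-install smoke-test sketch is shown below; the keyword arguments mirror the __main__ block of proxytools/superproxy.py, and whether ProxyList starts fetching eagerly at construction is not shown in this diff.

    # Rough smoke test after installation; kwargs mirror the __main__ block
    # of proxytools/superproxy.py. Exact ProxyList behaviour is an assumption.
    from gevent import monkey; monkey.patch_all()
    from proxytools.proxylist import ProxyList

    proxylist = ProxyList(fetchers={'hideme_ru': {}, 'xroxy_com': {'end_page': 10}},
                          status_filename='proxylist_status')
    with proxylist.best_proxy_context() as proxy:  # as used by superproxy's worker()
        print proxy.url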

File superproxy.py

-#!/usr/bin/python
-if __name__ == '__main__':
-    from gevent import monkey; monkey.patch_all()
-
-import httplib
-import urlparse
-import logging
-import base64
-
-import webob
-import gevent
-from gevent.pool import Pool
-from gevent.queue import Queue, Empty
-
-log = logging.getLogger('superproxy')
-
-USER_AGENT_RANDOM_VALUE = 'random'
-#X-Superproxy-Worker-Timeout: 10
-#X-Superproxy-Workers-Count: 1
-#X-Superproxy-Require-Status: 200; 301; 302
-#X-Superproxy-Require-Content: base64(str1) and base64(str2) or base64(str1) and base64(str3)
-#X-Superproxy-Priority: 1
-#X-Superproxy-Parameters: 'method=best; ip=234.234.267.87'
-#X-Superproxy-Client: 'google-positions'
-#
-#response
-#X-Superproxy-Proxy
-#X-Superproxy-Meta
-
-import browser
-from proxylist import ProxyList
-
-class RequireContentError(browser.ProxyError):
-    pass
-
-class RequireStatusError(browser.ProxyError):
-    pass
-
-
-class SuperProxyRequest(object):
-    #Request default options
-    timeout = 60
-    worker_timeout = 15
-    workers_count = 2
-    require_status = [200, 301, 302]
-    require_content = []
-    priority = 1
-    parameters = {}
-    client = 'Unknown'
-
-    def __init__(self, path, body, headers):
-
-        self.path, self.body = path, body
-
-        #Configuring options from headers
-        for header, value in headers.items():
-            if header.lower().startswith('x-superproxy-'):
-                opt = header[13:].lower()
-                try:
-                    setattr(self, opt, getattr(self, 'parse_%s' % opt)(value))
-                except AttributeError:
-                    raise ValueError('Option %s not defined.' % opt)
-                del headers[header]
-        self.headers = headers
-
-    parse_worker_timeout = staticmethod(int)
-    parse_workers_count = staticmethod(int)
-    parse_priority = staticmethod(int)
-    parse_client = staticmethod(str)
-    parse_require_status = staticmethod(lambda x: [int(v.strip()) for v
-                                                   in x.split(';') if v.strip()])
-
-    @staticmethod
-    def parse_require_content(value):
-        result = []
-        condition = []
-        for val in value.split():
-            if val == 'or':
-                result.append(condition)
-                condition = []
-            elif val != 'and':
-                condition.append(base64.urlsafe_b64decode(val))
-        if condition:
-            result.append(condition)
-        return result
-
-    @staticmethod
-    def parse_parameters(value):
-        result = {}
-        for value_ in value.strip().split(';'):
-            key, val = value_.split('=')
-            result[key.strip()] = val.strip()
-        return result
-
-    def validate_status(self, status):
-        if not status in self.require_status:
-            raise RequireStatusError('Status %s not in %s'
-                                      % (status, self.require_status))
-
-    def validate_content(self, content):
-        if self.require_content:
-            for condition in self.require_content:
-                for value in condition:
-                    if not value in content:
-                        break
-                else:
-                    return
-            raise RequireContentError('Required content not found')
-
-    def __call__(self, pool, proxylist):
-        queue = Queue()
-        workers = []
-        try:
-            for _ in range(self.workers_count):
-                # Blocks if pool is full, even if first worker is spawned
-                # and already got result. To cancel spawning next worker
-                # if pool is full and we already got result use code below:
-                #if workers and not pool.free_count() and not self.queue.empty():
-                #        break
-                workers.append(pool.spawn(self.worker, queue, proxylist))
-
-            resp, content, proxy = queue.get(timeout=self.timeout)
-            #this raises gevent.queue.Empty on timeout
-        finally:
-            log.debug('Killing workers for %s' % self.path)
-            for worker in workers:
-                worker.kill()
-
-        drop_headers = ('content-encoding', 'transfer-encoding', 'content-length')
-        headers = [(k, v) for k, v in resp.headers.items() if k not in drop_headers]
-        headers += [('X-Superproxy-Proxy', proxy.url),
-                    ('X-Superproxy-Meta', '; '.join(['%s=%s' % (k,v) for k, v
-                                                     in proxy.meta.items()])),
-        ]
-        return '%s %s' % (resp.code, resp.msg), headers, content
-
-    def worker(self, queue, proxylist):
-        while True:
-            with proxylist.best_proxy_context() as proxy:
-                log.debug('Trying %s using %s' % (self.path, proxy))
-                resp = browser.urlopen(self.path, headers=dict(self.headers),
-                                       proxy=proxy, gzip=True,
-                                       timeout=self.worker_timeout)
-                self.validate_status(resp.code)
-                content = resp.read()
-                self.validate_content(content)
-                #print '---------'
-                #print content
-                #print '---------'
-                log.debug('Response recieved for %s by %s' % (self.path, proxy))
-                queue.put((resp, content, proxy))
-                return
-
-
-class SuperProxy(object):
-    def __init__(self, timeout=60, pool_size=50, **kwargs):
-        self.timeout = timeout
-        self.proxylist = ProxyList(**kwargs)
-        self.workers_pool = Pool(pool_size)
-        self.requests = []
-
-    def __call__(self, env, start_response):
-        req = webob.Request(env)
-
-        if req.path_info == '/':
-            return self.statistics(req, start_response)
-
-        if not req.path_info.startswith('http://'):
-            return self.deny_request(req, start_response, '501 Not Implemented')
-
-        return self.request(req, start_response)
-
-    def deny_request(self, req, start_response, response='503 Service Unavailable'):
-        start_response(response, [])
-        return [response]
-
-    def request(self, req, start_response):
-        path = req.path_info
-        if req.query_string:
-            path += '?' + req.query_string
-
-        headers = req.headers
-        log.debug('Request from %s to %s', req.remote_addr, path)
-
-        r = SuperProxyRequest(path, req.body, headers)
-        self.requests.append(r)
-        try:
-            status_line, headers, content = r(self.workers_pool, self.proxylist)
-        except Empty:
-            return self.deny_request(req, start_response, '504 Gateway Timeout')
-        finally:
-            self.requests.remove(r)
-
-        start_response(status_line, headers)
-        return [content]
-
-    def statistics(self, req, start_response):
-        #TODO make this usable and use jinja2 template
-        result = '<table><tr><td>url</td><td>in_use</td><td>failed</td><td>sucess_time</td><td>fail_time</td><td>meta</td></tr>'
-        for p in self.proxylist.proxies:
-            result += '<tr><td>%s</td> <td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s<td/></tr>' % (p.url,
-                '<b>%s</b>' % p.in_use if p.in_use else p.in_use,
-                '<font color="red">%s</font>' % p.failed if p.failed > 0 else '<font color="green">%s</font>' % p.failed, p.success_time, p.fail_time, p.meta)
-        result += '</table>'
-        start_response('200 OK', [('Content-Type', 'text/html')])
-        return ['<html>Cool statistics here: Free slots: %s/%s<br/>%s</html>' %
-            (self.pool.free_count(), self.pool.size, result)
-        ]
-
-
-def main(interface='0.0.0.0', port=8088, **settings):
-    from gevent.pywsgi import WSGIServer
-    app = SuperProxy(**settings)
-    server = WSGIServer(
-        (interface, port),
-        app,
-    )
-    log.info('Serving on %s:%s...' % (interface, port))
-    try:
-        server.serve_forever()
-    finally:
-        app.proxylist.save_status()
-
-if __name__ == '__main__':
-    import sys
-    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=sys.stdout)
-    main(fetchers={'hideme_ru': {}, 'xroxy_com': {'end_page': 10}}, status_filename='proxylist_status')

File user-agents.txt

-Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6
-Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5; en-US; rv:1.9.0.6) Gecko/2009011912 Firefox/3.0.6
-Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6 (.NET CLR 3.5.30729)
-Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.6) Gecko/2009020911 Ubuntu/8.10 (intrepid) Firefox/3.0.6
-Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6
-Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US; rv:1.9.0.6) Gecko/2009011913 Firefox/3.0.6 (.NET CLR 3.5.30729)
-Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/525.19 (KHTML, like Gecko) Chrome/1.0.154.48 Safari/525.19
-Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648)
-Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.0.6) Gecko/2009020911 Ubuntu/8.10 (intrepid) Firefox/3.0.6
-Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.5) Gecko/2008121621 Ubuntu/8.04 (hardy) Firefox/3.0.5
-Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_5_6; en-us) AppleWebKit/525.27.1 (KHTML, like Gecko) Version/3.2.1 Safari/525.27.1
-Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)
-Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727)
-Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)

File utils.py

-import gevent
-from gevent import socket
-
-import subprocess
-import errno
-import sys
-import os
-import fcntl
-import re
-
-IP_REGEXP = '(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
-IP_PORT_REGEXP = '%s:\d+' % IP_REGEXP
-ip_regexp = re.compile('(%s)' % IP_REGEXP)
-ip_port_regexp = re.compile('(%s)' % IP_PORT_REGEXP)
-
-def async_popen_communicate(args, data='', shell=False):
-    """Communicate with the process non-blockingly."""
-    """An example on how to communicate with a subprocess.
-
-    Written by Marcus Cavanaugh.
-    See http://groups.google.com/group/gevent/browse_thread/thread/7fca7230db0509f6
-    where it was first posted.
-    """
-    p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=shell)
-    fcntl.fcntl(p.stdin, fcntl.F_SETFL, os.O_NONBLOCK)  # make the file nonblocking
-    fcntl.fcntl(p.stdout, fcntl.F_SETFL, os.O_NONBLOCK)  # make the file nonblocking
-
-    bytes_total = len(data)
-    bytes_written = 0
-    while bytes_written < bytes_total:
-        try:
-            # p.stdin.write() doesn't return anything, so use os.write.
-            bytes_written += os.write(p.stdin.fileno(), data[bytes_written:])
-        except IOError, ex:
-            if ex[0] != errno.EAGAIN:
-                raise
-            sys.exc_clear()
-        socket.wait_write(p.stdin.fileno())
-
-    p.stdin.close()
-
-    chunks = []
-
-    while True:
-        try:
-            chunk = p.stdout.read(4096)
-            if not chunk:
-                break
-            chunks.append(chunk)
-        except IOError, ex:
-            if ex[0] != errno.EAGAIN:
-                raise
-            sys.exc_clear()
-        socket.wait_read(p.stdout.fileno())
-
-    p.stdout.close()
-    return ''.join(chunks)