Evgeniy Tatarkin / pomp

Commits

Evgeniy Tatarkin committed 5842c26

pomp may now be async, add Twisted support

  • Parent commit: 835d87a
  • Branch: default
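
In brief, the new asynchronous mode looks like this (a minimal usage sketch assembled from tests/test_contrib_twisted.py below, not part of the diff; the host, middleware and crawler names are illustrative): with TwistedDownloader, Pomp.pump() returns a deferred that fires once the crawl has finished, so the whole run is driven by the Twisted reactor.

from twisted.internet import reactor

from pomp.core.base import BaseCrawler, BaseDownloaderMiddleware
from pomp.core.engine import Pomp
from pomp.contrib.twistedtools import TwistedDownloader, TwistedHttpRequest


class UrlToRequestMiddleware(BaseDownloaderMiddleware):
    # wrap plain url strings into TwistedHttpRequest objects
    def process_request(self, url):
        return TwistedHttpRequest('http://localhost:8080%s' % url)  # hypothetical host

    def process_response(self, response):
        return response


class MyCrawler(BaseCrawler):  # hypothetical crawler
    ENTRY_URL = '/root'

    def next_url(self, response):
        return None  # do not follow links in this sketch

    def extract_items(self, response):
        yield response.body


downloader = TwistedDownloader(reactor)
downloader.middlewares.insert(0, UrlToRequestMiddleware())

pomp = Pomp(downloader=downloader, pipelines=[])
done = pomp.pump(MyCrawler())                # returns a defer.Deferred
done.add_callback(lambda _: reactor.stop())  # fires when crawling stops
reactor.run()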


Files changed (10)

File pomp/__init__.py

-__version__ = (0, 0, 2, 'dev', 0)
+__version__ = (0, 0, 2, 'dev', 1)

File pomp/contrib/__init__.py

 except ImportError:
     from urllib2 import urlopen, Request
 
-
 import logging
 from multiprocessing.pool import ThreadPool
+
 from pomp.core.base import BaseDownloader, BaseHttpRequest, \
     BaseHttpResponse, BaseDownloaderMiddleware, BaseDownloadException
 from pomp.core.utils import iterator

File pomp/contrib/twistedtools.py

+"""
+Twisted
+```````
+
+Simple downloaders and middlewares for fetching URLs with Twisted.
+"""
+
+
+import logging
+import defer as dfr
+
+from pomp.core.base import BaseDownloader, BaseHttpRequest, \
+    BaseHttpResponse
+from pomp.core.utils import iterator
+
+from twisted.internet import defer, protocol
+from twisted.web.client import Agent
+from twisted.web.iweb import IBodyProducer
+from zope.interface import implements
+from twisted.web.http_headers import Headers
+import urllib
+
+
+log = logging.getLogger('pomp.contrib.twisted')
+
+
+class TwistedDownloader(BaseDownloader):
+
+    def __init__(self, reactor, middlewares=None):
+        super(TwistedDownloader, self).__init__(middlewares=middlewares)
+        self.reactor = reactor
+        self.agent = Agent(self.reactor)
+
+    def get(self, requests):
+        responses = []
+        for request in iterator(requests):
+            response = self._fetch(request)
+            responses.append(response)
+        return responses
+
+    def _fetch(self, request):
+        d = self.agent.request(
+            request.method,
+            request.url,
+            request.headers,
+            request.data
+        )
+
+        d.addCallback(self._handle_response)
+
+        res = dfr.Deferred()
+
+        # connect the twisted deferred to the pomp engine
+        # TODO error_backs
+        def _fire(response):
+            res.callback(TwistedHttpResponse(request, response))
+        d.addCallback(_fire)
+
+        return res
+
+    def _handle_response(self, response):
+        if response.code == 204:
+            d = defer.succeed('')
+        else:
+            d = defer.Deferred()
+            response.deliverBody(SimpleReceiver(d))
+        return d
+
+
+class TwistedHttpRequest(BaseHttpRequest):
+
+    def __init__(self, url, data=None, headers=None, method='GET'):
+        self._url = url.encode('utf-8')
+        self.data = StringProducer(urllib.urlencode(data)) if data else None
+        self.headers = Headers(headers)
+        self.method = method
+
+    @property
+    def url(self):
+        return self._url
+
+
+class TwistedHttpResponse(BaseHttpResponse):
+
+    def __init__(self, request, response):
+        self.req = request
+        self.resp = response
+
+        if not isinstance(response, Exception):
+            self.body = self.resp
+
+    @property
+    def request(self):
+        return self.req
+
+    @property
+    def response(self):
+        return self.resp 
+
+
+class SimpleReceiver(protocol.Protocol):
+
+    def __init__(self, d):
+        self.buf = ''
+        self.d = d
+
+    def dataReceived(self, data):
+        self.buf += data
+
+    def connectionLost(self, reason):
+        # TODO: errback if reason is not twisted.web.client.ResponseDone
+        self.d.callback(self.buf)
+
+
+class StringProducer(object):
+    implements(IBodyProducer)
+
+    def __init__(self, body):
+        self.body = body
+        self.length = len(body)
+
+    def startProducing(self, consumer):
+        consumer.write(self.body)
+        return defer.succeed(None)
+
+    def pauseProducing(self):
+        pass
+
+    def stopProducing(self):
+        pass
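
For reference, a standalone sketch of the downloader by itself (assumptions: Twisted is installed, the reactor is available, and the placeholder URL is reachable; none of this is part of the commit): get() returns one deferred per request, and each deferred fires with a TwistedHttpResponse whose body was collected by SimpleReceiver.

from twisted.internet import reactor

from pomp.contrib.twistedtools import TwistedDownloader, TwistedHttpRequest

downloader = TwistedDownloader(reactor)
# get() returns a list of defer.Deferred objects, one per request
deferreds = downloader.get([TwistedHttpRequest('http://example.com/')])  # placeholder URL

def on_response(response):
    # response.body holds the downloaded data
    print(len(response.body))
    reactor.stop()
    return response

deferreds[0].add_callback(on_response)
reactor.run()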

File pomp/core/base.py

 
     All these classes must be overridden
 """
+import defer
+
+
 CRAWL_DEPTH_FIRST_METHOD = 'depth'
 CRAWL_WIDTH_FIRST_METHOD = 'width'
 
     ENTRY_URL = None
     CRAWL_METHOD = CRAWL_DEPTH_FIRST_METHOD
 
+    def __init__(self):
+        self._in_process = 0
+
     def next_url(self, page):
         """Getting next urls for processing.
  
         """    
         raise NotImplementedError()
 
-    @property
     def is_depth_first(self):
         return self.CRAWL_METHOD == CRAWL_DEPTH_FIRST_METHOD
 
+    def dive(self, value=1):
+        self._in_process += value
+
+    def _reset_state(self):
+        self._in_process = 0
+
+    def in_process(self):
+        return self._in_process != 0
+
 
 class BaseDownloader(object):
     """Downloader interface
         self.response_middlewares.reverse()
 
     def process(self, urls, callback, crawler):
+        return list(self._lazy_process_async(urls, callback, crawler))
+
+    def _lazy_process_async(self, urls, callback, crawler):
+        if not urls:
+            return
+
+        requests = []
+        for request in urls:
+
+            for middleware in self.request_middlewares:
+                request = middleware.process_request(request)
+                if not request:
+                    break
+
+            if not request:
+                continue
+
+            requests.append(request)
+
+        if not requests:
+            return
+
+        def _process_resp(response):
+            for middleware in self.response_middlewares:
+                response = middleware.process_response(response)
+                if not response:
+                    break
+
+            if response:
+                return callback(crawler, response)
+
+        crawler.dive(len(requests)) # dive in
+        for response in self.get(requests):
+            if isinstance(response, defer.Deferred): # async behavior
+                def _(res):
+                    crawler.dive(-1) # dive out
+                    return res
+                response.add_callback(_)
+                response.add_callback(_process_resp)
+                yield response
+            else: # sync behavior
+                crawler.dive(-1) # dive out
+                yield _process_resp(response)
+
+    def process_sync(self, urls, callback, crawler):
         # start downloading and processing
-        return list(self._lazy_process(urls, callback, crawler))
+        return list(self._lazy_process_sync(urls, callback, crawler))
 
-    def _lazy_process(self, urls, callback, crawler):
+    def _lazy_process_sync(self, urls, callback, crawler):
 
         if not urls:
             return

File pomp/core/engine.py

 """
 import logging
 import itertools
-from pomp.core.utils import iterator
+
+import defer
+
+from pomp.core.utils import iterator, DeferredList
 
 
 log = logging.getLogger('pomp.engine')
         log.info('Process %s', response)
         items = crawler.process(response)
 
-
         if items:
             for pipe in self.pipelines:
                 items = filter(None, list(
                 ))
 
         urls = crawler.next_url(response)
-        if crawler.is_depth_first:
+        if crawler.is_depth_first():
             if urls:
                 self.downloader.process(
                     iterator(urls),
                     self.response_callback,
                     crawler
                 )
+            else:
+                if not self.stoped and not crawler.in_process():
+                    self._stop(crawler)
 
             return None # end of recursion
         else:
         log.info('Prepare downloader: %s', self.downloader)
         self.downloader.prepare()
 
+        self.stoped = False
+        crawler._reset_state()
+
         log.info('Start crawler: %s', crawler)
 
         for pipe in self.pipelines:
             log.info('Start pipe: %s', pipe)
             pipe.start()
 
-        try:
-            next_urls = self.downloader.process(
-                iterator(crawler.ENTRY_URL),
+        self.stop_deferred = defer.Deferred()
+
+        next_urls = self.downloader.process(
+            iterator(crawler.ENTRY_URL),
+            self.response_callback,
+            crawler
+        )
+
+        if not crawler.is_depth_first():
+            self._call_next_urls(next_urls, crawler)
+        return self.stop_deferred
+
+    def _call_next_urls(self, next_urls, crawler):
+        deferreds = [n for n in next_urls if n and isinstance(n, defer.Deferred)]
+        if deferreds: # async behavior
+            d = DeferredList(deferreds)
+            d.add_callback(self._on_next_urls, crawler)
+        else: # sync behavior
+            self._on_next_urls(next_urls, crawler)
+
+    def _on_next_urls(self, next_urls, crawler):
+        for urls in next_urls:
+
+            if not urls:
+                continue
+
+            _urls = self.downloader.process(
+                iterator(urls),
                 self.response_callback,
                 crawler
             )
 
-            if not crawler.is_depth_first:
-                while True:
-                    if not next_urls:
-                        break
-                    _urls = ()
-                    for urls in next_urls:
-                        if not urls:
-                            continue
-                        _urls = itertools.chain(
-                            _urls,
-                            self.downloader.process(
-                                iterator(urls),
-                                self.response_callback,
-                                crawler
-                            )
-                        )
-                    next_urls = _urls
+            self._call_next_urls(_urls, crawler)
 
-        finally:
+        if not self.stoped and not crawler.in_process():
+            self._stop(crawler)
 
-            for pipe in self.pipelines:
-                log.info('Stop pipe: %s', pipe)
-                pipe.stop() 
-
+    def _stop(self, crawler):
+        self.stoped = True
+        for pipe in self.pipelines:
+            log.info('Stop pipe: %s', pipe)
+            pipe.stop()
 
         log.info('Stop crawler: %s', crawler)
+        self.stop_deferred.callback(None) 

File pomp/core/utils.py

 import types
+import defer
 
 
 def iterator(var):
         return isinstance(obj, basestring)
     except NameError:
         return isinstance(obj, str)
+
+
+class DeferredList(defer.Deferred):
+
+    def __init__(self, deferredList):
+        """Initialize a DeferredList"""
+        self.result_list = [None] * len(deferredList)
+        super(DeferredList, self).__init__()
+
+        self.finished_count = 0
+
+        for index, deferred in enumerate(deferredList):
+            deferred.add_callbacks(
+                self._cb_deferred,
+                self._cb_deferred,
+                callback_args=(index,),
+                errback_args=(index,)
+            )
+
+    def _cb_deferred(self, result, index):
+        """(internal) Callback for when one of my deferreds fires.
+        """
+        self.result_list[index] = result
+
+        self.finished_count += 1
+        if self.finished_count == len(self.result_list):
+            self.callback(self.result_list)
+
+        return result

File setup.py

     tests_require=['nose >= 1.0',],
     test_suite='nose.collector',
     setup_requires=['versiontools >= 1.8',],
+    install_requires=['defer',],
     classifiers=[
         'Intended Audience :: Developers',
         'License :: OSI Approved :: BSD License',

File tests/mockserver.py

 log = logging.getLogger('mockserver')
 
 
+# TODO silent server
 def simple_app(environ, start_response):
     setup_testing_defaults(environ)
 
         log.debug('Stop http server: %s', self)
         self.process.terminate()
         self.process.join()
+        self.httpd.server_close()

File tests/test_contrib_twisted.py

+import json
+import logging
+from nose import SkipTest
+from nose.tools import assert_set_equal
+
+try:
+    from nose.twistedtools import reactor, stop_reactor, deferred
+    from twisted.internet import defer
+except ImportError:
+    raise SkipTest('twisted not installed')
+
+from pomp.core.base import BaseCrawler, BaseDownloaderMiddleware, BasePipeline
+from pomp.core.engine import Pomp
+from pomp.contrib.twistedtools import TwistedDownloader, TwistedHttpRequest
+from pomp.core.base import CRAWL_WIDTH_FIRST_METHOD
+from pomp.core.item import Item, Field
+
+from mockserver import HttpServer, make_sitemap
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class DummyItem(Item):
+    value = Field()
+    url = Field()
+
+    def __repr__(self):
+        return '<DummyItem(%s, %s)>' % (self.url, self.value)
+
+
+class DummyCrawler(BaseCrawler):
+    ENTRY_URL = None
+    CRAWL_METHOD = CRAWL_WIDTH_FIRST_METHOD
+
+    def __init__(self):
+        super(DummyCrawler, self).__init__()
+
+    def next_url(self, response):
+        res = response.body.get('links', [])
+        return res
+
+    def extract_items(self, response):
+        item = DummyItem()
+        item.value = 1
+        item.url = response.request.url
+        yield item 
+
+
+class RequestResponseMiddleware(BaseDownloaderMiddleware):
+
+    def __init__(self, prefix_url=None):
+        self.requested_urls = []
+        self.prefix_url = prefix_url
+    
+    def process_request(self, url):
+        self.requested_urls.append(url)
+        url = '%s%s' % (self.prefix_url, url) \
+            if self.prefix_url else url
+        return TwistedHttpRequest(url)
+    
+    def process_response(self, response):
+        response.body = json.loads(response.body.decode('utf-8'))
+        return response
+
+
+class PrintPipeline(BasePipeline):
+
+    def process(self, crawler, item):
+        print('Pipeline:', item)
+
+
+class TestContribTwisted(object):
+
+    @classmethod
+    def setupClass(cls):
+        cls.httpd = HttpServer(sitemap=make_sitemap(level=2, links_on_page=2))
+        cls.httpd.start()
+
+    @classmethod
+    def teardownClass(cls):
+        stop_reactor()
+        cls.httpd.stop()
+
+    @deferred(timeout=1.0)
+    def test_downloader(self):
+
+        req_resp_middleware = RequestResponseMiddleware(prefix_url=self.httpd.location)
+        downloader = TwistedDownloader(reactor)
+
+        downloader.middlewares.insert(0, req_resp_middleware)
+
+        pomp = Pomp(
+            downloader=downloader,
+            pipelines=[PrintPipeline()],
+        )
+
+        DummyCrawler.ENTRY_URL = '/root'
+
+        done_defer = defer.Deferred()
+        d = pomp.pump(DummyCrawler())
+
+        d.add_callback(done_defer.callback)
+
+        def check(x):
+            assert_set_equal(
+                set(req_resp_middleware.requested_urls),
+                set(self.httpd.sitemap.keys())
+            )
+
+        done_defer.addCallback(check)
+        return done_defer

File tests/test_contrib_urllib.py

             set(self.httpd.sitemap.keys())
         )
 
-
     def test_exception_handling(self):
 
         class CatchException(BaseDownloaderMiddleware):