Commits

Evgeniy Tatarkin committed fa00826

processing requests through a queue, so pomp may be used in distributed systems

Comments (0)

Files changed (4)

 
 Not released yet
 
+- processing requests through a queue
 - better generator usage
 
 

pomp/core/base.py

     All this class must be subclassed
 """
 import logging
-
 import defer
 
 
 
     - ``depth first`` is pomp.core.base.CRAWL_DEPTH_FIRST_METHOD (default)
     - ``width first`` is pomp.core.base.CRAWL_WIDTH_FIRST_METHOD
+
+    :note:
+        If the engine uses a queue for request processing,
+        ``CRAWL_METHOD`` is ignored and ``ENTRY_REQUESTS``
+        is not required.
     """
     ENTRY_REQUESTS = None
     CRAWL_METHOD = CRAWL_DEPTH_FIRST_METHOD
         return self._in_process != 0
 
 
+class BaseQueue(object):
+    """Queue interface
+
+    A request is the unit of work (task) exchanged through the queue.
+    """
+
+    def get_requests(self):
+        """Blocking get from queue
+
+        :rtype: instance of :class:`BaseRequest` or list of them
+        """
+        raise NotImplementedError()
+
+    def put_requests(self, requests):
+        """Blocking put to queue
+
+        :param requests: instance of :class:`BaseRequest` or list of them
+        """
+        raise NotImplementedError()
+
+
 class BaseDownloader(object):
     """Downloader interface
 
         self.response_middlewares.reverse()
 
     def process(self, urls, callback, crawler):
-        return self._lazy_process_async(urls, callback, crawler)
-
-    def _lazy_process_async(self, urls, callback, crawler):
         if not urls:
             return
 

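The new BaseQueue contract (a blocking get_requests that returns a request, a list of requests, or nothing, and a put_requests that accepts the same) can be satisfied with the standard library. The following is a minimal sketch, not part of this changeset; the class name StdlibQueue and the timeout parameter are assumptions made for illustration.

    try:
        import queue            # Python 3
    except ImportError:
        import Queue as queue   # Python 2

    from pomp.core.base import BaseQueue


    class StdlibQueue(BaseQueue):
        """Hypothetical BaseQueue backed by the stdlib queue.Queue."""

        def __init__(self, timeout=1.0):
            self._queue = queue.Queue()
            self._timeout = timeout

        def get_requests(self):
            # block up to `timeout` seconds; returning None signals an
            # empty queue, which stops the engine when
            # stop_on_empty_queue is True
            try:
                return self._queue.get(timeout=self._timeout)
            except queue.Empty:
                return None

        def put_requests(self, requests):
            self._queue.put(requests)
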
pomp/core/engine.py

 import logging
 
 import defer
+
 from pomp.core.utils import iterator, DeferredList
 
 
     :param downloader: :class:`pomp.core.base.BaseDownloader`
     :param pipelines: list of item pipelines
                       :class:`pomp.core.base.BasePipeline`
+    :param queue: instance of :class:`pomp.core.base.BaseQueue`
+    :param stop_on_empty_queue: stop processing if queue is empty
     """
 
-    def __init__(self, downloader, pipelines=None):
+    def __init__(
+            self, downloader, pipelines=None,
+            queue=None, stop_on_empty_queue=True):
 
         self.downloader = downloader
         self.pipelines = pipelines or tuple()
+        self.queue = queue
+        self.stop_on_empty_queue = stop_on_empty_queue
 
     def response_callback(self, crawler, response):
 
                 [pipe.process(crawler, i) for i in items],
             )
 
-        urls = crawler.next_requests(response)
-        if crawler.is_depth_first():
-            if urls:
-                self.downloader.process(
-                    iterator(urls),
-                    self.response_callback,
-                    crawler
-                )
+        # get next requests
+        next_requests = crawler.next_requests(response)
+
+        if self.queue:  # return requests to the queue get/put loop
+            return next_requests
+        else:  # execute requests by `width first` or `depth first` methods
+            if crawler.is_depth_first():
+                if next_requests:
+                    self.downloader.process(
+                        iterator(next_requests),
+                        self.response_callback,
+                        crawler
+                    )
+                else:
+                    if not self.stoped and not crawler.in_process():
+                        self._stop(crawler)
+
+                return None  # end of recursion
             else:
-                if not self.stoped and not crawler.in_process():
-                    self._stop(crawler)
-
-            return None  # end of recursion
-        else:
-            return urls
+                return next_requests
 
     def pump(self, crawler):
         """Start crawling
 
         self.stop_deferred = defer.Deferred()
 
-        next_requests = self.downloader.process(
-            iterator(crawler.ENTRY_REQUESTS),
-            self.response_callback,
-            crawler
-        )
+        next_requests = getattr(crawler, 'ENTRY_REQUESTS', None)
 
-        if not crawler.is_depth_first():
-            self._call_next_requests(next_requests, crawler)
-        else:
-            # is width first method
-            # execute generator
-            if isinstance(next_requests, types.GeneratorType):
-                list(next_requests)  # fire generator
+        # process ENTRY_REQUESTS
+        if next_requests:
+            next_requests = self.downloader.process(
+                iterator(crawler.ENTRY_REQUESTS),
+                self.response_callback,
+                crawler
+            )
+
+        if self.queue:
+
+            # if ENTRY_REQUESTS produced new requests,
+            # put them on the queue
+            if next_requests:
+                next_requests = list(next_requests)
+                self.queue.put_requests(
+                    list(filter(None, iterator(next_requests)))  # drop empty results
+                )
+
+            # queue processing loop
+            while True:
+
+                # wait for requests from the queue
+                try:
+                    requests = self.queue.get_requests()
+                except Exception:
+                    log.exception('Error getting requests from %s', self.queue)
+                    raise
+
+                if self.stop_on_empty_queue and not requests:
+                    log.info('Queue empty - STOP')
+                    break
+
+                # process requests from the queue
+                next_requests = self.downloader.process(
+                    iterator(requests),
+                    self.response_callback,
+                    crawler
+                )
+
+                # put new requests on the queue
+                self.queue.put_requests(
+                    list(filter(None, iterator(next_requests)))  # fire generator, drop empty results
+                )
+
+        else:  # recursive process
+            if not crawler.is_depth_first():
+                self._call_next_requests(next_requests, crawler)
+            else:
+                # depth first method:
+                # execute the generator to drive the recursion
+                if isinstance(next_requests, types.GeneratorType):
+                    list(next_requests)  # fire generator
         return self.stop_deferred
 
     def _call_next_requests(self, next_requests, crawler):
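
With a queue attached, pump() first seeds it with whatever ENTRY_REQUESTS produce (when the crawler defines any), then repeats the loop shown above: get requests from the queue, hand them to the downloader, and put any requests returned by response_callback back on the queue, stopping once get_requests returns nothing and stop_on_empty_queue is true. A rough wiring sketch under those assumptions; StdlibQueue is the hypothetical class from the earlier sketch, and my_downloader, my_pipeline and MyCrawler are placeholders, not names defined by pomp:

    from pomp.core.engine import Pomp

    pomp = Pomp(
        downloader=my_downloader,        # placeholder: any BaseDownloader subclass
        pipelines=[my_pipeline],         # placeholder: BasePipeline instances
        queue=StdlibQueue(timeout=1.0),  # hypothetical queue from the sketch above
        stop_on_empty_queue=True,        # stop once get_requests() returns nothing
    )

    # pump() puts requests produced from ENTRY_REQUESTS on the queue, then
    # repeats get/process/put until the queue reports empty
    pomp.pump(MyCrawler())               # placeholder: a BaseCrawler subclass

The new test_queue_crawler below exercises the same wiring with a plain in-memory list standing in for the queue.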

tests/test_simple_crawler.py

 import logging
 from nose.tools import assert_equal
-from pomp.core.base import BasePipeline
+from pomp.core.base import BasePipeline, BaseQueue
 from pomp.core.base import CRAWL_WIDTH_FIRST_METHOD
 from pomp.core.engine import Pomp
 
         return result
 
 
+class RoadPipeline(BasePipeline):
+
+    def __init__(self):
+        self.collection = []
+
+    def process(self, crawler, item):
+        self.collection.append(item)
+        return item
+
+    def reset(self):
+        self.collection = []
+
+
 class TestSimplerCrawler(object):
 
     def test_crawler_dive_methods(self):
-        class RoadPipeline(BasePipeline):
-
-            def __init__(self):
-                self.collection = []
-
-            def process(self, crawler, item):
-                self.collection.append(item)
-                return item
-
-            def reset(self):
-                self.collection = []
-
         road = RoadPipeline()
 
         pomp = Pomp(
             ('http://python.org/1', 2),
             ('http://python.org/2', 2),
         ])
+
+    def test_queue_crawler(self):
+        road = RoadPipeline()
+
+        class SimpleQueue(BaseQueue):
+
+            def __init__(self):
+                self.requests = []
+
+            def get_requests(self):
+                try:
+                    return self.requests.pop()
+                except IndexError:
+                    return  # empty queue
+
+            def put_requests(self, requests):
+                self.requests.append(requests)
+
+        queue = SimpleQueue()
+
+        pomp = Pomp(
+            downloader=DummyDownloader(middlewares=[url_to_request_middl]),
+            pipelines=[
+                road,
+            ],
+            queue=queue,
+            stop_on_empty_queue=True,
+        )
+
+        class DummyWidthCrawler(Crawler):
+            CRAWL_METHOD = CRAWL_WIDTH_FIRST_METHOD
+
+        pomp.pump(DummyWidthCrawler())
+
+        assert_equal(set([item.url for item in road.collection]), set([
+            'http://python.org/1',
+            'http://python.org/1/trash',
+            'http://python.org/2',
+        ]))