Commits

Evgeniy Tatarkin committed 190d229

add pipelines

  • Participants
  • Parent commits 4becf06

Comments (0)

Files changed (3)

File pomp/core/base.py

         raise NotImplementedError()
 
     def process(self, url, page):
-        self.extract_items(url, page)
+        # Pass the result of extract_items() back to the caller; the removed
+        # line above called it but discarded the result.
+        return self.extract_items(url, page)
 
     def extract_items(self, url, page):
+        # Abstract hook: always raises NotImplementedError here, so concrete
+        # subclasses must override it. process() returns whatever this returns.
         raise NotImplementedError()
 
     def get(self, url):
+        # Abstract hook: always raises NotImplementedError here; subclasses
+        # are expected to override it.
         raise NotImplementedError()
+
+
+# Base class for item pipelines.
+#
+# Subclasses must override process(); start() and stop() are optional hooks
+# that do nothing by default. Pomp.pump() calls start() on every pipeline
+# before downloading and stop() afterwards, and Pomp.response_callback()
+# drops any item for which process() returns a falsy value.
+class BasePipeline(object):
+
+    def start(self):
+        # Optional hook; the default implementation is a no-op.
+        pass
+
+    # Handle a single item. The base implementation always raises
+    # NotImplementedError, so every concrete pipeline has to override it.
+    def process(self, item):
+        raise NotImplementedError()
+
+    def stop(self):
+        # Optional hook; the default implementation is a no-op.
+        pass

File pomp/core/engine.py

 
 class Pomp(object):
 
-    def __init__(self, downloader):
+    def __init__(self, downloader, pipelines=None):
+        # downloader: object whose process() method is driven by pump().
+        # pipelines: optional sequence of pipeline objects; None (or any
+        #   other falsy value) is replaced by an empty tuple.
 
         self.downloader = downloader
+        self.pipelines = pipelines or tuple()
+        # Accumulates the items passed to collect(); pump() returns this list.
+        self.collector = []
+
+    def collect(self, items):
+        # Append every element of *items* (any iterable) to self.collector.
+        #
+        # response_callback() calls this unconditionally, so a crawler whose
+        # process() produced nothing may hand us None. Extending a list with
+        # None raises TypeError, hence the explicit guard.
+        if items is not None:
+            self.collector.extend(items)
 
     def response_callback(self, crawler, url, page):
 
-        crawler.process(url, page)
+        items = crawler.process(url, page)
+
+        if items:
+            for pipe in self.pipelines:
+                items = filter(None, map(pipe.process, items))
+
+        self.collect(items)
 
         urls = crawler.next_url(page)
 
 
     def pump(self, crawler):
+        """Run *crawler* through the downloader and return the collected items.
+
+        Calls ``start()`` on every pipeline, then hands
+        ``iterator(crawler.ENTRY_URL)``, ``self.response_callback`` and the
+        crawler to ``self.downloader.process()``. ``stop()`` is called on
+        every pipeline in a ``finally`` clause, so it also runs when the
+        download step raises.
+
+        Returns ``self.collector``, the list filled by ``collect()``.
+        """
 
-        self.downloader.process(
-            iterator(crawler.ENTRY_URL),
-            self.response_callback,
-            crawler
-        )
+        # Explicit loops rather than map(): map() is lazy on Python 3, so
+        # using it only for side effects would never call the hooks there.
+        for pipe in self.pipelines:
+            pipe.start()
+
+        try:
+            self.downloader.process(
+                iterator(crawler.ENTRY_URL),
+                self.response_callback,
+                crawler
+            )
+
+            return self.collector
+        finally:
+            for pipe in self.pipelines:
+                pipe.stop()

File tests/test_simple_crawler.py

-from pomp.core.base import BaseCrawler, BaseDownloader
+from pomp.core.base import BaseCrawler, BaseDownloader, BasePipeline
 from pomp.core.engine import Pomp
 from pomp.core.item import Item, Field
 
     def test_crawler(self):
 
         class DummyItem(Item):
-            f1 = Field()
-            f2 = Field()
-            f3 = Field()
+            value = Field()
+            url = Field()
+
+            def __repr__(self):
+                return '<DummyItem(%s, %s)>' % (self.url, self.value)
 
         class DummyCrawler(BaseCrawler):
             ENTRY_URL = (
                 self.crawled_urls = []
 
             def next_url(self, page):
-                url = 'http://python.org/1/1'
+                url = 'http://python.org/1/trash'
                 return url if url not in self.crawled_urls else None
 
             def extract_items(self, url, page):
                 self.crawled_urls.append(url)
                 item = DummyItem()
-                item.f1 = '1'
-                item.f2 = '2'
-                item.f3 = '3'
+                item.value = 1
+                item.url = url
                 return [item]
 
         class DummyDownloader(BaseDownloader):
-            def get(slef, url):
+            def get(self, url):
+                # Stub downloader: returns the same empty HTML page for any
+                # url. (The first parameter was misspelled "slef".)
                 return '<html><head></head><body></body></html>'
 
-        pomp = Pomp(downloader=DummyDownloader())
 
-        pomp.pump(DummyCrawler())
+        class IncPipeline(BasePipeline):
+
+            def process(self, item):
+                item.value += 1
+                return item
+
+        class FilterPipeline(BasePipeline):
+
+            def process(self, item):
+                if 'trash' in item.url:
+                    return None
+                return item
+
+        pomp = Pomp(
+            downloader=DummyDownloader(),
+            pipelines=[IncPipeline(), FilterPipeline()],
+        )
+
+        result = pomp.pump(DummyCrawler())
+
+        assert len(result) == 2