Commits

Evgeniy Tatarkin committed 6f0c954

base request/response objects
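
The crawl cycle now passes request and response objects between crawlers,
downloaders and middlewares instead of bare (url, page) pairs. A minimal
sketch of the new objects, using only what this diff introduces
(BaseHttpRequest and BaseHttpResponse simply turn their keyword arguments
into attributes):

    from pomp.core.base import BaseHttpRequest, BaseHttpResponse

    # keyword arguments become attributes on the object
    request = BaseHttpRequest(url='http://python.org/news/')

    # a downloader builds a response that keeps a reference to its request
    response = BaseHttpResponse(request=request, body='<html>...</html>')

    # crawlers read everything they need from the response
    assert response.request.url == 'http://python.org/news/'
    assert response.body.startswith('<html>')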

Files changed (6)

examples/pythonnews.py

 import logging
 from pomp.core.base import BaseCrawler, BasePipeline
 from pomp.core.item import Item, Field
-from pomp.contrib import SimpleDownloader
+from pomp.contrib import SimpleDownloader, UrllibAdapterMiddleware
 
 
 logging.basicConfig(level=logging.DEBUG)
 class PythonNewsCrawler(BaseCrawler):
     ENTRY_URL = 'http://python.org/news/'
 
-    def extract_items(self, url, page):
-        page = page.decode('utf-8')
-        for i in news_re.findall(page):
+    def extract_items(self, response):
+        response.body = response.body.decode('utf-8')
+        for i in news_re.findall(response.body):
             item = PythonNewsItem()
             item.title, item.published = i[0], i[2]
             yield item
 
-    def next_url(self, page):
+    def next_url(self, response):
         return None # one page crawler
 
 
     from pomp.core.engine import Pomp
 
     pomp = Pomp(
-        downloader=SimpleDownloader(),
+        downloader=SimpleDownloader(middlewares=[UrllibAdapterMiddleware()]),
         pipelines=[PrintPipeline()],
     )
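
With UrllibAdapterMiddleware first in the downloader's middleware chain,
ENTRY_URL can stay a plain string: the middleware wraps it in a request
object before SimpleDownloader fetches it. The hunk is truncated here;
presumably the example still finishes the way it did before, along the
lines of (pump() is assumed to be the unchanged engine entry point; it is
not shown in this diff):

    pomp.pump(PythonNewsCrawler())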
 

pomp/contrib/__init__.py

     from urllib2 import urlopen
 
 from multiprocessing.pool import ThreadPool
-from pomp.core.base import BaseDownloader
+from pomp.core.base import BaseDownloader, BaseHttpRequest, \
+    BaseHttpResponse, BaseDownloaderMiddleware
 from pomp.core.utils import iterator
 
 
-def fetch(url, timeout):
-    return urlopen(url, timeout=timeout).read()
-
-
 class SimpleDownloader(BaseDownloader):
 
     TIMEOUT = 5
 
-    def get(self, urls):
+    def get(self, requests):
         responses = []
-        for url in iterator(urls):
-            response = fetch(url, timeout=self.TIMEOUT)
-            responses.append((url, response))
+        for request in iterator(requests):
+            response = self._fetch(request)
+            responses.append(response)
         return responses
 
+    def _fetch(self, request, timeout=TIMEOUT):
+        res = urlopen(request.url, timeout=timeout)
+        return UrllibHttpResponse(request, res)  
 
-class ThreadedDownloader(BaseDownloader):
 
-    TIMEOUT = 5
+class ThreadedDownloader(SimpleDownloader):
 
     def __init__(self, pool_size=5, *args, **kwargs):
         self.workers_pool = ThreadPool(processes=pool_size)
         super(ThreadedDownloader, self).__init__(*args, **kwargs)
+    
+    def get(self, requests):
+        return self.workers_pool.map(self._fetch, requests)
 
-    def fetch(self, url):
-        return fetch(url, timeout=self.TIMEOUT)
-    
-    def get(self, urls):
-        return zip(urls, self.workers_pool.map(self.fetch, urls))
+
+class UrllibHttpRequest(BaseHttpRequest):
+    pass
+
+
+class UrllibHttpResponse(BaseHttpResponse):
+
+    def __init__(self, request, urlopen_res):
+        self.urlopen_res = urlopen_res
+        self.request = request
+        self.body = self.urlopen_res.read()
+
+
+class UrllibAdapterMiddleware(BaseDownloaderMiddleware):
+
+    def process_request(self, req):
+        if isinstance(req, BaseHttpRequest):
+            return req
+        return UrllibHttpRequest(url=req)
+
+    def process_response(self, response):
+        return response
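
Taken together, the adapter middleware lets the rest of the framework keep
handing plain URL strings to the downloader. A small sketch of the intended
behaviour, using only the classes defined above (illustrative, not part of
the commit):

    from pomp.contrib import UrllibAdapterMiddleware, UrllibHttpRequest

    mw = UrllibAdapterMiddleware()

    # a bare string is wrapped into a UrllibHttpRequest with a .url attribute
    request = mw.process_request('http://python.org/news/')
    assert isinstance(request, UrllibHttpRequest)
    assert request.url == 'http://python.org/news/'

    # anything that is already a request passes through untouched
    assert mw.process_request(request) is request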

pomp/core/base.py

     def next_url(self, page):
         raise NotImplementedError()
 
-    def process(self, url, page):
-        return self.extract_items(url, page)
+    def process(self, response):
+        return self.extract_items(response)
 
     def extract_items(self, url, page):
         raise NotImplementedError()
         if not urls:
             return
 
-        _urls = []
-        for url in urls:
+        requests = []
+        for request in urls:
 
             for middleware in self.middlewares:
-                url = middleware.process_request(url)
-                if not url:
+                request = middleware.process_request(request)
+                if not request:
                     break
 
-            if not url:
+            if not request:
                 continue
 
-            _urls.append(url)
+            requests.append(request)
 
-        if not _urls:
+        if not requests:
             return
 
-        for response in self.get(_urls):
+        for response in self.get(requests):
 
             for middleware in self.middlewares:
-                response = middleware.process_response(*response)
+                response = middleware.process_response(response)
+                if not response:
+                    break
 
             if response:
-                yield callback(crawler, *response)
+                yield callback(crawler, response)
 
     def get(self, url):
         raise NotImplementedError()
 
 class BaseDownloaderMiddleware(object):
 
-    def porcess_request(self, url):
+    def process_request(self, request):
         raise NotImplementedError()
  
-    def porcess_response(self, url, page):
+    def process_response(self, response):
         raise NotImplementedError() 
+
+
+class BaseHttpRequest(object):
+
+    def __init__(self, *args, **kwargs):
+        for key, value in kwargs.items():
+            setattr(self, key, value)
+
+
+class BaseHttpResponse(object):
+ 
+    def __init__(self, *args, **kwargs):
+        for key, value in kwargs.items():
+            setattr(self, key, value)
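
BaseDownloader.process treats a falsy return value from a middleware hook as
"drop it": a request filtered to None is never fetched, and a response
filtered to None never reaches the crawler callback. A minimal filtering
middleware written against that contract (a sketch, not part of this commit;
the '/trash' URL just mirrors the one used in the tests below):

    from pomp.core.base import BaseDownloaderMiddleware

    class DropTrashMiddleware(BaseDownloaderMiddleware):

        def process_request(self, request):
            # returning None makes BaseDownloader.process skip this request
            if request.url.endswith('/trash'):
                return None
            return request

        def process_response(self, response):
            # responses pass through unchanged
            return response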

pomp/core/engine.py

         self.downloader = downloader
         self.pipelines = pipelines or tuple()
 
-    def response_callback(self, crawler, url, page):
+    def response_callback(self, crawler, response):
 
-        log.info('Process %s', url)
-        items = crawler.process(url, page)
+        log.info('Process %s', response)
+        items = crawler.process(response)
 
         if items:
             for pipe in self.pipelines:
                 items = filter(None, list(map(pipe.process, items)))
 
-        urls = crawler.next_url(page)
+        urls = crawler.next_url(response)
         if crawler.is_depth_first:
             if urls:
                 self.downloader.process(
             if not crawler.is_depth_first:
                 while True:
                     if not next_urls:
-                        return
+                        break
                     _urls = ()
                     for urls in next_urls:
+                        if not urls:
+                            continue
                         _urls = itertools.chain(
                             _urls,
                             self.downloader.process(

tests/test_simple_crawler.py

 import logging
 from nose.tools import assert_equal
-from pomp.core.base import BaseCrawler, BaseDownloader, BasePipeline
+from pomp.core.base import BaseCrawler, BaseDownloader, BasePipeline, \
+    BaseDownloaderMiddleware, BaseHttpRequest, BaseHttpResponse
 from pomp.core.base import CRAWL_WIDTH_FIRST_METHOD
 from pomp.core.engine import Pomp
 from pomp.core.item import Item, Field
         super(DummyCrawler, self).__init__()
         self.crawled_urls = []
 
-    def next_url(self, page):
+    def next_url(self, response):
         url = 'http://python.org/1/trash'
         result = url if url not in self.crawled_urls else None
         self.crawled_urls.append(url)
         return result
 
-    def extract_items(self, url, page):
+    def extract_items(self, response):
         item = DummyItem()
         item.value = 1
-        item.url = url
+        item.url = response.request.url
         yield item
 
 
 class DummyDownloader(BaseDownloader):
 
-    def get(self, urls):
-        for url in urls:
-            yield (url, '<html><head></head><body></body></html>')
+    def get(self, requests):
+        for request in requests:
+            response = BaseHttpResponse()
+            response.request = request
+            response.body = 'some html code'
+            yield response
+
+
+class UrlToRequestMiddleware(BaseDownloaderMiddleware):
+
+    def process_request(self, req):
+        if isinstance(req, BaseHttpRequest):
+            return req
+        return BaseHttpRequest(url=req)
+
+    def process_response(self, response):
+        return response
 
 
 class TestSimplerCrawler(object):
         road = RoadPipeline()
 
         pomp = Pomp(
-            downloader=DummyDownloader(),
+            downloader=DummyDownloader(middlewares=[UrlToRequestMiddleware()]),
             pipelines=[
                 road,
             ],
         result = []
 
         pomp = Pomp(
-            downloader=DummyDownloader(),
+            downloader=DummyDownloader(middlewares=[UrlToRequestMiddleware()]),
             pipelines=[
                 IncPipeline(),
                 FilterPipeline(),

tests/test_threaded.py

 from nose.tools import assert_set_equal
 from pomp.core.base import BaseCrawler, BaseDownloaderMiddleware
 from pomp.core.engine import Pomp
-from pomp.contrib import ThreadedDownloader
+from pomp.contrib import ThreadedDownloader, UrllibAdapterMiddleware
 from pomp.core.base import CRAWL_WIDTH_FIRST_METHOD
 
 from mockserver import HttpServer, make_sitemap
     def __init__(self):
         super(DummyCrawler, self).__init__()
 
-    def next_url(self, page):
-        return page.get('links', [])
+    def next_url(self, response):
+        return response.body.get('links', [])
 
-    def extract_items(self, url, page):
+    def extract_items(self, response):
         return
 
 
         self.requested_urls = []
         self.prefix_url = prefix_url
     
-    def process_request(self, url):
-        self.requested_urls.append(url)
-        return '%s%s' % (self.prefix_url, url) if self.prefix_url else url
+    def process_request(self, request):
+        self.requested_urls.append(request.url)
+        request.url = '%s%s' % (self.prefix_url, request.url) \
+            if self.prefix_url else request.url
+        return request
     
-    def process_response(self, url, page):
-        return url, json.loads(page.decode('utf-8'))
+    def process_response(self, response):
+        response.body = json.loads(response.body.decode('utf-8'))
+        return response
 
 
 class TestThreadedCrawler(object):
     def test_thread_pooled_downloader(self):
         req_resp_midlleware = RequestResponseMiddleware(prefix_url=self.httpd.location)
         pomp = Pomp(
-            downloader=ThreadedDownloader(middlewares=[req_resp_midlleware]),
+            downloader=ThreadedDownloader(
+                middlewares=[UrllibAdapterMiddleware(), req_resp_midlleware]
+            ),
             pipelines=[],
         )
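
Middleware order matters in this setup: UrllibAdapterMiddleware has to run
first so that RequestResponseMiddleware receives an object with a url
attribute rather than a plain string. Roughly, on the request path (a sketch
of the chain configured above, not code from the diff; '/sitemap.json' is an
invented path):

    request = UrllibAdapterMiddleware().process_request('/sitemap.json')
    # -> UrllibHttpRequest(url='/sitemap.json')
    request = req_resp_midlleware.process_request(request)
    # -> request.url is prefixed with self.httpd.location
    # the downloader then fetches request.url; on the way back,
    # process_response() runs in the same order, and
    # RequestResponseMiddleware decodes the JSON body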