Commits

Gregory Petukhov committed 94bdea1

Cookie support in Kit transport

Comments (0)

Files changed (7)

grab/kit/__init__.py

 from PyQt4.QtCore import QEventLoop, QUrl, QEventLoop, QTimer
 from PyQt4.QtGui import QApplication
 from PyQt4.QtWebKit import QWebView, QWebPage
-from PyQt4.QtNetwork import (QNetworkAccessManager, QNetworkReply, QNetworkRequest,
-                             QNetworkCookieJar)
+from PyQt4.QtNetwork import QNetworkRequest, QNetworkCookieJar, QNetworkCookie
 from lxml.html import fromstring
 from grab.selector import Selector
 from grab.response import Response
 import logging
+from urlparse import urlsplit
+
 from grab.kit.network_access_manager import KitNetworkAccessManager
 from grab.kit.network_reply import KitNetworkReply
 from grab.kit.error import KitError
 
 logger = logging.getLogger('grab.kit')
 
+class Resource(object):
+    def __init__(self, reply):
+        self.reply = reply
+        self.url = str(reply.url().toString())
+
+        self.status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)\
+                                .toInt()[0]
+        self.headers = {}
+        for header in reply.rawHeaderList():
+            self.headers[header.data()] = reply.rawHeader(header).data()
+
+
 class KitWebView(QWebView):
     def setApplication(self, app):
         self.app = app
         self.app = QApplication(sys.argv)
 
         manager = KitNetworkAccessManager()
-        manager.finished.connect(self.reply_handler)
+        manager.finished.connect(self.network_reply_handler)
 
         self.cookie_jar = QNetworkCookieJar()
         manager.setCookieJar(self.cookie_jar)
         return cookies
 
 
-    def request(self, url, user_agent='Mozilla', timeout=15):
+    def request(self, url, user_agent='Mozilla', cookies={}, timeout=15):
+        url_info = urlsplit(url)
+
+        self.resource_list = []
         loop = QEventLoop()
         self.view.loadFinished.connect(loop.quit)
+
+        # Timeout
         timer = QTimer()
         timer.setSingleShot(True)
         timer.timeout.connect(loop.quit)
         timer.start(timeout * 1000)
 
+        # User-Agent
         self.page.user_agent = user_agent
+
+        # Cookies
+        
+        cookie_obj_list = []
+        for name, value in cookies.items():
+            domain = ('.' + url_info.netloc).split(':')[0]
+            #print 'CREATE COOKIE %s=%s' % (name, value)
+            #print 'DOMAIN = %s' % domain
+            cookie_obj = QNetworkCookie(name, value)
+            cookie_obj.setDomain(domain)
+            cookie_obj_list.append(cookie_obj)
+        self.cookie_jar.setAllCookies(cookie_obj_list)
+
+        # Make a request
         self.view.load(QUrl(url))
 
         loop.exec_()
 
         if timer.isActive():
-            return self.build_response()
+            request_resource = None
+            url = str(self.page.mainFrame().url().toString()).rstrip('/')
+            for res in self.resource_list:
+                if url == res.url or url == res.url.rstrip('/'):
+                    request_resource = res
+                    break
+            if request_resource:
+                return self.build_response(request_resource)
+            else:
+                raise KitError('Request was successfull but it is not possible '\
+                               'to associate the request to one of received responses')
         else:
             raise KitError('Timeout while loading %s' % url)
 
-    def build_response(self):
+    def build_response(self, resource):
         response = Response()
         response.head = ''
-        response.body = unicode(self.page.mainFrame().toHtml())
-        response.code = 200
-        response.url = self.view.url(),
+        response.body = resource.reply.data
+        response.code = resource.status_code
+        response.url = str(resource.reply.url().toString())
+
         response.parse(charset='utf-8')
+
+        response.headers = resource.headers
         response.cookies = self.get_cookies()
         return response
 
     def __del__(self):
         self.view.setPage(None)
 
-    def reply_handler(self, reply):
-        logger.debug('Loaded %s, length %d' % (reply.url().toString(), len(reply.data)))
+    def network_reply_handler(self, reply):
+        status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
+        if status_code:
+            logger.debug('Resource loaded: %s [%d]' % (reply.url().toString(),
+                                                       status_code.toInt()[0]))
+            self.resource_list.append(Resource(reply))
 
 
 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
 
     br = Kit(gui=False)
-    resp = br.request('http://ya.ru/')
-    print resp.body[:200]
+    resp = br.request('http://httpbin.org/cookies', cookies={'foo': 'bar'})
+    #resp = br.request('http://dumpz.org/')
+    print resp.body
     print resp.cookies

grab/transport/kit.py

         #self.body_path = path
 
     def reset(self):
-        self.request_object = {}
+        self.request_object = {
+            'url': None,
+            'cookies': {},
+        }
         self.response = None
         #self.response_head_chunks = []
         #self.response_body_chunks = []
     def process_config(self, grab):
         self.request_object['url'] = grab.config['url']
 
+        if grab.config['cookiefile']:
+            grab.load_cookies(grab.config['cookiefile'])
+
+        if grab.config['cookies']:
+            if not isinstance(grab.config['cookies'], dict):
+                raise error.GrabMisuseError('cookies option shuld be a dict')
+            self.request_object['cookies'] = grab.config['cookies']
+
     def request(self):
         kit = Kit()
-        self.kit_response = kit.request(self.request_object['url'])
+        self.kit_response = kit.request(self.request_object['url'],
+                                        cookies=self.request_object['cookies'])
 
     def prepare_response(self, grab):
         return self.kit_response
     'test.grab_proxy',
     'test.upload_file',
     'test.limit_option',
-    'test.cookies',
+    'test.grab_cookies',
     'test.response_class',
     'test.charset_issue',
     'test.grab_pickle',

test/cookies.py

-# coding: utf-8
-from unittest import TestCase
-import string
-import json
-
-from grab import Grab, GrabMisuseError
-from util import TMP_FILE, GRAB_TRANSPORT
-from tornado_util import SERVER
-
-class TestCookies(TestCase):
-    def setUp(self):
-        SERVER.reset()
-
-    def test_cookies_parsing(self):
-        g = Grab(transport=GRAB_TRANSPORT)
-        SERVER.RESPONSE['cookies'] = {'foo': 'bar', '1': '2'}
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(g.response.cookies['foo'], 'bar')
-
-    def test_multiple_cookies(self):
-        g = Grab(transport=GRAB_TRANSPORT)
-        SERVER.RESPONSE['cookies'] = {}
-        g.setup(cookies={'foo': '1', 'bar': '2'})
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(
-            set(map(string.strip, SERVER.REQUEST['headers']['Cookie'].split('; '))),
-            set(['foo=1', 'bar=2']))
-
-    def test_session(self):
-        g = Grab(transport=GRAB_TRANSPORT)
-        g.setup(reuse_cookies=True)
-        SERVER.RESPONSE['cookies'] = {'foo': 'bar'}
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(g.response.cookies['foo'], 'bar')
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'foo=bar')
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'foo=bar')
-
-        g = Grab(transport=GRAB_TRANSPORT)
-        g.setup(reuse_cookies=False)
-        SERVER.RESPONSE['cookies'] = {'foo': 'baz'}
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(g.response.cookies['foo'], 'baz')
-        g.go(SERVER.BASE_URL)
-        self.assertTrue('Cookie' not in SERVER.REQUEST['headers'])
-
-        g = Grab(transport=GRAB_TRANSPORT)
-        g.setup(reuse_cookies=True)
-        SERVER.RESPONSE['cookies'] = {'foo': 'bar'}
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(g.response.cookies['foo'], 'bar')
-        g.clear_cookies()
-        g.go(SERVER.BASE_URL)
-        self.assertTrue('Cookie' not in SERVER.REQUEST['headers'])
-
-    def test_redirect_session(self):
-        g = Grab(transport=GRAB_TRANSPORT)
-        SERVER.RESPONSE['cookies'] = {'foo': 'bar'}
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(g.response.cookies['foo'], 'bar')
-
-        # Setup one-time redirect
-        g = Grab(transport=GRAB_TRANSPORT)
-        SERVER.RESPONSE['cookies'] = {}
-        SERVER.RESPONSE_ONCE_HEADERS.append(('Location', SERVER.BASE_URL))
-        SERVER.RESPONSE_ONCE_HEADERS.append(('Set-Cookie', 'foo=bar'))
-        SERVER.RESPONSE['once_code'] = 302
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'foo=bar')
-
-    def test_load_dump(self):
-        g = Grab(transport=GRAB_TRANSPORT)
-        cookies = {'foo': 'bar', 'spam': 'ham'}
-        g.setup(cookies=cookies)
-        g.dump_cookies(TMP_FILE)
-        self.assertEqual(set(cookies.items()), set(json.load(open(TMP_FILE)).items()))
-
-        # Test non-ascii
-        g = Grab(transport=GRAB_TRANSPORT)
-        cookies = {'foo': 'bar', 'spam': u'бегемот'}
-        g.setup(cookies=cookies)
-        g.dump_cookies(TMP_FILE)
-        self.assertEqual(set(cookies.items()), set(json.load(open(TMP_FILE)).items()))
-
-        # Test load cookies
-        g = Grab(transport=GRAB_TRANSPORT)
-        cookies = {'foo': 'bar', 'spam': u'бегемот'}
-        json.dump(cookies, open(TMP_FILE, 'w'))
-        g.load_cookies(TMP_FILE)
-        self.assertEqual(set(g.config['cookies'].items()), set(cookies.items()))
-
-    def test_cookiefile(self):
-        g = Grab(transport=GRAB_TRANSPORT)
-
-        # Empty file should not raise Exception
-        open(TMP_FILE, 'w').write('')
-        g.setup(cookiefile=TMP_FILE)
-        g.go(SERVER.BASE_URL)
-
-        cookies = {'spam': 'ham'}
-        json.dump(cookies, open(TMP_FILE, 'w'))
-
-        # One cookie is sent in the server response
-        # Another cookie is passed via the `cookiefile` option
-        SERVER.RESPONSE['cookies'] = {'godzilla': 'monkey'}
-        g.setup(cookiefile=TMP_FILE)
-        g.go(SERVER.BASE_URL)
-        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'spam=ham')
-
-        # This is the correct result of combining the two cookies
-        MERGED_COOKIES = {'godzilla': 'monkey', 'spam': 'ham'}
-
-        # g.config should contain the merged cookies
-        self.assertEqual(set(MERGED_COOKIES.items()),
-                         set(g.config['cookies'].items()))
-
-        # The `cookiefile` file should contain the merged cookies
-        self.assertEqual(set(MERGED_COOKIES.items()),
-                         set(json.load(open(TMP_FILE)).items()))

test/grab_cookies.py

+# coding: utf-8
+from unittest import TestCase
+import string
+import json
+
+from grab import Grab, GrabMisuseError
+from util import TMP_FILE, GRAB_TRANSPORT
+from tornado_util import SERVER
+
+class TestCookies(TestCase):
+    def setUp(self):
+        SERVER.reset()
+
+    def test_parsing_response_cookies(self):
+        g = Grab(transport=GRAB_TRANSPORT)
+        SERVER.RESPONSE['cookies'] = {'foo': 'bar', '1': '2'}
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(g.response.cookies['foo'], 'bar')
+
+    def test_multiple_cookies(self):
+        g = Grab(transport=GRAB_TRANSPORT)
+        SERVER.RESPONSE['cookies'] = {}
+        g.setup(cookies={'foo': '1', 'bar': '2'})
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(
+            set(map(string.strip, SERVER.REQUEST['headers']['Cookie'].split('; '))),
+            set(['foo=1', 'bar=2']))
+
+    def test_session(self):
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.setup(reuse_cookies=True)
+        SERVER.RESPONSE['cookies'] = {'foo': 'bar'}
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(g.response.cookies['foo'], 'bar')
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'foo=bar')
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'foo=bar')
+
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.setup(reuse_cookies=False)
+        SERVER.RESPONSE['cookies'] = {'foo': 'baz'}
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(g.response.cookies['foo'], 'baz')
+        g.go(SERVER.BASE_URL)
+        self.assertTrue('Cookie' not in SERVER.REQUEST['headers'])
+
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.setup(reuse_cookies=True)
+        SERVER.RESPONSE['cookies'] = {'foo': 'bar'}
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(g.response.cookies['foo'], 'bar')
+        g.clear_cookies()
+        g.go(SERVER.BASE_URL)
+        self.assertTrue('Cookie' not in SERVER.REQUEST['headers'])
+
+    def test_redirect_session(self):
+        g = Grab(transport=GRAB_TRANSPORT)
+        SERVER.RESPONSE['cookies'] = {'foo': 'bar'}
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(g.response.cookies['foo'], 'bar')
+
+        # Setup one-time redirect
+        g = Grab(transport=GRAB_TRANSPORT)
+        SERVER.RESPONSE['cookies'] = {}
+        SERVER.RESPONSE_ONCE_HEADERS.append(('Location', SERVER.BASE_URL))
+        SERVER.RESPONSE_ONCE_HEADERS.append(('Set-Cookie', 'foo=bar'))
+        SERVER.RESPONSE['once_code'] = 302
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'foo=bar')
+
+    def test_load_dump(self):
+        g = Grab(transport=GRAB_TRANSPORT)
+        cookies = {'foo': 'bar', 'spam': 'ham'}
+        g.setup(cookies=cookies)
+        g.dump_cookies(TMP_FILE)
+        self.assertEqual(set(cookies.items()), set(json.load(open(TMP_FILE)).items()))
+
+        # Test non-ascii
+        g = Grab(transport=GRAB_TRANSPORT)
+        cookies = {'foo': 'bar', 'spam': u'бегемот'}
+        g.setup(cookies=cookies)
+        g.dump_cookies(TMP_FILE)
+        self.assertEqual(set(cookies.items()), set(json.load(open(TMP_FILE)).items()))
+
+        # Test load cookies
+        g = Grab(transport=GRAB_TRANSPORT)
+        cookies = {'foo': 'bar', 'spam': u'бегемот'}
+        json.dump(cookies, open(TMP_FILE, 'w'))
+        g.load_cookies(TMP_FILE)
+        self.assertEqual(set(g.config['cookies'].items()), set(cookies.items()))
+
+    def test_cookiefile(self):
+        g = Grab(transport=GRAB_TRANSPORT)
+
+        # Empty file should not raise Exception
+        open(TMP_FILE, 'w').write('')
+        g.setup(cookiefile=TMP_FILE)
+        g.go(SERVER.BASE_URL)
+
+        cookies = {'spam': 'ham'}
+        json.dump(cookies, open(TMP_FILE, 'w'))
+
+        # One cookie is sent in the server response
+        # Another cookie is passed via the `cookiefile` option
+        SERVER.RESPONSE['cookies'] = {'godzilla': 'monkey'}
+        g.setup(cookiefile=TMP_FILE)
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(SERVER.REQUEST['headers']['Cookie'], 'spam=ham')
+
+        # This is the correct result of combining the two cookies
+        MERGED_COOKIES = {'godzilla': 'monkey', 'spam': 'ham'}
+
+        # g.config should contain the merged cookies
+        self.assertEqual(set(MERGED_COOKIES.items()),
+                         set(g.config['cookies'].items()))
+
+        # The `cookiefile` file should contain the merged cookies
+        self.assertEqual(set(MERGED_COOKIES.items()),
+                         set(json.load(open(TMP_FILE)).items()))

test/grab_simple.py

         g.go(SERVER.BASE_URL)
         self.assertTrue('Final Countdown' in g.response.body)
 
-    #def test_body_inmemory(self):
-        #g = Grab()
-        #g.setup(body_inmemory=False)
-        #self.assertRaises(GrabMisuseError, lambda: g.go(SERVER.BASE_URL))
+    def test_identity_of_downloaded_content(self):
+        SERVER.RESPONSE['get'] = 'Simple String'
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.go(SERVER.BASE_URL)
+        self.assertEqual('Simple String', g.response.body)
 
-        #SERVER.RESPONSE['get'] = 'foo'
-        #g = Grab()
-        #g.setup(body_inmemory=False)
-        #g.setup(body_storage_dir=TMP_DIR)
-        #g.go(SERVER.BASE_URL)
-        #self.assertTrue(os.path.exists(g.response.body_path))
-        #self.assertTrue(TMP_DIR in g.response.body_path)
-        #self.assertEqual('foo', open(g.response.body_path).read())
-        #old_path = g.response.body_path
+    def test_status_code(self):
+        SERVER.RESPONSE['get'] = 'Simple String'
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.go(SERVER.BASE_URL)
+        self.assertEqual(200, g.response.code)
 
-        #g.go(SERVER.BASE_URL)
-        #self.assertTrue(old_path != g.response.body_path)
+    def test_response_headers(self):
+        SERVER.RESPONSE['headers'] = [('Hello', 'Grab')]
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.go(SERVER.BASE_URL)
+        self.assertTrue(g.response.headers['Hello'] == 'Grab')
 
-        #SERVER.RESPONSE['get'] = 'foo'
-        #g = Grab()
-        #g.setup(body_inmemory=False)
-        #g.setup(body_storage_dir=TMP_DIR)
-        #g.setup(body_storage_filename='musik.mp3')
-        #g.go(SERVER.BASE_URL)
-        #self.assertTrue(os.path.exists(g.response.body_path))
-        #self.assertTrue(TMP_DIR in g.response.body_path)
-        #self.assertEqual('foo', open(g.response.body_path).read())
-        #self.assertEqual(os.path.join(TMP_DIR, 'musik.mp3'), g.response.body_path)
+    def test_response_cookies(self):
+        SERVER.RESPONSE['cookies'] = {'Hello': 'Grab'}
+        g = Grab(transport=GRAB_TRANSPORT)
+        g.go(SERVER.BASE_URL)
+        self.assertTrue(g.response.cookies['Hello'] == 'Grab')

test/tornado_util.py

             'get': '',
             'post': '',
             'cookies': None,
-             'once_code': None,
-             'get_callback': None,
+            'headers': [],
+            'once_code': None,
+            'get_callback': None,
         })
         self.RESPONSE_ONCE.update({
             'get': None,
                 for name, value in SERVER.RESPONSE['cookies'].items():
                     self.set_header('Set-Cookie', '%s=%s' % (name, value))
 
+            if SERVER.RESPONSE['headers']:
+                for name, value in SERVER.RESPONSE['headers']:
+                    self.set_header(name, value)
+
             while SERVER.RESPONSE_ONCE_HEADERS:
                 key, value = SERVER.RESPONSE_ONCE_HEADERS.pop()
                 self.set_header(key, value)