Commits

Miki Tebeka  committed 0a9f2b0

common code, redirects and checksum

  • Participants
  • Parent commits 708e6d3

Comments (0)

Files changed (1)

File webhdfs/__init__.py

 import requests
+import sys
 
-DEFAULT_HOST = 'localhost'
-DEFAULT_PORT = '50070'
+if sys.version_info[0] > 2:
+    from urllib.parse import urlparse
+else:
+    from urlparse import urlparse
+
+HOST, PORT = 'localhost', 50070
 
 class WebHDFSError(Exception):
     pass
 
 class WebHDFS(object):
-    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
+    def __init__(self, host=HOST, port=PORT):
         self.host, self.port = host, port
+        self.base_url = self._gen_base(self.host, self.port)
 
     def listdir(self, path):
-        url = 'http://{}:{}/webhdfs/v1{}?op=LISTSTATUS'.format(
-            self.host, self.port, path)
+        return self._op('GET', path, 'LISTSTATUS', ['FileStatuses', 'FileStatus'])
 
-        resp = requests.get(url)
+    def stat(self, path):
+        return self._op('GET', path, 'GETFILESTATUS', ['FileStatus'])
+
+    def checksum(self, path):
+        return self._op('GET', path, 'GETFILECHECKSUM', ['FileChecksum'])
+
+    def _call(self, method, url):
+        resp = requests.request(method, url, allow_redirects=False)
+        while True:
+            if not resp.ok:
+                raise WebHDFSError(resp.reason)
+
+            if resp.status_code != 307:
+                return resp
+
+            # The host in the redirect URL is *internal* one, so we need to fix the
+            # url. Otherwise we'd just follow the redirects
+            url = urlparse(resp.headers['Location'])
+            host, port = url.netloc.split(':')
+            url = url._replace(netloc='{}:{}'.format(self.host, port))
+
+            resp = requests.request(
+                method, url.geturl(), allow_redirects=False)
+
+        return resp
+
+    def _op(self, method, path, op, getters=None):
+        url = '{}{}?op={}'.format(self.base_url, path, op)
+
+        resp = self._call(method, url)
         if not resp.ok:
             raise WebHDFSError
 
-        return resp.json()['FileStatuses']['FileStatus']
 
+        reply = resp.json()
+        for key in (getters or []):
+            reply = reply[key]
 
+        return reply
 
+    def _find_host(self, url):
+        url = urlparse(url)
+        host = url.netloc
+        i = host.find(':')
+        return host if i == -1 else host[:i]
+
+    def _gen_base(self, host, port):
+        return 'http://{}:{}/webhdfs/v1'.format(host, port)
+