Commits

Miki Tebeka  committed 605ad29

put and append

  • Participants
  • Parent commits 09b26af

Comments (0)

Files changed (2)

+#!/usr/bin/env python
+
+
+from webhdfs import WebHDFS
+
+conn = WebHDFS('192.168.1.121', user='hdfs')
+# print('listdir')
+# print(conn.listdir('/tmp'))
+# print('stat')
+# print(conn.stat('/tmp/z8'))
+# print('checksum')
+# print(conn.checksum('/tmp/z8'))
+# print('home')
+# print(conn.home())
+# print('chmod')
+# conn.chmod('/tmp/z8', 0o777)
+# print('chown')
+# conn.chown('/tmp/z8', 'cloudera')
+# print('open')
+# print(conn.read('/tmp/z8').decode('utf-8'))
+remote = '/tmp/z138'
+# print('put')
+# conn.put('README.rst', remote, overwrite=True)
+print('append')
+conn.append(__file__, remote)

File webhdfs/__init__.py

     pass
 
 
+def jsonpath(path):
+    def wrapper(fn):
+        def wrapped(*args, **kw):
+            resp = fn(*args, **kw)
+
+            reply = resp.json()
+            for key in path:
+                reply = reply[key]
+            return reply
+        return wrapped
+    return wrapper
+
+
+octperm = '{:03o}'.format
+intparam = '{:d}'.format
+
+
 class WebHDFS(object):
     def __init__(self, host=HOST, port=PORT, **kw):
         self.host, self.port = host, port
         self.base_url = self._gen_base(self.host, self.port)
         self.user = kw.get('user')
 
+    @jsonpath(['FileStatuses', 'FileStatus'])
     def listdir(self, path):
-        return self._op('GET', path, 'LISTSTATUS',
-                        ['FileStatuses', 'FileStatus'])
+        return self._op('GET', path, 'LISTSTATUS')
 
+    @jsonpath(['FileStatus'])
     def stat(self, path):
-        return self._op('GET', path, 'GETFILESTATUS', ['FileStatus'])
+        return self._op('GET', path, 'GETFILESTATUS')
 
+    @jsonpath(['FileChecksum'])
     def checksum(self, path):
-        return self._op('GET', path, 'GETFILECHECKSUM', ['FileChecksum'])
+        resp = self._op('GET', path, 'GETFILECHECKSUM')
+        url = self._get_redirect(resp)
+        return requests.get(url)
 
+    @jsonpath(['Path'])
     def home(self):
         return self._op('GET', '/', 'GETHOMEDIRECTORY')
 
     def chmod(self, path, mode):
-        return self._op('PUT', path, 'SETPERMISSION', is_json=False,
-                        permission='{:o}'.format(mode))
+        query = {'permission':  octperm(mode)}
+        self._op('PUT', path, 'SETPERMISSION', query)
 
     def chown(self, path, user=None, group=None):
         query = {}
         if not query:
             raise WebHDFSError('need to specify at least one of user or group')
 
-        self._op('PUT', path, 'SETOWNER', is_json=False, query=query)
+        self._op('PUT', path, 'SETOWNER', query=query)
 
     def read(self, path, offset=0, length=0, buffersize=0):
         # FIXME: Find a way to unite handling of optional parameters
         query = {}
         if offset:
-            query['offset'] = str(offset)
+            query['offset'] = intparam(offset)
         if length:
-            query['length'] = str(length)
+            query['length'] = intparam(length)
         if buffersize:
-            query['buffersize'] = str(buffersize)
+            query['buffersize'] = intparam(buffersize)
 
-        return self._op('GET', path, 'OPEN', is_json=False, query=query)
+        resp = self._op('GET', path, 'OPEN', query=query)
+        url = self._get_redirect(resp)
+        return requests.get(url).content
 
     def put(self, local, path, overwrite=False, blocksize=0, replication=None,
             permission=0, buffersize=0):
 
+        query = {'overwrite': 'true' if overwrite else 'false'}
+        if blocksize:
+            query['blocksize'] = intparam(blocksize)
+        if replication:
+            query['replication'] = intparam(replication)
+        if permission:
+            query['permission'] = octperm(permission)
+        if buffersize:
+            query['buffersize'] = intparam(buffersize)
+
+        self._put('CREATE', 'PUT', local, path, query)
+
+    def append(self, local, path, buffersize=0):
+        query = {}
+        if buffersize:
+            query['buffersize'] = intparam(buffersize)
+        self._put('APPEND', 'POST', local, path, query)
+
+
+    def _put(self, op, method, local, path, query):
         if not isfile(local):
             raise WebHDFSError('put error: {} not found'.format(local))
 
-        query = {'overwrite': 'true' if overwrite else 'false'}
-        if blocksize:
-            query['blocksize'] = str(blocksize)
-        if replication:
-            query['replication'] = str(replication)
-        if permission:
-            query['permission'] = '{:0o}'.format(permission)
-        if buffersize:
-            query['buffersize'] = str(buffersize)
+        resp = self._op(method, path, op, query)
+        url = self._get_redirect(resp)
 
+        with open(local) as fo:
+            data = fo.read()
 
+        resp = requests.request(method, url, data=data)
+        self._check_resp(resp)
 
-    def _call(self, method, url, opts):
-        resp = requests.request(method, url, allow_redirects=False, **opts)
-        while True:
-            if not resp.ok:
-                raise WebHDFSError(resp.reason)
+    def _get_redirect(self, resp):
+        # The host in the redirect URL is *internal* one, so we need to fix
+        # the url. Otherwise we'd just follow the redirects
+        url = urlparse(resp.headers['Location'])
+        host, port = url.netloc.split(':')
+        url = url._replace(netloc='{}:{}'.format(self.host, port))
+        return url.geturl()
 
-            if resp.status_code != 307:
-                return resp
-
-            # The host in the redirect URL is *internal* one, so we need to fix
-            # the url. Otherwise we'd just follow the redirects
-            url = urlparse(resp.headers['Location'])
-            host, port = url.netloc.split(':')
-            url = url._replace(netloc='{}:{}'.format(self.host, port))
-
-            resp = requests.request(
-                method, url.geturl(), allow_redirects=False)
-
+    def _check_resp(self, resp):
+        if not resp.ok:
+            raise WebHDFSError(resp.reason)
         return resp
 
-    def _op(self, method, path, op, getters=None, is_json=False, query=None,
-            opts=None):
+    def _op(self, method, path, op, query=None, opts=None):
         opts = opts or {}
 
         url = '{}{}?op={}'.format(self.base_url, path, op)
         if query:
             url += '&{}'.format(urlencode(query))
 
-        resp = self._call(method, url, opts)
-        if not resp.ok:
-            raise WebHDFSError
-
-        if not is_json:
-            return resp.content
-
-        reply = resp.json()
-        for key in (getters or []):
-            reply = reply[key]
-
-        return reply
-
-    def _find_host(self, url):
-        url = urlparse(url)
-        host = url.netloc
-        i = host.find(':')
-        return host if i == -1 else host[:i]
+        resp = requests.request(method, url, allow_redirects=False, **opts)
+        return self._check_resp(resp)
 
     def _gen_base(self, host, port):
         return 'http://{}:{}/webhdfs/v1'.format(host, port)