Commits

Miki Tebeka committed 7556b86

wildcard

Comments (0)

Files changed (3)

 
 from webhdfs import WebHDFS
 
-conn = WebHDFS('192.168.1.121', user='hdfs')
+conn = WebHDFS('192.168.1.134', user='hdfs')
 remote = '/tmp/z138'
 # print('listdir')
 # print(conn.listdir('/tmp'))
 # print(conn.checksum(remote))
 # print('home')
 # print(conn.home())
-# print('chmod')
-# conn.chmod(remote, 0o777)
+print('chmod')
+conn.chmod(0o777, remote)
 # print('chown')
-# conn.chown(remote, 'cloudera')
+# conn.chown('cloudera', remote)
 # print('open')
 # print(conn.read(remote).decode('utf-8'))
 # print('put')

webhdfs/__init__.py

     def wrapper(fn):
         def wrapped(*args, **kw):
             resp = fn(*args, **kw)
+            if not resp.ok:
+                raise WebHDFSError(resp.reason)
 
             reply = resp.json()
             for key in path:
     def home(self):
         return self._op('GET', '/', 'GETHOMEDIRECTORY')
 
-    def chmod(self, path, mode):
+    def chmod(self, mode, path):
         query = {'permission':  octperm(mode)}
         self._op('PUT', path, 'SETPERMISSION', query)
 

webhdfs/__main__.py

 #!/usr/bin/env python
 
 from webhdfs import WebHDFS, WebHDFSError
+from requests import ConnectionError
+
 from argparse import ArgumentParser
+from fnmatch import fnmatch
+from os.path import basename, dirname
 from time import localtime, strftime
+import re
 
 fs = None
 
-def stat_long(stat):
+def stat_long(stat, path):
     time = stat['modificationTime']/1000
     stat['mod'] = strftime('%b %d %H:%M', localtime(time))
-    fmt = '{permission:4} {length:10} {owner:8} {group:10} {mod} {pathSuffix}'
+    stat['path'] = stat['pathSuffix'] or path
+    fmt = '{permission:4} {length:10} {owner:8} {group:10} {mod} {path}'
     return fmt.format(**stat)
 
-def stat_short(stat):
-    return stat['pathSuffix']
+def stat_short(stat, path):
+    return stat['pathSuffix'] or path
 
 
 def ls(args):
+    base = basename(args.path)
+    if re.search('[?*\[\]]', base):
+        path = dirname(args.path)
+        is_glob = True
+    else:
+        path = args.path
+        is_glob = False
+
     fmt = stat_long if args.long else stat_short
-    for stat in fs.listdir(args.path):
-        print(fmt(stat))
+    for stat in fs.listdir(path):
+        if is_glob and not fnmatch(stat['pathSuffix'] or path, base):
+            continue
+        print(fmt(stat, args.path))
+
+def stat(args):
+    info = fs.stat(args.path)
+    msize = max(len(key) for key in info)
+    fmt = '{{:{}}}: {{}}'.format(msize)
+    for key, value in info.items():
+        print(fmt.format(key, value))
+
+def checksum(args):
+    print(fs.checksum(args.path)['bytes'])
+
+def home(args):
+    print(fs.home())
+
+def chmod(args):
+    fs.chmod(args.mode, args.path)
 
 def main(argv=None):
     global fs
     parser.add_argument('--user', help='webhdfs user', default=None)
 
     subs = parser.add_subparsers()
+
     ls_parser = subs.add_parser('ls')
     ls_parser.add_argument('path')
     ls_parser.add_argument('-l', help='long format', default=False,
                            action='store_true', dest='long')
     ls_parser.set_defaults(func=ls)
+
+    stat_parser = subs.add_parser('stat')
+    stat_parser.add_argument('path')
+    stat_parser.set_defaults(func=stat)
+
+    cs_parser = subs.add_parser('checksum')
+    cs_parser.add_argument('path')
+    cs_parser.set_defaults(func=checksum)
+
+    home_parser = subs.add_parser('home')
+    home_parser.set_defaults(func=home)
+
+    chmod_parser = subs.add_parser('chmod')
+    chmod_parser.add_argument('mode', type=lambda v: int(v, 8))
+    chmod_parser.add_argument('path')
+    chmod_parser.set_defaults(func=chmod)
+
+
     args = parser.parse_args(argv[1:])
 
     fs = WebHDFS(args.host, args.port, user=args.user)
         args.func(args)
     except WebHDFSError as err:
         raise SystemExit('error: {}'.format(err))
+    except ConnectionError as err:
+        raise SystemExit('error: cannot connect - {}'.format(err.args[0].reason))
 
 
 if __name__ == '__main__':