# Source: woocode / py / utils / utils.py
# -*- encoding:utf-8 -*-
import os
import re
import tarfile
from urlparse import urlparse
from time import time
from glob import fnmatch

# Class of compiled regular-expression objects, used for isinstance() checks
# on "pattern" arguments elsewhere in this module.
RE_TYPE = re.compile('foo').__class__

def get_files(path, fn_pat=None):
    '''
    List the entries of directory `path`, optionally filtered by `fn_pat`.

    @path:   directory handed to os.listdir(); every entry is joined with
             `path` before it is tested or returned
    @fn_pat: None  - keep every entry
             str   - keep entries whose joined path contains the string or
                     matches it as an fnmatch pattern
             regex - keep entries whose joined path is found by search()

    Returns: a lazy generator of joined paths (the directory listing itself
             is read immediately, when this function is called)

    Raises TypeError when `fn_pat` is neither None, a str nor a compiled
    regular expression.
    '''
    entries = os.listdir(path)
    candidates = (os.path.join(path, entry) for entry in entries)

    if fn_pat is None:
        return candidates

    if isinstance(fn_pat, str):
        return (full for full in candidates
                if fn_pat in full or fnmatch.fnmatch(full, fn_pat))

    if isinstance(fn_pat, RE_TYPE):
        return (full for full in candidates if fn_pat.search(full))

    raise TypeError("<fn_path> only support regexp or str, got: %r" % repr(type(fn_pat)))

def dump_obj(fn, obj):
    '''
    Pickle `obj` into the file at path `fn`.

    @fn:  path of the output file; it is opened in binary write mode
    @obj: object to serialize with pickle's default protocol
    '''
    from pickle import dump

    with open(fn, 'wb') as out:
        dump(obj, out)

def timeit(func):
    '''
    Decorator that prints how long each call to `func` took.

    The wrapped callable receives the arguments unchanged and its return
    value is passed through.  After a successful call one line of the form
    "Cost 1.23 secs" (elapsed time.time() seconds) is printed; nothing is
    printed when `func` raises.
    '''
    # Imported locally so this block needs no extra file-level import.
    from functools import wraps

    def wrapper(*args, **kwargs):
        start_time = time()
        ret = func(*args, **kwargs)
        # A single parenthesised argument is valid both as a Python 2 print
        # statement and as a Python 3 print() call, with identical output.
        print('Cost %.2f secs' % (time() - start_time))
        return ret

    try:
        # Copy __name__/__doc__/__module__ so the decorated function still
        # identifies itself correctly (tracebacks, doctest, introspection).
        return wraps(func)(wrapper)
    except AttributeError:
        # Python 2's wraps() raises for callables without __name__ (e.g.
        # callable instances); fall back to the undecorated wrapper.
        return wrapper

def query_to_dict(query):
    '''
    Split a URL query string into a dict of raw (not percent-decoded) strings.

    Each '&'-separated segment is split at its FIRST '=' only, so values may
    themselves contain '='.  A segment without '=' becomes a key with an
    empty value, empty segments are skipped, and when a key is repeated the
    last occurrence wins.

    >>> query_to_dict('a=a&b=b') == {'a': 'a', 'b': 'b'}
    True
    >>> query_to_dict('a=a&b=b&c=') == {'a': 'a', 'b': 'b', 'c': ''}
    True
    >>> query_to_dict('a=a%XX&b=b&c=c') == {'a': 'a%XX', 'b': 'b', 'c': 'c'}
    True
    >>> query_to_dict('')
    {}
    >>> query_to_dict('token=YWJj==&flag') == {'token': 'YWJj==', 'flag': ''}
    True
    '''
    params = {}
    for segment in query.split('&'):
        if not segment:
            # ''.split('&') yields [''], and 'a=1&&b=2' has an empty middle
            # segment; neither carries a parameter.
            continue
        key, _sep, value = segment.partition('=')
        params[key] = value
    return params

def parse_request_url(url_str):
    '''
    Extract the query parameters from a request line.

    @url_str: whitespace-separated "<method> <url> <protocol>" string,
              e.g. 'GET /search?q=foo&page=2 HTTP/1.1'

    Returns: dict of the URL's query parameters as produced by
             query_to_dict(); values consisting only of digits are
             converted to int, all other values stay strings.  A URL
             without a query string gives an empty dict.

    Raises ValueError when `url_str` does not split into exactly three
    whitespace-separated fields.
    '''
    # Only the URL is used; the unpacking still enforces the 3-field shape.
    _method, url, _protocol = url_str.split()

    # `urlparse` is imported in this file as the function itself
    # (from urlparse import urlparse), so it is called directly.
    query = urlparse(url).query

    # Handle the "no query string" case here rather than relying on the
    # helper's behaviour for empty input.
    raw = query_to_dict(query) if query else {}

    # Build a fresh dict instead of re-assigning keys while iterating.
    ret = {}
    for k, v in raw.items():
        ret[k] = int(v) if v.isdigit() else v
    return ret

def parse_non_digit(s, default=0, keep_postive=True):
    '''
    Convert `s` to an int, falling back to `default` when that is impossible.

    @s:            an int (used as is) or a str holding a base-10 integer
    @default:      returned unchanged when `s` is a str that int() rejects
                   or is neither an int nor a str
    @keep_postive: when true, a negative parsed value is replaced by 0
                   (the fallback `default` is never clamped)

    >>> parse_non_digit('yu')
    0
    >>> parse_non_digit('-11')
    0
    >>> parse_non_digit('-11', keep_postive=False)
    -11
    >>> parse_non_digit('')
    0
    >>> parse_non_digit(0)
    0
    >>> parse_non_digit(1)
    1
    >>> parse_non_digit(32321)
    32321
    >>> parse_non_digit('yu', default=7)
    7
    '''
    if isinstance(s, int):
        ret = s
    elif isinstance(s, str):
        try:
            ret = int(s)
        except ValueError:
            return default
    else:
        return default

    if keep_postive and ret < 0:
        return 0
    return ret

def get_lines(f, line_pat=None):
    '''
    Generate the lines of `f`, optionally only those matching `line_pat`.

    @f:        a str path of an existing regular file (opened in 'rb' mode),
               or an already open file-like object with read() and close()
               (plain files, tar member files, ...)
    @line_pat: optional compiled regexp; only lines on which
               line_pat.search() succeeds are yielded.  It must accept the
               kind of line the file produces (files opened here are binary).

    The file object is closed when iteration ends for any reason:
    exhaustion, an exception, or the generator being closed early.  This
    includes a file object that was passed in by the caller.

    Because this is a generator, the errors below surface on the first
    iteration step rather than at call time.

    Raises IOError when `f` is a str that is not an existing file, and
    TypeError when `f` is neither a str nor a file-like object.
    '''
    if isinstance(f, str):
        if not os.path.isfile(f):
            raise IOError('No such file: %s' % f)
        fobj = open(f, 'rb')
    elif hasattr(f, 'read') and hasattr(f, 'close'):
        # Duck-typed so any file-like object works, not just two classes.
        fobj = f
    else:
        raise TypeError("Support only [file|str|tarfile], got: %r" % repr(type(f)))

    try:
        for line in fobj:
            if line_pat and not line_pat.search(line):
                continue
            yield line
    finally:
        # Runs on GeneratorExit too, so an abandoned generator does not
        # leak the handle.
        fobj.close()

def get_fobj_from_tarfile(tarfile, mem_pat=None):
    '''
    Generate a readable file object for each matching member of a tar file.

    @tarfile: path of the tar file
    @mem_pat: optional filter for the archive members
              str   - the member's base name must equal the string
              regex - search() must succeed on the member's full name
              A false value (None, '') selects every member.

    Yields: one file object per selected member, in archive order.  Members
            that tarfile cannot expose as a file (e.g. directories) are
            skipped.

    The archive is closed when the generator finishes or is closed, so each
    yielded file object must be read while the iteration is still running.

    Raises TypeError (on the first iteration step) when `mem_pat` is set
    but is neither a str nor a compiled regular expression.
    '''
    # The `tarfile` parameter shadows the module of the same name, so the
    # module is imported here under a different local name.
    import tarfile as tarfile_module

    # Validate and normalise the filter once, before touching the archive.
    if not mem_pat:
        accept = None
    elif isinstance(mem_pat, str):
        def accept(name):
            return os.path.basename(name) == mem_pat
    elif isinstance(mem_pat, RE_TYPE):
        accept = mem_pat.search
    else:
        raise TypeError("Support only [regexp|str], got: %r" % repr(type(mem_pat)))

    tf = tarfile_module.open(tarfile)
    try:
        # Iterating the TarFile reads every member; the `members` attribute
        # only lists the members that have been read so far.
        for member in tf:
            if accept is not None and not accept(member.name):
                continue
            fobj = tf.extractfile(member)
            if fobj is None:
                # extractfile() returns None for non-file members.
                continue
            yield fobj
    finally:
        tf.close()
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.