Source

bloodhound-rpc / trunk / tracrpc / util.py

Full commit
# -*- coding: utf-8 -*-
"""
License: BSD

(c) 2005-2008 ::: Alec Thomas (alec@swapoff.org)
(c) 2009      ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no)
(c) 2013      ::: Olemis Lang (olemis+trac@gmail.com)
"""

from trac.util.compat import any

try:
  from cStringIO import StringIO
except ImportError:
  from StringIO import StringIO

try:
    # Method only available in Trac 0.11.3 or higher
    from trac.util.text import exception_to_unicode
except ImportError:
    def exception_to_unicode(e, traceback=""):
        from trac.util.text import to_unicode
        message = '%s: %s' % (e.__class__.__name__, to_unicode(e))
        if traceback:
            from trac.util import get_last_traceback
            traceback_only = get_last_traceback().split('\n')[:-2]
            message = '\n%s\n%s' % (to_unicode('\n'.join(traceback_only)),
                                        message)
        return message

try:
    # Constant available from Trac 0.12dev r8612
    from trac.util.text import empty
except ImportError:
    empty = None

def accepts_mimetype(req, mimetype):
    if isinstance(mimetype, basestring):
      mimetype = (mimetype,)
    accept = req.get_header('Accept')
    if accept is None :
        # Don't make judgements if no MIME type expected and method is GET
        return req.method == 'GET'
    else :
        accept = accept.split(',')
        return any(x.strip().startswith(y) for x in accept for y in mimetype)

def prepare_docs(text, indent=4):
    r"""Remove leading whitespace"""
    return text and ''.join(l[indent:] for l in text.splitlines(True)) or ''

try:
    # Micro-second support added to 0.12dev r9210
    from trac.util.datefmt import to_utimestamp, from_utimestamp
except ImportError:
    from trac.util.datefmt import to_timestamp, to_datetime, utc
    to_utimestamp = to_timestamp
    from_utimestamp = lambda x: to_datetime(x, utc)


#--------------------------------------------------
# Imported from TracGVizPlugin
# needed for report, timeline and version control RPC
#--------------------------------------------------

from datetime import datetime, date, time
from fnmatch import translate
from itertools import chain, imap, repeat, izip
from re import compile
import types
from xmlrpclib import DateTime

from bhdashboard.util import dummy_request

TYPES_LABELS = {type(None): 'string',
                str : 'string',
                unicode : 'string',
                long : 'number',
                int : 'number',
                datetime : 'datetime',
                date : 'date', 
                time : 'timeofday',
                DateTime : 'datetime',
                bool : 'boolean',}

def get_column_desc(cursor, infer=False):
    r"""Retrieve a sequence of tuples (name, type) describing 
    the columns present in the results provider by a cursor object 
    after executing a database query.
    """
    row = None
    if cursor.description:
        for i, d in enumerate(cursor.description):
            name, type_code = d[:2]
            if isinstance(name, str):
                name = unicode(name, 'utf-8')
            if type_code is None and infer:
                if row is None:
                    try:
                        row, = cursor.fetchmany(1)
                    except:
                        row = ('',) * len(list(cursor.description))
                type_code = TYPES_LABELS.get(row[i].__class__)
            yield name, type_code

def rpc_to_datetime(DT, req):
    r"""Return the datetime object representing the xmlrpclib.DateTime 
    value in `DT`. The return value is at the timezone of the 
    environment processing the request `req`.
    """
    dt = datetime.strptime(DT.value, '%Y%m%dT%H:%M:%S')
    return dt.replace(tzinfo=req.tz)

def __insert_many_id(id, _tuple): 
    return (id,) + _tuple

def __insert_value_id(id, value): 
    return (id, value)

def map_with_id(req, ids, func, ins, *iterables):
    if iterables:
        iterables = izip(*iterables)
    else:
        iterables = repeat(tuple())
    return chain(*(imap(ins, repeat(x), func(req, x, *args)) \
            for x, args in izip(ids, iterables)))

def map_many_with_id(req, ids, func, *iterables):
    return map_with_id(req, ids, func, __insert_many_id, *iterables)

def map_value_with_id(req, ids, func, *iterables):
    return map_with_id(req, ids, func, __insert_value_id, *iterables)

DEFAULT_DATE_FORMATS = {'date' : "%Y-%m-%d",
                        'datetime' : "%Y-%m-%d %H:%M:%S",
                        'timeofday' : "%H:%M:%S",}

def rpc_opt_sigs(ret_type, fixed_types=None, *opt_types):
    r"""Generate tuples describing the signatures of an XML-RPC method 
    whose arguments can take values in a set of optional types or 
    be missing in the method call.
    """
    if fixed_types is None:
        fixed_types = ()
    else:
        fixed_types = tuple(fixed_types)
    
    new_sig = (ret_type,) + fixed_types
    yield new_sig
    old_gen = [new_sig]
    
    for arg_types in opt_types:
        new_gen = []
        for sig in old_gen:
            for arg_type in arg_types:
                new_sig = sig + (arg_type,)
                yield new_sig
                new_gen.append(new_sig)
        old_gen = new_gen


#--------------------------------------------------
#   Miscellaneous
#--------------------------------------------------

def compile_pattern(fnp):
  r"""Return a compiled UNIX file name pattern. Used for efficiency.
  """
  return compile(translate(fnp))