Source

pypy-postgresql / pypy / module / postgresql / interp_types.py

Full commit
from __future__ import with_statement

import math

from pypy.interpreter.baseobjspace import Wrappable
from pypy.interpreter.gateway import unwrap_spec, interp2app
from pypy.interpreter.typedef import TypeDef
from pypy.module.postgresql import libpq
from pypy.module.rctime.interp_time import c_localtime
from pypy.rlib.rarithmetic import intmask
from pypy.rlib.rfloat import NAN, INFINITY
from pypy.rpython.lltypesystem import lltype, rffi


class W_Type(Wrappable):
    def __init__(self, w_name, values, caster=None, w_py_caster=None,
        base_caster=None):
        self.w_name = w_name
        self.values = values
        self.caster = caster
        self.w_py_caster = w_py_caster
        self.base_caster = base_caster

    @unwrap_spec(other=int)
    def descr_eq(self, space, other):
        return space.wrap(other in self.values)

    def cast(self, space, value, length, cursor):
        if self.w_py_caster is not None:
            return space.call_function(self.w_py_caster,
                space.wrap(rffi.charp2strn(value, length)),
                cursor,
            )
        else:
            return self.caster(space, value, length, cursor)

W_Type.typedef = TypeDef("type",
    __eq__ = interp2app(W_Type.descr_eq),
)

def typecast(space, caster, value, length, cursor):
    old = cursor.caster
    cursor.caster = caster
    w_val = caster.cast(space, value, length, cursor)
    cursor.caster = old
    return w_val

def cast_string(space, value, length, cursor):
    return space.wrap(rffi.charp2strn(value, length))

def cast_longinteger(space, value, length, cursor):
    w_s = space.wrap(rffi.charp2strn(value, length))
    return space.call_function(space.w_long, w_s)

def cast_integer(space, value, length, cursor):
    return space.wrap(int(rffi.charp2strn(value, length)))

def cast_float(space, value, length, cursor):
    s = rffi.charp2strn(value, length)
    if s == "NaN":
        n = NAN
    elif s == "Infinity":
        n = INFINITY
    else:
        n = float(s)
    return space.wrap(n)

def cast_decimal(space, value, length, cursor):
    from pypy.module.postgresql.interp_state import get

    n = rffi.charp2strn(value, length)
    return space.call_function(get(space).w_Decimal, space.wrap(n))

def cast_binary(space, value, length, cursor):
    with lltype.scoped_alloc(rffi.CArray(rffi.SIZE_T), 1) as to_length:
        s = libpq.PQunescapeBytea(value, to_length)
        try:
            w_res = space.buffer_w(
                space.wrap(rffi.charpsize2str(s, intmask(to_length[0])))
            )
        finally:
            libpq.PQfreemem(rffi.cast(rffi.VOIDP, s))
        return w_res

def cast_boolean(space, value, length, cursor):
    return space.wrap(value[0] == "t")

def cast_generic_array(space, value, length, cursor):
    s = rffi.charp2strn(value, length)
    assert s[0] == "{" and s[-1] == "}"
    i = 1
    w_array = space.newlist([])
    stack_w = [w_array]
    while i < len(s) - 1:
        if s[i] == "{":
            w_sub_array = space.newlist([])
            w_array.append(w_sub_array)
            stack_w.append(w_sub_array)
            w_array = w_sub_array
            i += 1
        elif s[i] == "}":
            stack_w.pop()
            w_array = stack_w[-1]
            i += 1
        elif s[i] == ",":
            i += 1
        else:
            start = i
            # If q is odd this is quoted
            q = 0
            # Whether or not the last char was a backslash
            b = False
            while i < len(s) - 1:
                if s[i] == '"':
                    if not b:
                        q += 1;
                elif s[i] == "\\":
                    b = not b
                elif s[i] == "}" or s[i] == ",":
                    if not b and q % 2 == 0:
                        break
                i += 1
            if q:
                start += 1
                end = i - 1
            else:
                end = i

            val = []
            for j in xrange(start, end):
                if s[j] != "\\" or s[j-1] == "\\":
                    val.append(s[j])
            with rffi.scoped_str2charp("".join(val)) as str_buf:
                w_val = typecast(
                    space, cursor.caster.base_caster, str_buf, end - start, cursor
                )
                w_array.append(w_val)
    return stack_w[-1]

def cast_unicode(space, value, length, cursor):
    from pypy.module.postgresql.interp_state import get

    encoding = get(space).encodings[cursor.w_connection.encoding]
    s = rffi.charp2strn(value, length)
    w_s = space.wrap(s)
    return space.call_method(w_s, "decode", space.wrap(encoding))

def _parse_date(space, date):
    from pypy.module.postgresql.interp_state import get

    year, month, day = date.split("-")
    return space.call_function(get(space).w_Date,
        space.wrap(int(year)), space.wrap(int(month)), space.wrap(int(day))
    )

def _parse_time(space, time, cursor):
    from pypy.module.postgresql.interp_state import get

    microsecond = 0
    hour, minute, second = time.split(":", 2)

    w_tzinfo = space.w_None
    sign = 0
    timezone = None
    if "-" in second:
        sign = -1
        second, timezone = second.split("-")
    elif "+" in second:
        sign = 1
        second, timezone = second.split("+")
    if not space.is_w(cursor.w_tzinfo_factory, space.w_None) and sign:
        parts = timezone.split(":")
        tz_min = sign * 60 * int(parts[0])
        if len(parts) > 1:
            tz_min += int(parts[1])
        if len(parts) > 2:
            tz_min += int(int(parts[2]) / 60.)
        w_tzinfo = space.call_function(
            cursor.w_tzinfo_factory, space.wrap(tz_min)
        )
    if "." in second:
        second, microsecond = second.split(".")
        microsecond = int(microsecond) * int(math.pow(10.0, 6.0 - len(microsecond)))

    return space.call_function(get(space).w_Time,
        space.wrap(int(hour)),
        space.wrap(int(minute)),
        space.wrap(int(second)),
        space.wrap(microsecond),
        w_tzinfo,
    )

def cast_datetime(space, value, length, cursor):
    from pypy.module.postgresql.interp_state import get

    s = rffi.charp2strn(value, length)
    date, time = s.split(" ")
    w_date = _parse_date(space, date)
    w_time = _parse_time(space, time, cursor)

    return space.call_method(get(space).w_DateTime, "combine", w_date, w_time)

def cast_date(space, value, length, cursor):
    date = rffi.charp2strn(value, length)
    return _parse_date(space, date)

def cast_time(space, value, length, cursor):
    time = rffi.charp2strn(value, length)
    return _parse_time(space, time, cursor)

def cast_interval(space, value, length, cursor):
    from pypy.module.postgresql.interp_state import get

    years = months = days = 0
    hours = minutes = seconds = hundreths = 0.0
    v = 0.0
    sign = 1
    denominator = 1.0
    part = 0
    skip_to_space = False

    s = rffi.charp2strn(value, length)
    for c in s:
        if skip_to_space:
            if c == " ":
                skip_to_space = False
            continue
        if c == "-":
            sign = -1
        elif "0" <= c <= "9":
            v = v * 10 + ord(c) - ord("0")
            if part == 6:
                denominator *= 10
        elif c == "y":
            if part == 0:
                years = int(v * sign)
                skip_to_space = True
                v = 0.0
                sign = 1
                part = 1
        elif c == "m":
            if part <= 1:
                months = int(v * sign)
                skip_to_space = True
                v = 0.0
                sign = 1
                part = 2
        elif c == "d":
            if part <= 2:
                days = int(v * sign)
                skip_to_space = True
                v = 0.0
                sign = 1
                part = 3
        elif c == ":":
            if part <= 3:
                hours = v
                v = 0.0
                part = 4
            elif part == 4:
                minutes = v
                v = 0.0
                part = 5
        elif c == ".":
            if part == 5:
                seconds = v
                v = 0.0
                part = 6

    if part == 4:
        minutes = v
    elif part == 5:
        seconds = v
    elif part == 6:
        hundreths = v / denominator

    if sign < 0.0:
        seconds = - (hundreths + seconds + minutes * 60 + hours * 3600)
    else:
        seconds += hundreths + minutes * 60 + hours * 3600

    days += years * 365 + months * 30
    micro = (seconds - math.floor(seconds)) * 1000000.0
    seconds = int(math.floor(seconds))
    return space.call_function(get(space).w_TimeDelta,
        space.wrap(days),
        space.wrap(seconds),
        space.wrap(int(micro))
    )


@unwrap_spec(year=int, month=int, day=int)
def w_Date(space, year, month, day):
    from pypy.module.postgresql.interp_state import get
    from pypy.module.postgresql.interp_adapters import W_DateTime

    w_date = space.call_function(
        get(space).w_Date,
        space.wrap(year), space.wrap(month), space.wrap(day)
    )
    return W_DateTime(w_date)

@unwrap_spec(ticks=float)
def w_DateFromTicks(space, ticks):
    with lltype.scoped_alloc(rffi.CArray(rffi.TIME_T), 1) as t:
        t[0] = rffi.cast(rffi.TIME_T, int(ticks))
        tm = c_localtime(t)
        return w_Date(space,
            rffi.cast(lltype.Signed, tm.c_tm_year) + 1900,
            rffi.cast(lltype.Signed, tm.c_tm_mon) + 1,
            rffi.cast(lltype.Signed, tm.c_tm_mday)
        )

def w_Binary(space, w_obj):
    from pypy.module.postgresql.interp_adapters import W_Binary

    return W_Binary(w_obj)