Source

htsql-sqlascii-patch / src / htsql_pgsql / core / connect.py

Full commit
#
# Copyright (c) 2006-2012, Prometheus Research, LLC
#


from htsql.core.adapter import adapts
from htsql.core.domain import StringDomain, EnumDomain
from htsql.core.connect import Connect, DBError, NormalizeError, Normalize
from htsql.core.context import context
import codecs
import psycopg2, psycopg2.extensions


class PGSQLError(DBError):
    """
    Database error reported by the PostgreSQL backend adapter.
    """


class ConnectPGSQL(Connect):
    """
    Implementation of the connection adapter for PostgreSQL.

    The ``charset`` connection option selects the client encoding of the
    connection; ``UTF8`` is used when the option is not set.
    """

    # Option names recognized by this adapter; `open()` reads `charset`.
    valid_options = ['charset']

    @staticmethod
    def _client_encoding(charset):
        # Translate a Python codec name to the name of a PostgreSQL client
        # encoding backed by the same codec; raise `PGSQLError` if none.
        try:
            codec = codecs.lookup(charset)
        except LookupError:
            raise PGSQLError("unknown charset: %s" % charset)
        encodings = psycopg2.extensions.encodings
        # Sorted iteration keeps the choice deterministic when several
        # client encodings share one Python codec.
        for client_encoding in sorted(encodings):
            try:
                client_codec = codecs.lookup(encodings[client_encoding])
            except LookupError:
                # No codec for this client encoding in this Python build,
                # so it cannot match; keep looking.
                continue
            if client_codec.name == codec.name:
                return client_encoding
        raise PGSQLError("unknown charset: %s" % charset)

    def open(self):
        """
        Opens and configures a new `psycopg2` connection.

        Connection parameters are read from the `db` setting of the active
        application.  The client encoding follows the ``charset`` option,
        text values are returned as `unicode` objects, and autocommit is
        enabled when `self.with_autocommit` is set.

        Raises `PGSQLError` when the ``charset`` option does not name a
        known Python codec or has no matching PostgreSQL client encoding.
        If configuring the connection fails, the connection is closed
        before the error is propagated.
        """
        # Prepare and execute the `psycopg2.connect()` call.
        db = context.app.htsql.db
        parameters = {'database': db.database}
        if db.host is not None:
            parameters['host'] = db.host
        if db.port is not None:
            parameters['port'] = db.port
        if db.username is not None:
            parameters['user'] = db.username
        if db.password is not None:
            parameters['password'] = db.password
        connection = psycopg2.connect(**parameters)

        try:
            # Select the client encoding.
            encoding = 'UTF8'
            charset = None
            if db.options is not None:
                charset = db.options.get('charset')
            if charset:
                encoding = self._client_encoding(charset)
            connection.set_client_encoding(encoding)

            # Make TEXT values return as `unicode` objects.
            psycopg2.extensions.register_type(
                    psycopg2.extensions.UNICODE, connection)
            psycopg2.extensions.register_type(
                    psycopg2.extensions.UNICODEARRAY, connection)

            # Enable autocommit.
            if self.with_autocommit:
                connection.set_isolation_level(
                        psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        except Exception:
            # Do not leave a half-configured connection open.
            connection.close()
            raise

        return connection


class NormalizePGSQLError(NormalizeError):
    """
    Converts `psycopg2` exceptions to `PGSQLError` instances.
    """

    def __call__(self):
        # Anything that is not a DBAPI exception is left to the superclass.
        if not isinstance(self.error, psycopg2.Error):
            return super(NormalizePGSQLError, self).__call__()

        # Wrap the text of the DBAPI exception in our own error type.
        return PGSQLError(str(self.error))


class NormalizePGSQL(Normalize):
    """
    Records the charset used to decode byte strings from the database.

    `self.charset` is taken from the ``charset`` connection option and
    falls back to ``utf-8`` when the option is missing or empty.
    """

    def __init__(self, domain):
        super(NormalizePGSQL, self).__init__(domain)
        options = context.app.htsql.db.options
        configured = None
        if options is not None:
            configured = options.get('charset')
        # An unset or empty option means the default charset.
        self.charset = configured or 'utf-8'


class NormalizePGSQLString(NormalizePGSQL):
    """
    Decodes byte-string values of `StringDomain` using `self.charset`.

    Derives from `NormalizePGSQL` because `convert()` reads `self.charset`,
    which that class assigns in its constructor.
    """

    adapts(StringDomain)

    def convert(self, value):
        # Only `str` values are decoded; anything else is passed through
        # unchanged.
        if isinstance(value, str):
            value = value.decode(self.charset)
        return value


class NormalizePGSQLEnum(NormalizePGSQL):
    """
    Decodes byte-string values of `EnumDomain` using `self.charset`.

    Derives from `NormalizePGSQL` because `convert()` reads `self.charset`,
    which that class assigns in its constructor.
    """

    adapts(EnumDomain)

    def convert(self, value):
        # Only `str` values are decoded; anything else is passed through
        # unchanged.
        if isinstance(value, str):
            value = value.decode(self.charset)
        return value