1. Kirill Simonov
  2. htsql-sqlascii-patch

Source

htsql-sqlascii-patch / src / htsql_pgsql / core / connect.py

#
# Copyright (c) 2006-2012, Prometheus Research, LLC
#


from htsql.core.adapter import adapts
from htsql.core.domain import StringDomain, EnumDomain
from htsql.core.connect import Connect, DBError, NormalizeError, Normalize
from htsql.core.context import context
import psycopg2, psycopg2.extensions


class PGSQLError(DBError):
    """
    Raised when a database error occurred.
    """


class ConnectPGSQL(Connect):
    """
    Implementation of the connection adapter for PostgreSQL.
    """

    def open(self):
        # Prepare and execute the `psycopg2.connect()` call.
        db = context.app.htsql.db
        parameters = {}
        parameters['database'] = db.database
        if db.host is not None:
            parameters['host'] = db.host
        if db.port is not None:
            parameters['port'] = db.port
        if db.username is not None:
            parameters['user'] = db.username
        if db.password is not None:
            parameters['password'] = db.password
        connection = psycopg2.connect(**parameters)

        connection.set_client_encoding('LATIN1')

        # Make TEXT values return as `unicode` objects.
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE,
                                          connection)
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY,
                                          connection)

        # Enable autocommit.
        if self.with_autocommit:
            connection.set_isolation_level(
                    psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

        return connection


class NormalizePGSQLError(NormalizeError):

    def __call__(self):
        # If we got a DBAPI exception, generate our own error.
        if isinstance(self.error, psycopg2.Error):
            message = str(self.error)
            error = PGSQLError(message)
            return error

        # Otherwise, let the superclass return `None`.
        return super(NormalizePGSQLError, self).__call__()


class NormalizePGSQLString(Normalize):

    adapts(StringDomain)

    @staticmethod
    def convert(value):
        if isinstance(value, str):
            value = value.decode('latin1')
        return value


class NormalizePGSQLEnum(Normalize):

    adapts(EnumDomain)

    @staticmethod
    def convert(value):
        if isinstance(value, str):
            value = value.decode('latin1')
        return value