htsql-charset-option / src / htsql_sqlite /

Full commit
# Copyright (c) 2006-2011, Prometheus Research, LLC
# Authors: Clark C. Evans <>,
#          Kirill Simonov <>


This module implements the introspection adapter for SQLite.

from htsql.introspect import Introspect
from htsql.entity import (CatalogEntity, SchemaEntity, TableEntity,
                          ColumnEntity, UniqueKeyEntity, PrimaryKeyEntity,
from .domain import (SQLiteBooleanDomain, SQLiteIntegerDomain,
                     SQLiteFloatDomain, SQLiteTextDomain, SQLiteDateDomain,
                     SQLiteDateTimeDomain, SQLiteOpaqueDomain)
from htsql.connect import Connect
from htsql.util import Record

class Meta(object):
    Loads raw meta-data from SQLite system tables.

    def __init__(self):
        connect = Connect()
        connection = connect()
        cursor = connection.cursor()
        self.sqlite_master = self.fetch(cursor,
                                        """SELECT *
                                           FROM sqlite_master
                                           WHERE type = 'table'
                                           ORDER BY name""")
        self.table_info = {}
        self.index_list = {}
        self.index_info = {}
        self.foreign_key_list = {}
        for row in self.sqlite_master:
            rows = self.fetch(cursor, """PRAGMA table_info(%s)""",
            self.table_info[] = rows
            rows = self.fetch(cursor, """PRAGMA index_list(%s)""",
            self.index_list[] = rows
            for index_row in self.index_list[]:
                rows = self.fetch(cursor, """PRAGMA index_info(%s)""",
                self.index_info[] = rows
            rows = self.fetch(cursor, """PRAGMA foreign_key_list(%s)""",
            self.foreign_key_list[] = rows

    def fetch(self, cursor, query, name=None):
        if name is not None:
            query = query % ('"%s"' % name.replace('"', '""'))
        rows = []
        for items in cursor.fetchall():
            attributes = {}
            for kind, item in zip(cursor.description, items):
                name = kind[0]
                if isinstance(item, unicode):
                    item = item.encode('utf-8')
                # Workaround against extra quoting in
                # PRAGMA foreign_key_list; column `table`.
                # See ``
                # and ``.
                # The bug is fixed in SQLite 3.6.14.
                if name == 'table' and (item.startswith('"') and
                    item = item[1:-1].replace('""', '"')
                attributes[name] = item
            record = Record(**attributes)
        return rows

class IntrospectSQLite(Introspect):
    Implements the introspection adapter for SQLite.

    def __init__(self):
        super(IntrospectSQLite, self).__init__()
        self.meta = Meta()

    def __call__(self):
        return self.introspect_catalog()

    def permit_schema(self, schema_name):
        return True

    def permit_table(self, schema_name, table_name):
        return True

    def permit_column(self, schema_name, table_name, column_name):
        return True

    def introspect_catalog(self):
        tables = self.introspect_tables()
        schema = SchemaEntity('_', tables)
        catalog = CatalogEntity([schema])
        return catalog

    def introspect_tables(self):
        tables = []
        for row in self.meta.sqlite_master:
            name =
            if not self.permit_table('_', name):
            columns = self.introspect_columns(name)
            unique_keys = self.introspect_unique_keys(name)
            foreign_keys = self.introspect_foreign_keys(name)
            table = TableEntity('_', name, columns, unique_keys, foreign_keys)
        return tables

    def introspect_columns(self, table_name):
        columns = []
        for row in self.meta.table_info[table_name]:
            name =
            if not self.permit_column('_', table_name, name):
            domain = self.introspect_domain(table_name, name, row.type)
            is_nullable = (not row.notnull)
            has_default = (row.dflt_value is not None)
            column = ColumnEntity('_', table_name, name, domain,
                                  is_nullable, has_default)
        return columns

    def introspect_unique_keys(self, table_name):
        unique_keys = []
        column_names = []
        for row in self.meta.table_info[table_name]:
        if column_names:
            unique_key = PrimaryKeyEntity('_', table_name, column_names)
        for row in self.meta.index_list[table_name]:
            if not row.unique:
            index_name =
            column_names = []
            for index_row in self.meta.index_info[index_name]:
            if not all(self.permit_column('_', table_name, column_name)
                       for column_name in column_names):
            unique_key = UniqueKeyEntity('_', table_name, column_names)
        return unique_keys

    def introspect_foreign_keys(self, table_name):
        foreign_keys = []
        ids = []
        columns_by_id = {}
        target_table_by_id = {}
        target_columns_by_id = {}
        for row in self.meta.foreign_key_list[table_name]:
            if not in target_table_by_id:
                columns_by_id[] = []
                target_table_by_id[] = row.table
                target_columns_by_id[] = []
            columns_by_id[].append(getattr(row, 'from'))
            target_columns_by_id[].append(getattr(row, 'to'))
        for id in ids:
            column_names = columns_by_id[id]
            target_table_name = target_table_by_id[id]
            target_column_names = target_columns_by_id[id]
            if not all(self.permit_column('_', table_name, column_name)
                       for column_name in column_names):
            if not self.permit_table('_', target_table_name):
            if not all(self.permit_column('_', target_table_name, column_name)
                       for column_name in target_column_names):
            foreign_key = ForeignKeyEntity('_', table_name, column_names,
                                           '_', target_table_name,
        return foreign_keys

    def introspect_domain(self, table_name, column_name, type_name):
        name = type_name
        type_name = type_name.lower()
        if 'int' in type_name:
            return SQLiteIntegerDomain(name)
        if 'char' in type_name or 'clob' in type_name or 'text' in type_name:
            return SQLiteTextDomain(name)
        if 'real' in type_name or 'floa' in type_name or 'doub' in type_name:
            return SQLiteFloatDomain(name)
        if 'bool' in type_name:
            return SQLiteBooleanDomain(name)
        if 'datetime' in type_name or 'timestamp' in type_name:
            return SQLiteDateTimeDomain(name)
        if 'date' in type_name:
            return SQLiteDateDomain(name)
        return SQLiteOpaqueDomain(name)