django-mssql / sqlserver_ado / operations.py

Michael Manfre 1b1c507 

Michael Manfre 0b87f67 
Michael Manfre f6b5859 

Michael Manfre 1b1c507 
Michael Manfre 0c22c23 




Michael Manfre 1b1c507 



Michael Manfre 110433d 






Michael Manfre 1b1c507 
Michael Manfre 46109a8 





Michael Manfre 1b1c507 
Michael Manfre 2fe6e77 

















Michael Manfre 1b1c507 
Michael Manfre 2d27e60 
Michael Manfre 1b1c507 

Michael Manfre 0b87f67 





Michael Manfre 1b1c507 


Michael Manfre 0b87f67 
















Michael Manfre 1b1c507 




















Michael Manfre 07396e8 
Michael Manfre 1b1c507 

















































































Michael Manfre f6b5859 
Michael Manfre 0c22c23 
Michael Manfre f6b5859 


Michael Manfre 1b1c507 

Michael Manfre 613d75c 
Michael Manfre 5b358e6 
Michael Manfre 055ab8a 
Michael Manfre 1b1c507 

Michael Manfre 613d75c 
Michael Manfre 055ab8a 

Michael Manfre f6b5859 


Michael Manfre 1b1c507 



Michael Manfre 055ab8a 
Michael Manfre 1b1c507 
Michael Manfre 613d75c 
Michael Manfre 1b1c507 














Michael Manfre 8bd3c94 





Michael Manfre a79a31f 






Michael Manfre 80a2031 







Michael Manfre 0c22c23 































import datetime
import time
import django
from django.conf import settings
from django.db.backends import BaseDatabaseOperations

try:
    from django.utils import timezone
except ImportError:
    # timezone added in Django 1.4, use provided partial backport
    import timezone

class DatabaseOperations(BaseDatabaseOperations):
    compiler_module = "sqlserver_ado.compiler"
    
    def cache_key_culling_sql(self):
        """
        Return the SQL template that selects one cache key by its rank.

        The template carries one ``%s`` placeholder for the cache table name.
        Once that is substituted, a single ``%s`` query parameter remains; the
        row returned is the one whose rank (ordered by ``cache_key``, counted
        from 1) equals that parameter plus one.
        """
        culling_sql = """
            SELECT [cache_key]
              FROM (SELECT [cache_key], ROW_NUMBER() OVER (ORDER BY [cache_key]) AS [rank] FROM %s) AS [RankedCache]
             WHERE [rank] = %%s + 1
        """
        return culling_sql
    
    def date_extract_sql(self, lookup_type, field_name):
        """
        Build a ``DATEPART`` expression extracting ``lookup_type`` from a field.

        The ``week_day`` lookup is translated to the ``weekday`` datepart
        name; every other lookup type is passed through unchanged. The field
        name is run through ``quote_name`` before being embedded.
        """
        datepart = 'weekday' if lookup_type == 'week_day' else lookup_type
        quoted_field = self.quote_name(field_name)
        return 'DATEPART({0}, {1})'.format(datepart, quoted_field)

    def date_interval_sql(self, sql, connector, timedelta):
        """
        Wrap ``sql`` in ``DATEADD`` calls that apply ``timedelta`` to it.

        ``connector`` is ``'+'`` to add the interval; any other value
        subtracts it. Whole seconds (days included) and microseconds are
        applied as separate ``DATEADD`` calls, and a part that is zero is
        skipped, so a zero interval returns ``sql`` untouched.
        """
        direction = -1
        if connector == '+':
            direction = 1

        whole_seconds = direction * (timedelta.days * 86400 + timedelta.seconds)
        micros = timedelta.microseconds

        expression = sql
        if whole_seconds:
            expression = u'DATEADD(SECOND, {0}, {1})'.format(whole_seconds, sql)
        if micros:
            # DATEADD on a datetime does not accept microseconds, so the
            # expression built so far is cast up to datetime2 first.
            expression = u'DATEADD(MICROSECOND, {ms}, CAST({sql} as datetime2))'.format(
                ms=micros * direction,
                sql=expression,
            )
        return expression

    def date_trunc_sql(self, lookup_type, field_name):
        """
        Build an expression truncating ``field_name`` to ``lookup_type``.

        The expression takes the ``DATEDIFF`` in ``lookup_type`` units
        between date zero and the field, then adds that difference back
        onto date zero with ``DATEADD``. ``field_name`` is embedded exactly
        as given.
        """
        template = "DATEADD(%s, DATEDIFF(%s, 0, %s), 0)"
        substitutions = (lookup_type, lookup_type, field_name)
        return template % substitutions

    def last_insert_id(self, cursor, table_name, pk_name):
        """
        Look up the most recent identity value of ``table_name``.

        Runs a follow-up query on ``cursor`` and returns the first column of
        the first row it yields. ``pk_name`` is accepted but not used.
        """
        # IDENT_CURRENT reports the last identity value generated for the
        # given table, in any session and any scope.
        # http://msdn.microsoft.com/en-us/library/ms175098.aspx
        quoted_table = self.quote_name(table_name)
        cursor.execute("SELECT CAST(IDENT_CURRENT(%s) as bigint)", [quoted_table])
        row = cursor.fetchone()
        return row[0]

    def return_insert_id(self):
        """
        Return the ``(sql, params)`` pair for retrieving the inserted ID.

        MSSQL implements the RETURNING SQL standard extension differently
        from the core database backends, so this is essentially a no-op: the
        SQLInsertCompiler alters the SQL to add the necessary OUTPUT clause.
        """
        if django.VERSION[0] != 1 or django.VERSION[1] >= 5:
            # Django #19096 - As of Django 1.5, (None, None) may be returned
            # to bypass the core's SQL mangling.
            return (None, None)

        # Django 1.x before 1.5: SQLInsertCompiler insists on appending an SQL
        # fragment to the end of the insert query and feeds it the full
        # quoted table and column name, so hand it a harmless comment.
        return ('/* %s */', '')

    def no_limit_value(self):
        """Return ``None``; this backend supplies no explicit 'no limit' value."""
        return None

    def prep_for_like_query(self, x):
        """
        Prepare a value for use in a LIKE query.

        The value is converted with ``smart_unicode`` and every backslash,
        ``%``, ``_``, ``[`` and ``]`` in the result is prefixed with a
        backslash.
        """
        from django.utils.encoding import smart_unicode
        # Backslashes are doubled first so that the backslashes added for the
        # other characters below are not themselves escaped a second time.
        escaped = smart_unicode(x).replace("\\", "\\\\")
        # The prefix is spelled "\\" + char rather than "\%" etc.: the latter
        # are invalid escape sequences that newer Pythons warn about, while
        # the resulting strings are identical.
        for special in ("%", "_", "[", "]"):
            escaped = escaped.replace(special, "\\" + special)
        return escaped

    def quote_name(self, name):
        """
        Wrap ``name`` in square brackets unless it already is.

        A name that both starts with ``[`` and ends with ``]`` is treated as
        already quoted and returned unchanged.
        """
        already_quoted = name.startswith('[') and name.endswith(']')
        return name if already_quoted else '[%s]' % name

    def random_function_sql(self):
        """Return the SQL expression this backend uses for random ordering."""
        return 'NEWID()'

    def regex_lookup(self, lookup_type):
        """
        Return the SQL template for a ``regex`` / ``iregex`` lookup.

        The template calls ``dbo.REGEXP_LIKE`` with two ``%s`` placeholders
        followed by a case-sensitivity flag: ``1`` for ``regex`` and ``0``
        for ``iregex``. Any other ``lookup_type`` raises ``KeyError``.
        """
        case_flags = {'regex': 1, 'iregex': 0}
        case_flag = case_flags[lookup_type]
        return "dbo.REGEXP_LIKE(%%s, %%s, %s)=1" % (case_flag,)

    def sql_flush(self, style, tables, sequences):
        """
        Build the SQL statements that empty ``tables`` without dropping them.

        ``style`` is a Style object as returned by either ``color_style()``
        or ``no_style()`` in ``django.core.management.color``. Each entry of
        ``sequences`` is a mapping holding at least a ``"table"`` key.

        The returned list, in order: disables every CHECK / FOREIGN KEY
        constraint listed in INFORMATION_SCHEMA.TABLE_CONSTRAINTS, deletes
        the rows of each table, reseeds the identity counter of each
        sequence table, and re-enables the constraints. When ``tables`` is
        empty an empty list is returned and the database is not queried.

        Originally taken from the django-pyodbc project.
        """
        if not tables:
            return []

        quote = self.quote_name

        # TRUNCATE cannot be used on tables referenced by a FOREIGN KEY, so
        # the (slower) DELETE is used instead.
        from django.db import connection
        cursor = connection.cursor()

        # Choose the reseed value from the current row count to minimise the
        # risk from the inconsistent behaviour of
        # DBCC CHECKIDENT(table, RESEED, n).
        reseeds = []
        for sequence in sequences:
            cursor.execute("SELECT COUNT(*) FROM %s" % quote(sequence["table"]))
            row_count = cursor.fetchone()[0]
            # Keys supplied by the sequence itself (including 'start_id')
            # override the computed default.
            reseed = {'start_id': 0 if row_count else 1}
            reseed.update(sequence)
            reseeds.append(reseed)

        cursor.execute("SELECT TABLE_NAME, CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE CONSTRAINT_TYPE IN ('CHECK', 'FOREIGN KEY')")
        # Rows lacking a table or constraint name are skipped.
        constraints = [
            (quote(row[0]), quote(row[1]))
            for row in cursor.fetchall()
            if row[0] is not None and row[1] is not None
        ]

        statements = []

        # Turn off constraints.
        for table_sql, constraint_sql in constraints:
            statements.append('ALTER TABLE %s NOCHECK CONSTRAINT %s;' % (
                table_sql, constraint_sql))

        # Delete data from tables.
        for table in tables:
            statements.append('%s %s %s;' % (
                style.SQL_KEYWORD('DELETE'),
                style.SQL_KEYWORD('FROM'),
                style.SQL_FIELD(quote(table)),
            ))

        # Reset the counters on each table.
        for reseed in reseeds:
            statements.append('%s %s (%s, %s, %s) %s %s;' % (
                style.SQL_KEYWORD('DBCC'),
                style.SQL_KEYWORD('CHECKIDENT'),
                style.SQL_FIELD(quote(reseed["table"])),
                style.SQL_KEYWORD('RESEED'),
                style.SQL_FIELD('%d' % reseed['start_id']),
                style.SQL_KEYWORD('WITH'),
                style.SQL_KEYWORD('NO_INFOMSGS'),
            ))

        # Turn constraints back on.
        for table_sql, constraint_sql in constraints:
            statements.append('ALTER TABLE %s CHECK CONSTRAINT %s;' % (
                table_sql, constraint_sql))

        return statements

    def tablespace_sql(self, tablespace, inline=False):
        """
        Return the ``ON <name>`` clause for ``tablespace``.

        The name is passed through ``quote_name``; ``inline`` is ignored.
        """
        quoted_tablespace = self.quote_name(tablespace)
        return "ON %s" % quoted_tablespace
        
    def value_to_db_datetime(self, value):
        """
        Prepare a datetime value for the database.

        ``None`` is passed through. A timezone-aware value is converted to
        UTC and made naive when ``settings.USE_TZ`` is true; otherwise it
        raises ``ValueError``. When ``is_sql2005()`` is true the microseconds
        are dropped.
        """
        if value is None:
            return None

        if timezone.is_aware(value):
            if not getattr(settings, 'USE_TZ', False):
                raise ValueError("SQL Server backend does not support timezone-aware datetimes.")
            value = value.astimezone(timezone.utc).replace(tzinfo=None)

        # SQL Server 2005 doesn't support microseconds
        return value.replace(microsecond=0) if self.is_sql2005() else value
    
    def value_to_db_time(self, value):
        """
        Prepare a time value for the database.

        ``None`` is passed through. When ``is_sql2005()`` is false the value
        is returned unchanged. On SQL Server 2005 a timezone-aware time
        raises ``ValueError`` and the microseconds of a naive time are
        dropped.
        """
        # None has to be handled before the awareness check below, which
        # inspects attributes of the value and would fail on None.
        if value is None or not self.is_sql2005():
            return value

        if timezone.is_aware(value):
            raise ValueError("SQL Server backend does not support timezone-aware times.")

        # MS SQL 2005 doesn't support microseconds
        # ...but it also doesn't really support bare times
        return value.replace(microsecond=0)

    def value_to_db_decimal(self, value, max_digits, decimal_places):
        """
        Prepare a decimal value for the database.

        ``None`` and the empty string become ``None``; anything else (a
        decimal, or a string) is returned unchanged. ``max_digits`` and
        ``decimal_places`` are not used.
        """
        is_blank = value is None or value == ''
        return None if is_blank else value

    def year_lookup_bounds(self, value):
        """
        Return the ``[lower, upper]`` datetimes bounding the year ``value``.

        The pair is meant to be used with a BETWEEN operator to query a field
        value using a year lookup. ``value`` is an int containing the
        looked-up year.
        """
        year_start = datetime.datetime(year=value, month=1, day=1)
        # NOTE(review): 999 is a microsecond count, so the upper bound is
        # 23:59:59.000999 on December 31st -- confirm this is the intended
        # precision.
        year_end = datetime.datetime(
            year=value, month=12, day=31,
            hour=23, minute=59, second=59, microsecond=999,
        )
        return [year_start, year_end]

    def bulk_insert_sql(self, fields, num_values):
        """
        Format the VALUES clause for a bulk insert.

        Produces ``num_values`` parenthesised rows, each holding one ``%s``
        placeholder per entry in ``fields``.
        """
        row_placeholders = ", ".join("%s" for _ in range(len(fields)))
        row_sql = "(%s)" % row_placeholders
        all_rows = ", ".join(row_sql for _ in range(num_values))
        return "VALUES " + all_rows

    def max_name_length(self):
        """Return 128, the longest identifier name MSSQL supports."""
        return 128

    def _supports_stddev(self):
        """
        Work around for django ticket #18334. 
        This backend supports StdDev and the SQLCompilers will remap to 
        the correct function names.
        """
        return True

    def enable_identity_insert(self, table):
        """
        Allow explicit values to be inserted into ``table``'s identity column.

        Issues ``SET IDENTITY_INSERT <table> ON`` on a cursor of the default
        ``django.db`` connection and returns True. When ``table`` is empty
        or None nothing is executed and False is returned.
        """
        if not table:
            return False

        from django.db import connection
        cursor = connection.cursor()
        quoted_table = connection.ops.quote_name(table)
        cursor.execute('SET IDENTITY_INSERT {0} ON'.format(quoted_table))
        return True
    
    def disable_identity_insert(self, table):
        """
        Stop allowing explicit values in ``table``'s identity column.

        Issues ``SET IDENTITY_INSERT <table> OFF`` on a cursor of the default
        ``django.db`` connection and returns True. When ``table`` is empty
        or None nothing is executed and False is returned.
        """
        if not table:
            return False

        from django.db import connection
        cursor = connection.cursor()
        quoted_table = connection.ops.quote_name(table)
        cursor.execute('SET IDENTITY_INSERT {0} OFF'.format(quoted_table))
        return True
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.