Commits

adamv committed f004e3a

* Removed debugging print
* Moved dbapi tests into main test project, so they are run every time

  • Participants
  • Parent commits 5a91003

Comments (0)

Files changed (13)

File samples/nodjango.py

     c.close()
 
 def sproc_2b(connection):
-    "Calls a sproc using 'callproc'; None isn't supported as a parameter yet."
+    "Calls a sproc using 'callproc'."
     c = connection.cursor()
     c.callproc('uspAppUser_GetAll', [0])
     _print_names(c.fetchall())

File source/sqlserver_ado/base.py

             return False
     return True
 
+def connection_string_from_settings():
+    from django.conf import settings
+    return make_connection_string(settings)
+
+def make_connection_string(settings):
+    if settings.DATABASE_NAME == '':
+        raise ImproperlyConfigured("You need to specify a DATABASE_NAME in your Django settings file.")
+
+    datasource = settings.DATABASE_HOST
+    if not datasource:
+        datasource = "127.0.0.1"
+
+    # If a port is given, force a TCP/IP connection. The host should be an IP address in this case.
+    if settings.DATABASE_PORT != '':
+        if not _looks_like_ipaddress(datasource):
+            raise ImproperlyConfigured("When using DATABASE_PORT, DATABASE_HOST must be an IP address.")
+        datasource = '%s,%s;Network Library=DBMSSOCN' % (datasource, settings.DATABASE_PORT)
+
+    # If no user is specified, use integrated security.
+    if settings.DATABASE_USER != '':
+        auth_string = "UID=%s;PWD=%s" % (settings.DATABASE_USER, settings.DATABASE_PASSWORD)
+    else:
+        auth_string = "Integrated Security=SSPI"
+
+    return "PROVIDER=SQLOLEDB;DATA SOURCE=%s;Initial Catalog=%s;%s" % (datasource, settings.DATABASE_NAME, auth_string)
+
 class DatabaseWrapper(BaseDatabaseWrapper):
     operators = {
         "exact": "= %s",
         self.introspection = DatabaseIntrospection(self)
         self.validation = BaseDatabaseValidation()
         
-
     def _cursor(self, settings):
         # Connection strings courtesy of:
         # http://www.connectionstrings.com/?carrier=sqlserver
 
         if self.connection is None:
-            if settings.DATABASE_NAME == '':
-                raise ImproperlyConfigured("You need to specify a DATABASE_NAME in your Django settings file.")
-
-            datasource = settings.DATABASE_HOST
-            if not datasource:
-                datasource = "127.0.0.1"
-
-            # If a port is given, force a TCP/IP connection. The host should be an IP address in this case.
-            if settings.DATABASE_PORT != '':
-                if not _looks_like_ipaddress(datasource):
-                    raise ImproperlyConfigured("When using DATABASE_PORT, DATABASE_HOST must be an IP address.")
-                datasource = '%s,%s;Network Library=DBMSSOCN' % (datasource, settings.DATABASE_PORT)
-
-            # If no user is specified, use integrated security.
-            if settings.DATABASE_USER != '':
-                auth_string = "UID=%s;PWD=%s" % (settings.DATABASE_USER, settings.DATABASE_PASSWORD)
-            else:
-                auth_string = "Integrated Security=SSPI"
-
-            conn_string = "PROVIDER=SQLOLEDB;DATA SOURCE=%s;Initial Catalog=%s;%s" % \
-                (datasource, settings.DATABASE_NAME, auth_string)
-
-            self.connection = Database.connect(conn_string)
+            self.connection = Database.connect(make_connection_string(settings))
 
         return Database.Cursor(self.connection)

File source/sqlserver_ado/operations.py

     	quoted_field_name = self.quote_name(field_name)
 
         if lookup_type == 'year':
-            print "YEAR CHECKED"
             return "Convert(datetime, Convert(varchar, DATEPART(year, %s)) + '/01/01')" % quoted_field_name
         if lookup_type == 'month':
             return "Convert(datetime, Convert(varchar, DATEPART(year, %s)) + '/' + Convert(varchar, DATEPART(month, %s)) + '/01')" %\

File tests/api/dbapi20.py

-#!/usr/bin/env python
-''' Python DB API 2.0 driver compliance unit test suite. 
-    
-    This software is Public Domain and may be used without restrictions.
-    http://www.stuartbishop.net/Software/DBAPI20TestSuite/index.html
-'''
-
-__rcs_id__  = '$Id: dbapi20.py,v 1.11 2005/01/02 02:41:01 zenzen Exp $'
-__version__ = '$Revision: 1.11 $'[11:-2]
-__author__ = 'Stuart Bishop <stuart@stuartbishop.net>'
-
-import unittest
-import time
-
-
-class DatabaseAPI20Test(unittest.TestCase):
-    ''' Test a database self.driver for DB API 2.0 compatibility.
-        This implementation tests Gadfly, but the TestCase
-        is structured so that other self.drivers can subclass this 
-        test case to ensure compiliance with the DB-API. It is 
-        expected that this TestCase may be expanded in the future
-        if ambiguities or edge conditions are discovered.
-
-        The 'Optional Extensions' are not yet being tested.
-
-        self.drivers should subclass this test, overriding setUp, tearDown,
-        self.driver, connect_args and connect_kw_args. Class specification
-        should be as follows:
-
-        import dbapi20 
-        class mytest(dbapi20.DatabaseAPI20Test):
-           [...] 
-
-        Don't 'import DatabaseAPI20Test from dbapi20', or you will
-        confuse the unit tester - just 'import dbapi20'.
-    '''
-
-    # The self.driver module. This should be the module where the 'connect'
-    # method is to be found
-    driver = None
-    connect_args = () # List of arguments to pass to connect
-    connect_kw_args = {} # Keyword arguments for connect
-    table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
-
-    ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
-    ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
-    xddl1 = 'drop table %sbooze' % table_prefix
-    xddl2 = 'drop table %sbarflys' % table_prefix
-
-    lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase
-        
-    # Some drivers may need to override these helpers, for example adding
-    # a 'commit' after the execute.
-    def executeDDL1(self,cursor):
-        cursor.execute(self.ddl1)
-
-    def executeDDL2(self,cursor):
-        cursor.execute(self.ddl2)
-
-    def setUp(self):
-        ''' self.drivers should override this method to perform required setup
-            if any is necessary, such as creating the database.
-        '''
-        pass
-
-    def tearDown(self):
-        ''' self.drivers should override this method to perform required cleanup
-            if any is necessary, such as deleting the test database.
-            The default drops the tables that may be created.
-        '''
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            for i, ddl in enumerate((self.xddl1,self.xddl2)):
-                try: 
-                    cur.execute(ddl)
-                    con.commit()
-                except self.driver.Error: 
-                    # Assume table didn't exist. Other tests will check if
-                    # execute is busted.
-                    pass
-        finally:
-            con.close()
-
-    def _connect(self):
-        try:
-            return self.driver.connect(*self.connect_args,**self.connect_kw_args)
-        except AttributeError:
-            self.fail("No connect method found in self.driver module")
-
-    def test_connect(self):
-        con = self._connect()
-        con.close()
-
-    def test_apilevel(self):
-        try:
-            # Must exist and equal 2.0
-            apilevel = self.driver.apilevel
-            self.assertEqual(apilevel,'2.0')
-        except AttributeError:
-            self.fail("Driver doesn't define apilevel")
-
-    def test_threadsafety(self):
-        try:
-            # Must exist
-            threadsafety = self.driver.threadsafety
-            # Must be a valid value
-            self.failUnless(threadsafety in (0,1,2,3))
-        except AttributeError:
-            self.fail("Driver doesn't define threadsafety")
-
-    def test_paramstyle(self):
-        try:
-            # Must exist
-            paramstyle = self.driver.paramstyle
-            # Must be a valid value
-            self.failUnless(paramstyle in ('qmark','numeric','named','format','pyformat'))
-        except AttributeError:
-            self.fail("Driver doesn't define paramstyle")
-
-    def test_Exceptions(self):
-        # Make sure required exceptions exist, and are in the
-        # defined heirarchy.
-        self.failUnless(issubclass(self.driver.Warning,StandardError))
-        self.failUnless(issubclass(self.driver.Error,StandardError))
-        self.failUnless(issubclass(self.driver.InterfaceError,self.driver.Error))
-        self.failUnless(issubclass(self.driver.DatabaseError,self.driver.Error))
-        self.failUnless(issubclass(self.driver.OperationalError,self.driver.Error))
-        self.failUnless(issubclass(self.driver.IntegrityError,self.driver.Error))
-        self.failUnless(issubclass(self.driver.InternalError,self.driver.Error))
-        self.failUnless(issubclass(self.driver.ProgrammingError,self.driver.Error))
-        self.failUnless(issubclass(self.driver.NotSupportedError,self.driver.Error))
-
-    def test_ExceptionsAsConnectionAttributes(self):
-        # OPTIONAL EXTENSION
-        # Test for the optional DB API 2.0 extension, where the exceptions
-        # are exposed as attributes on the Connection object
-        # I figure this optional extension will be implemented by any
-        # driver author who is using this test suite, so it is enabled
-        # by default.
-        con = self._connect()
-        drv = self.driver
-        self.failUnless(con.Warning is drv.Warning)
-        self.failUnless(con.Error is drv.Error)
-        self.failUnless(con.InterfaceError is drv.InterfaceError)
-        self.failUnless(con.DatabaseError is drv.DatabaseError)
-        self.failUnless(con.OperationalError is drv.OperationalError)
-        self.failUnless(con.IntegrityError is drv.IntegrityError)
-        self.failUnless(con.InternalError is drv.InternalError)
-        self.failUnless(con.ProgrammingError is drv.ProgrammingError)
-        self.failUnless(con.NotSupportedError is drv.NotSupportedError)
-
-
-    def test_commit(self):
-        con = self._connect()
-        try:
-            # Commit must work, even if it doesn't do anything
-            con.commit()
-        finally:
-            con.close()
-
-    def test_rollback(self):
-        con = self._connect()
-        # If rollback is defined, it should either work or throw
-        # the documented exception
-        if hasattr(con,'rollback'):
-            try:
-                con.rollback()
-            except self.driver.NotSupportedError:
-                pass
-    
-    def test_cursor(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-        finally:
-            con.close()
-
-    def test_cursor_isolation(self):
-        con = self._connect()
-        try:
-            # Make sure cursors created from the same connection have
-            # the documented transaction isolation level
-            cur1 = con.cursor()
-            cur2 = con.cursor()
-            self.executeDDL1(cur1)
-            cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (self.table_prefix))
-            cur2.execute("select name from %sbooze" % self.table_prefix)
-            booze = cur2.fetchall()
-            self.assertEqual(len(booze),1)
-            self.assertEqual(len(booze[0]),1)
-            self.assertEqual(booze[0][0],'Victoria Bitter')
-        finally:
-            con.close()
-
-    def test_description(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.executeDDL1(cur)
-            self.assertEqual(cur.description,None,
-                'cursor.description should be none after executing a '
-                'statement that can return no rows (such as DDL)'
-                )
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            self.assertEqual(len(cur.description),1,
-                'cursor.description describes too many columns'
-                )
-            self.assertEqual(len(cur.description[0]),7,
-                'cursor.description[x] tuples must have 7 elements'
-                )
-            self.assertEqual(cur.description[0][0].lower(),'name',
-                'cursor.description[x][0] must return column name'
-                )
-            self.assertEqual(cur.description[0][1],self.driver.STRING,
-                'cursor.description[x][1] must return column type. Got %r'
-                    % cur.description[0][1]
-                )
-
-            # Make sure self.description gets reset
-            self.executeDDL2(cur)
-            self.assertEqual(cur.description,None,
-                'cursor.description not being set to None when executing '
-                'no-result statements (eg. DDL)'
-                )
-        finally:
-            con.close()
-
-    def test_rowcount(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.executeDDL1(cur)
-            self.assertEqual(cur.rowcount,-1,
-                'cursor.rowcount should be -1 after executing no-result '
-                'statements'
-                )
-            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
-                self.table_prefix
-                ))
-            self.failUnless(cur.rowcount in (-1,1),
-                'cursor.rowcount should == number or rows inserted, or '
-                'set to -1 after executing an insert statement'
-                )
-            cur.execute("select name from %sbooze" % self.table_prefix)
-            self.failUnless(cur.rowcount in (-1,1),
-                'cursor.rowcount should == number of rows returned, or '
-                'set to -1 after executing a select statement'
-                )
-            self.executeDDL2(cur)
-            self.assertEqual(cur.rowcount,-1,
-                'cursor.rowcount not being reset to -1 after executing '
-                'no-result statements'
-                )
-        finally:
-            con.close()
-
-    def _callproc_setup(self):
-        pass
-
-    def test_callproc(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self._callproc_setup(cur)
-            # Execute and get new parameters
-            values = cur.callproc('to_lower', ('FOO',))
-            self.assertEqual(len(values),1, 'callproc didnt return the input values')
-            self.assertEqual(values[0],'FOO', 'input-only values shouldnt change')
-            
-            r = cur.fetchall()
-            self.assertEqual(len(r),1,'callproc produced no result set')
-            self.assertEqual(len(r[0]),1, 'callproc produced invalid result set')
-            self.assertEqual(r[0][0],'foo', 'callproc produced invalid results')
-        finally:
-            con.close()
-
-    def test_close(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-        finally:
-            con.close()
-
-        # cursor.execute should raise an Error if called after connection
-        # closed
-        self.assertRaises(self.driver.Error,self.executeDDL1,cur)
-
-        # connection.commit should raise an Error if called after connection'
-        # closed.'
-        self.assertRaises(self.driver.Error,con.commit)
-
-        # connection.close should raise an Error if called more than once
-        self.assertRaises(self.driver.Error,con.close)
-
-    def test_execute(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self._paraminsert(cur)
-        finally:
-            con.close()
-
-    def _paraminsert(self,cur):
-        self.executeDDL1(cur)
-        cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
-            self.table_prefix
-            ))
-        self.failUnless(cur.rowcount in (-1,1))
-
-        if self.driver.paramstyle == 'qmark':
-            cur.execute(
-                'insert into %sbooze values (?)' % self.table_prefix,
-                ("Cooper's",)
-                )
-        elif self.driver.paramstyle == 'numeric':
-            cur.execute(
-                'insert into %sbooze values (:1)' % self.table_prefix,
-                ("Cooper's",)
-                )
-        elif self.driver.paramstyle == 'named':
-            cur.execute(
-                'insert into %sbooze values (:beer)' % self.table_prefix, 
-                {'beer':"Cooper's"}
-                )
-        elif self.driver.paramstyle == 'format':
-            cur.execute(
-                'insert into %sbooze values (%%s)' % self.table_prefix,
-                ("Cooper's",)
-                )
-        elif self.driver.paramstyle == 'pyformat':
-            cur.execute(
-                'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
-                {'beer':"Cooper's"}
-                )
-        else:
-            self.fail('Invalid paramstyle')
-        self.failUnless(cur.rowcount in (-1,1))
-
-        cur.execute('select name from %sbooze' % self.table_prefix)
-        res = cur.fetchall()
-        self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
-        beers = [res[0][0],res[1][0]]
-        beers.sort()
-        self.assertEqual(beers[0],"Cooper's",
-            'cursor.fetchall retrieved incorrect data, or data inserted '
-            'incorrectly'
-            )
-        self.assertEqual(beers[1],"Victoria Bitter",
-            'cursor.fetchall retrieved incorrect data, or data inserted '
-            'incorrectly'
-            )
-
-    def test_executemany(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.executeDDL1(cur)
-            largs = [ ("Cooper's",) , ("Boag's",) ]
-            margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
-            if self.driver.paramstyle == 'qmark':
-                cur.executemany(
-                    'insert into %sbooze values (?)' % self.table_prefix,
-                    largs
-                    )
-            elif self.driver.paramstyle == 'numeric':
-                cur.executemany(
-                    'insert into %sbooze values (:1)' % self.table_prefix,
-                    largs
-                    )
-            elif self.driver.paramstyle == 'named':
-                cur.executemany(
-                    'insert into %sbooze values (:beer)' % self.table_prefix,
-                    margs
-                    )
-            elif self.driver.paramstyle == 'format':
-                cur.executemany(
-                    'insert into %sbooze values (%%s)' % self.table_prefix,
-                    largs
-                    )
-            elif self.driver.paramstyle == 'pyformat':
-                cur.executemany(
-                    'insert into %sbooze values (%%(beer)s)' % (
-                        self.table_prefix
-                        ),
-                    margs
-                    )
-            else:
-                self.fail('Unknown paramstyle')
-            self.failUnless(cur.rowcount in (-1,2),
-                'insert using cursor.executemany set cursor.rowcount to '
-                'incorrect value %r' % cur.rowcount
-                )
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            res = cur.fetchall()
-            self.assertEqual(len(res),2,
-                'cursor.fetchall retrieved incorrect number of rows'
-                )
-            beers = [res[0][0],res[1][0]]
-            beers.sort()
-            self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
-            self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
-        finally:
-            con.close()
-
-    def test_fetchone(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-
-            # cursor.fetchone should raise an Error if called before
-            # executing a select-type query
-            self.assertRaises(self.driver.Error,cur.fetchone)
-
-            # cursor.fetchone should raise an Error if called after
-            # executing a query that cannnot return rows
-            self.executeDDL1(cur)
-            self.assertRaises(self.driver.Error,cur.fetchone)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            self.assertEqual(cur.fetchone(),None,
-                'cursor.fetchone should return None if a query retrieves '
-                'no rows'
-                )
-            self.failUnless(cur.rowcount in (-1,0))
-
-            # cursor.fetchone should raise an Error if called after
-            # executing a query that cannnot return rows
-            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
-                self.table_prefix
-                ))
-            self.assertRaises(self.driver.Error,cur.fetchone)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            r = cur.fetchone()
-            self.assertEqual(len(r),1,
-                'cursor.fetchone should have retrieved a single row'
-                )
-            self.assertEqual(r[0],'Victoria Bitter',
-                'cursor.fetchone retrieved incorrect data'
-                )
-            self.assertEqual(cur.fetchone(),None,
-                'cursor.fetchone should return None if no more rows available'
-                )
-            self.failUnless(cur.rowcount in (-1,1))
-        finally:
-            con.close()
-
-    samples = [
-        'Carlton Cold',
-        'Carlton Draft',
-        'Mountain Goat',
-        'Redback',
-        'Victoria Bitter',
-        'XXXX'
-        ]
-
-    def _populate(self):
-        ''' Return a list of sql commands to setup the DB for the fetch
-            tests.
-        '''
-        populate = [
-            "insert into %sbooze values ('%s')" % (self.table_prefix,s) 
-                for s in self.samples
-            ]
-        return populate
-
-    def test_fetchmany(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-
-            # cursor.fetchmany should raise an Error if called without
-            #issuing a query
-            self.assertRaises(self.driver.Error,cur.fetchmany,4)
-
-            self.executeDDL1(cur)
-            for sql in self._populate():
-                cur.execute(sql)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            r = cur.fetchmany()
-            self.assertEqual(len(r),1,
-                'cursor.fetchmany retrieved incorrect number of rows, '
-                'default of arraysize is one.'
-                )
-            cur.arraysize=10
-            r = cur.fetchmany(3) # Should get 3 rows
-            self.assertEqual(len(r),3,
-                'cursor.fetchmany retrieved incorrect number of rows'
-                )
-            r = cur.fetchmany(4) # Should get 2 more
-            self.assertEqual(len(r),2,
-                'cursor.fetchmany retrieved incorrect number of rows'
-                )
-            r = cur.fetchmany(4) # Should be an empty sequence
-            self.assertEqual(len(r),0,
-                'cursor.fetchmany should return an empty sequence after '
-                'results are exhausted'
-            )
-            self.failUnless(cur.rowcount in (-1,6))
-
-            # Same as above, using cursor.arraysize
-            cur.arraysize=4
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            r = cur.fetchmany() # Should get 4 rows
-            self.assertEqual(len(r),4,
-                'cursor.arraysize not being honoured by fetchmany'
-                )
-            r = cur.fetchmany() # Should get 2 more
-            self.assertEqual(len(r),2)
-            r = cur.fetchmany() # Should be an empty sequence
-            self.assertEqual(len(r),0)
-            self.failUnless(cur.rowcount in (-1,6))
-
-            cur.arraysize=6
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            rows = cur.fetchmany() # Should get all rows
-            self.failUnless(cur.rowcount in (-1,6))
-            self.assertEqual(len(rows),6)
-            self.assertEqual(len(rows),6)
-            rows = [r[0] for r in rows]
-            rows.sort()
-          
-            # Make sure we get the right data back out
-            for i in range(0,6):
-                self.assertEqual(rows[i],self.samples[i],
-                    'incorrect data retrieved by cursor.fetchmany'
-                    )
-
-            rows = cur.fetchmany() # Should return an empty list
-            self.assertEqual(len(rows),0,
-                'cursor.fetchmany should return an empty sequence if '
-                'called after the whole result set has been fetched'
-                )
-            self.failUnless(cur.rowcount in (-1,6))
-
-            self.executeDDL2(cur)
-            cur.execute('select name from %sbarflys' % self.table_prefix)
-            r = cur.fetchmany() # Should get empty sequence
-            self.assertEqual(len(r),0,
-                'cursor.fetchmany should return an empty sequence if '
-                'query retrieved no rows'
-                )
-            self.failUnless(cur.rowcount in (-1,0))
-
-        finally:
-            con.close()
-
-    def test_fetchall(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            # cursor.fetchall should raise an Error if called
-            # without executing a query that may return rows (such
-            # as a select)
-            self.assertRaises(self.driver.Error, cur.fetchall)
-
-            self.executeDDL1(cur)
-            for sql in self._populate():
-                cur.execute(sql)
-
-            # cursor.fetchall should raise an Error if called
-            # after executing a a statement that cannot return rows
-            self.assertRaises(self.driver.Error,cur.fetchall)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            rows = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,len(self.samples)))
-            self.assertEqual(len(rows),len(self.samples),
-                'cursor.fetchall did not retrieve all rows'
-                )
-            rows = [r[0] for r in rows]
-            rows.sort()
-            for i in range(0,len(self.samples)):
-                self.assertEqual(rows[i],self.samples[i],
-                'cursor.fetchall retrieved incorrect rows'
-                )
-            rows = cur.fetchall()
-            self.assertEqual(
-                len(rows),0,
-                'cursor.fetchall should return an empty list if called '
-                'after the whole result set has been fetched'
-                )
-            self.failUnless(cur.rowcount in (-1,len(self.samples)))
-
-            self.executeDDL2(cur)
-            cur.execute('select name from %sbarflys' % self.table_prefix)
-            rows = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,0))
-            self.assertEqual(len(rows),0,
-                'cursor.fetchall should return an empty list if '
-                'a select query returns no rows'
-                )
-            
-        finally:
-            con.close()
-    
-    def test_mixedfetch(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.executeDDL1(cur)
-            for sql in self._populate():
-                cur.execute(sql)
-
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            rows1  = cur.fetchone()
-            rows23 = cur.fetchmany(2)
-            rows4  = cur.fetchone()
-            rows56 = cur.fetchall()
-            self.failUnless(cur.rowcount in (-1,6))
-            self.assertEqual(len(rows23),2,
-                'fetchmany returned incorrect number of rows'
-                )
-            self.assertEqual(len(rows56),2,
-                'fetchall returned incorrect number of rows'
-                )
-
-            rows = [rows1[0]]
-            rows.extend([rows23[0][0],rows23[1][0]])
-            rows.append(rows4[0])
-            rows.extend([rows56[0][0],rows56[1][0]])
-            rows.sort()
-            for i in range(0,len(self.samples)):
-                self.assertEqual(rows[i],self.samples[i],
-                    'incorrect data retrieved or inserted'
-                    )
-        finally:
-            con.close()
-
-    def help_nextset_setUp(self,cur):
-        ''' Should create a procedure called deleteme
-            that returns two result sets, first the 
-	    number of rows in booze then "name from booze"
-        '''
-        raise NotImplementedError,'Helper not implemented'
-
-    def help_nextset_tearDown(self,cur):
-        'If cleaning up is needed after nextSetTest'
-        raise NotImplementedError,'Helper not implemented'
-
-    def test_nextset(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            if not hasattr(cur,'nextset'):
-                return
-
-            try:
-                self.executeDDL1(cur)
-                sql=self._populate()
-                for sql in self._populate():
-                    cur.execute(sql)
-
-                self.help_nextset_setUp(cur)
-
-                cur.callproc('more_than_one')
-                values = cur.fetchone()
-
-                assert cur.nextset()
-                values = cur.fetchall()
-
-                s = cur.nextset()
-                assert s == None,'No more return sets, should return None'
-            finally:
-                self.help_nextset_tearDown(cur)
-
-        finally:
-            con.close()
-
-    def test_arraysize(self):
-        # Not much here - rest of the tests for this are in test_fetchmany
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.failUnless(hasattr(cur,'arraysize'),
-                'cursor.arraysize must be defined'
-                )
-        finally:
-            con.close()
-
-    def test_setinputsizes(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            cur.setinputsizes( (25,) )
-            self._paraminsert(cur) # Make sure cursor still works
-        finally:
-            con.close()
-
-    def test_setoutputsize_basic(self):
-        # Basic test is to make sure setoutputsize doesn't blow up
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            cur.setoutputsize(1000)
-            cur.setoutputsize(2000,0)
-            self._paraminsert(cur) # Make sure the cursor still works
-        finally:
-            con.close()
-
-    def test_setoutputsize(self):
-        # Real test for setoutputsize is driver dependant
-        raise NotImplementedError,'Driver need to override this test'
-
-    def test_None(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self.executeDDL1(cur)
-            cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
-            cur.execute('select name from %sbooze' % self.table_prefix)
-            r = cur.fetchall()
-            self.assertEqual(len(r),1)
-            self.assertEqual(len(r[0]),1)
-            self.assertEqual(r[0][0],None,'NULL value not returned as None')
-        finally:
-            con.close()
-
-    def test_Date(self):
-        d1 = self.driver.Date(2002,12,25)
-        d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
-        # Can we assume this? API doesn't specify, but it seems implied
-        # self.assertEqual(str(d1),str(d2))
-
-    def test_Time(self):
-        t1 = self.driver.Time(13,45,30)
-        t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
-        # Can we assume this? API doesn't specify, but it seems implied
-        # self.assertEqual(str(t1),str(t2))
-
-    def test_Timestamp(self):
-        t1 = self.driver.Timestamp(2002,12,25,13,45,30)
-        t2 = self.driver.TimestampFromTicks(time.mktime((2002,12,25,13,45,30,0,0,0)))
-        # Can we assume this? API doesn't specify, but it seems implied
-        # self.assertEqual(str(t1),str(t2))
-
-    def test_Binary(self):
-        b = self.driver.Binary('Something')
-        b = self.driver.Binary('')
-
-    def test_STRING(self):
-        self.failUnless(hasattr(self.driver,'STRING'),
-            'module.STRING must be defined'
-            )
-
-    def test_BINARY(self):
-        self.failUnless(hasattr(self.driver,'BINARY'),
-            'module.BINARY must be defined.'
-            )
-
-    def test_NUMBER(self):
-        self.failUnless(hasattr(self.driver,'NUMBER'),
-            'module.NUMBER must be defined.'
-            )
-
-    def test_DATETIME(self):
-        self.failUnless(hasattr(self.driver,'DATETIME'),
-            'module.DATETIME must be defined.'
-            )
-
-    def test_ROWID(self):
-        self.failUnless(hasattr(self.driver,'ROWID'),
-            'module.ROWID must be defined.'
-            )

File tests/api/run.py

-import unittest
-
-# Django settings for testbackend project.
-def hack_path():
-    import os, sys
-    common_path = os.path.join(os.path.abspath(os.path.dirname(".")), "..")
-    sys.path.append(common_path)
-
-hack_path()
-from dbsettings import *
-
-from sqlserver_ado import dbapi
-import dbapi20
-
-class test_dbapi(dbapi20.DatabaseAPI20Test):
-    driver = dbapi
-    connect_args = [ make_connection_string() ]
-    
-    def _try_run(self, *args):
-        con = self._connect()
-        cur = None
-        try:
-            cur = con.cursor()
-            for arg in args:
-                cur.execute(arg)
-        finally:
-            try:
-                if cur is not None:
-                    cur.close()
-            except: pass
-            con.close()
-
-    def _try_run2(self, cur, *args):
-        for arg in args:
-            cur.execute(arg)
-    
-    # This should create the "lower" sproc.
-    def _callproc_setup(self, cur):
-        self._try_run2(cur,
-            """IF OBJECT_ID(N'[dbo].[to_lower]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[to_lower]""",
-            """
-CREATE PROCEDURE to_lower
-    @input nvarchar(max)
-AS
-BEGIN
-    select LOWER(@input)
-END
-""",
-            )
-    
-    # This should create a sproc with a return value.
-    def _retval_setup(self, cur):
-        self._try_run2(cur,
-            """IF OBJECT_ID(N'[dbo].[add_one]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[add_one]""",
-            """
-CREATE PROCEDURE add_one (@input int)
-AS
-BEGIN
-    return @input+1
-END
-""",
-            )
-
-    def test_retval(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self._retval_setup(cur)
-            values = cur.callproc('add_one',(1,))
-            self.assertEqual(values[0], 1, 'input parameter should be left unchanged: %s' % (values[0],))
-            
-            self.assertEqual(cur.description, None,"No resultset was expected.")
-            self.assertEqual(cur.return_value, 2, "Invalid return value: %s" % (cur.return_value,))
-
-        finally:
-            con.close()
-
-    # This should create a sproc with an output parameter.
-    def _outparam_setup(self, cur):
-        self._try_run2(cur,
-            """IF OBJECT_ID(N'[dbo].[add_one_out]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[add_one_out]""",
-            """
-CREATE PROCEDURE add_one_out (@input int, @output int OUTPUT)
-AS
-BEGIN
-    SET @output = @input+1
-END
-""",
-            )
-
-    def test_outparam(self):
-        con = self._connect()
-        try:
-            cur = con.cursor()
-            self._outparam_setup(cur)
-            values = cur.callproc('add_one_out',(1,None))
-            self.assertEqual(len(values), 2, 'expected 2 parameters')
-            self.assertEqual(values[0], 1, 'input parameter should be unchanged')
-            self.assertEqual(values[1], 2, 'output parameter should get new values')
-        finally:
-            con.close()            
-    
-    # Don't need setoutputsize tests.
-    def test_setoutputsize(self): 
-        pass
-        
-    def help_nextset_setUp(self,cur):
-        self._try_run2(cur,
-            """IF OBJECT_ID(N'[dbo].[more_than_one]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[more_than_one]""",
-            """
-create procedure more_than_one
-as
-begin
-    select 1,2,3
-    select 4,5,6
-end
-""",
-            )
-
-    def help_nextset_tearDown(self,cur):
-        pass
-   
-suite = unittest.makeSuite(test_dbapi, 'test')
-testRunner = unittest.TextTestRunner(verbosity=9)
-print testRunner.run(suite)

File tests/startsql.cmd

+@echo off
+echo Starting SQL Server Express...
+net start SQLBrowser
+net start MSSQL$SQLEXPRESS
+set SQLINSTANCE=SQLEXPRESS

File tests/stopsql.cmd

+@echo off
+echo Stopping SQL Server Express...
+net stop MSSQL$SQLEXPRESS
+net stop SQLBrowser

File tests/test_main/apitest/__init__.py

Empty file added.

File tests/test_main/apitest/dbapi20.py

+#!/usr/bin/env python
+''' Python DB API 2.0 driver compliance unit test suite. 
+    
+    This software is Public Domain and may be used without restrictions.
+    http://www.stuartbishop.net/Software/DBAPI20TestSuite/index.html
+'''
+
+__rcs_id__  = '$Id: dbapi20.py,v 1.11 2005/01/02 02:41:01 zenzen Exp $'
+__version__ = '$Revision: 1.11 $'[11:-2]
+__author__ = 'Stuart Bishop <stuart@stuartbishop.net>'
+
+import unittest
+import time
+
+
+class DatabaseAPI20Test(unittest.TestCase):
+    ''' Test a database self.driver for DB API 2.0 compatibility.
+        This implementation tests Gadfly, but the TestCase
+        is structured so that other self.drivers can subclass this 
+        test case to ensure compiliance with the DB-API. It is 
+        expected that this TestCase may be expanded in the future
+        if ambiguities or edge conditions are discovered.
+
+        The 'Optional Extensions' are not yet being tested.
+
+        self.drivers should subclass this test, overriding setUp, tearDown,
+        self.driver, connect_args and connect_kw_args. Class specification
+        should be as follows:
+
+        import dbapi20 
+        class mytest(dbapi20.DatabaseAPI20Test):
+           [...] 
+
+        Don't 'import DatabaseAPI20Test from dbapi20', or you will
+        confuse the unit tester - just 'import dbapi20'.
+    '''
+
+    # The self.driver module. This should be the module where the 'connect'
+    # method is to be found
+    driver = None
+    connect_args = () # List of arguments to pass to connect
+    connect_kw_args = {} # Keyword arguments for connect
+    table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
+
+    ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
+    ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
+    xddl1 = 'drop table %sbooze' % table_prefix
+    xddl2 = 'drop table %sbarflys' % table_prefix
+
+    lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase
+        
+    # Some drivers may need to override these helpers, for example adding
+    # a 'commit' after the execute.
+    def executeDDL1(self,cursor):
+        cursor.execute(self.ddl1)
+
+    def executeDDL2(self,cursor):
+        cursor.execute(self.ddl2)
+
+    def setUp(self):
+        ''' self.drivers should override this method to perform required setup
+            if any is necessary, such as creating the database.
+        '''
+        pass
+
+    def tearDown(self):
+        ''' self.drivers should override this method to perform required cleanup
+            if any is necessary, such as deleting the test database.
+            The default drops the tables that may be created.
+        '''
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            for i, ddl in enumerate((self.xddl1,self.xddl2)):
+                try: 
+                    cur.execute(ddl)
+                    con.commit()
+                except self.driver.Error: 
+                    # Assume table didn't exist. Other tests will check if
+                    # execute is busted.
+                    pass
+        finally:
+            con.close()
+
+    def _connect(self):
+        try:
+            return self.driver.connect(*self.connect_args,**self.connect_kw_args)
+        except AttributeError:
+            self.fail("No connect method found in self.driver module")
+
+    def test_connect(self):
+        con = self._connect()
+        con.close()
+
+    def test_apilevel(self):
+        try:
+            # Must exist and equal 2.0
+            apilevel = self.driver.apilevel
+            self.assertEqual(apilevel,'2.0')
+        except AttributeError:
+            self.fail("Driver doesn't define apilevel")
+
+    def test_threadsafety(self):
+        try:
+            # Must exist
+            threadsafety = self.driver.threadsafety
+            # Must be a valid value
+            self.failUnless(threadsafety in (0,1,2,3))
+        except AttributeError:
+            self.fail("Driver doesn't define threadsafety")
+
+    def test_paramstyle(self):
+        try:
+            # Must exist
+            paramstyle = self.driver.paramstyle
+            # Must be a valid value
+            self.failUnless(paramstyle in ('qmark','numeric','named','format','pyformat'))
+        except AttributeError:
+            self.fail("Driver doesn't define paramstyle")
+
+    def test_Exceptions(self):
+        # Make sure required exceptions exist, and are in the
+        # defined heirarchy.
+        self.failUnless(issubclass(self.driver.Warning,StandardError))
+        self.failUnless(issubclass(self.driver.Error,StandardError))
+        self.failUnless(issubclass(self.driver.InterfaceError,self.driver.Error))
+        self.failUnless(issubclass(self.driver.DatabaseError,self.driver.Error))
+        self.failUnless(issubclass(self.driver.OperationalError,self.driver.Error))
+        self.failUnless(issubclass(self.driver.IntegrityError,self.driver.Error))
+        self.failUnless(issubclass(self.driver.InternalError,self.driver.Error))
+        self.failUnless(issubclass(self.driver.ProgrammingError,self.driver.Error))
+        self.failUnless(issubclass(self.driver.NotSupportedError,self.driver.Error))
+
+    def test_ExceptionsAsConnectionAttributes(self):
+        # OPTIONAL EXTENSION
+        # Test for the optional DB API 2.0 extension, where the exceptions
+        # are exposed as attributes on the Connection object
+        # I figure this optional extension will be implemented by any
+        # driver author who is using this test suite, so it is enabled
+        # by default.
+        con = self._connect()
+        drv = self.driver
+        self.failUnless(con.Warning is drv.Warning)
+        self.failUnless(con.Error is drv.Error)
+        self.failUnless(con.InterfaceError is drv.InterfaceError)
+        self.failUnless(con.DatabaseError is drv.DatabaseError)
+        self.failUnless(con.OperationalError is drv.OperationalError)
+        self.failUnless(con.IntegrityError is drv.IntegrityError)
+        self.failUnless(con.InternalError is drv.InternalError)
+        self.failUnless(con.ProgrammingError is drv.ProgrammingError)
+        self.failUnless(con.NotSupportedError is drv.NotSupportedError)
+
+
+    def test_commit(self):
+        con = self._connect()
+        try:
+            # Commit must work, even if it doesn't do anything
+            con.commit()
+        finally:
+            con.close()
+
+    def test_rollback(self):
+        con = self._connect()
+        # If rollback is defined, it should either work or throw
+        # the documented exception
+        if hasattr(con,'rollback'):
+            try:
+                con.rollback()
+            except self.driver.NotSupportedError:
+                pass
+    
+    def test_cursor(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+        finally:
+            con.close()
+
+    def test_cursor_isolation(self):
+        con = self._connect()
+        try:
+            # Make sure cursors created from the same connection have
+            # the documented transaction isolation level
+            cur1 = con.cursor()
+            cur2 = con.cursor()
+            self.executeDDL1(cur1)
+            cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (self.table_prefix))
+            cur2.execute("select name from %sbooze" % self.table_prefix)
+            booze = cur2.fetchall()
+            self.assertEqual(len(booze),1)
+            self.assertEqual(len(booze[0]),1)
+            self.assertEqual(booze[0][0],'Victoria Bitter')
+        finally:
+            con.close()
+
+    def test_description(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self.executeDDL1(cur)
+            self.assertEqual(cur.description,None,
+                'cursor.description should be none after executing a '
+                'statement that can return no rows (such as DDL)'
+                )
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            self.assertEqual(len(cur.description),1,
+                'cursor.description describes too many columns'
+                )
+            self.assertEqual(len(cur.description[0]),7,
+                'cursor.description[x] tuples must have 7 elements'
+                )
+            self.assertEqual(cur.description[0][0].lower(),'name',
+                'cursor.description[x][0] must return column name'
+                )
+            self.assertEqual(cur.description[0][1],self.driver.STRING,
+                'cursor.description[x][1] must return column type. Got %r'
+                    % cur.description[0][1]
+                )
+
+            # Make sure self.description gets reset
+            self.executeDDL2(cur)
+            self.assertEqual(cur.description,None,
+                'cursor.description not being set to None when executing '
+                'no-result statements (eg. DDL)'
+                )
+        finally:
+            con.close()
+
+    def test_rowcount(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self.executeDDL1(cur)
+            self.assertEqual(cur.rowcount,-1,
+                'cursor.rowcount should be -1 after executing no-result '
+                'statements'
+                )
+            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+                self.table_prefix
+                ))
+            self.failUnless(cur.rowcount in (-1,1),
+                'cursor.rowcount should == number or rows inserted, or '
+                'set to -1 after executing an insert statement'
+                )
+            cur.execute("select name from %sbooze" % self.table_prefix)
+            self.failUnless(cur.rowcount in (-1,1),
+                'cursor.rowcount should == number of rows returned, or '
+                'set to -1 after executing a select statement'
+                )
+            self.executeDDL2(cur)
+            self.assertEqual(cur.rowcount,-1,
+                'cursor.rowcount not being reset to -1 after executing '
+                'no-result statements'
+                )
+        finally:
+            con.close()
+
+    def _callproc_setup(self):
+        pass
+
+    def test_callproc(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self._callproc_setup(cur)
+            # Execute and get new parameters
+            values = cur.callproc('to_lower', ('FOO',))
+            self.assertEqual(len(values),1, 'callproc didnt return the input values')
+            self.assertEqual(values[0],'FOO', 'input-only values shouldnt change')
+            
+            r = cur.fetchall()
+            self.assertEqual(len(r),1,'callproc produced no result set')
+            self.assertEqual(len(r[0]),1, 'callproc produced invalid result set')
+            self.assertEqual(r[0][0],'foo', 'callproc produced invalid results')
+        finally:
+            con.close()
+
+    def test_close(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+        finally:
+            con.close()
+
+        # cursor.execute should raise an Error if called after connection
+        # closed
+        self.assertRaises(self.driver.Error,self.executeDDL1,cur)
+
+        # connection.commit should raise an Error if called after connection'
+        # closed.'
+        self.assertRaises(self.driver.Error,con.commit)
+
+        # connection.close should raise an Error if called more than once
+        self.assertRaises(self.driver.Error,con.close)
+
+    def test_execute(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self._paraminsert(cur)
+        finally:
+            con.close()
+
+    def _paraminsert(self,cur):
+        self.executeDDL1(cur)
+        cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+            self.table_prefix
+            ))
+        self.failUnless(cur.rowcount in (-1,1))
+
+        if self.driver.paramstyle == 'qmark':
+            cur.execute(
+                'insert into %sbooze values (?)' % self.table_prefix,
+                ("Cooper's",)
+                )
+        elif self.driver.paramstyle == 'numeric':
+            cur.execute(
+                'insert into %sbooze values (:1)' % self.table_prefix,
+                ("Cooper's",)
+                )
+        elif self.driver.paramstyle == 'named':
+            cur.execute(
+                'insert into %sbooze values (:beer)' % self.table_prefix, 
+                {'beer':"Cooper's"}
+                )
+        elif self.driver.paramstyle == 'format':
+            cur.execute(
+                'insert into %sbooze values (%%s)' % self.table_prefix,
+                ("Cooper's",)
+                )
+        elif self.driver.paramstyle == 'pyformat':
+            cur.execute(
+                'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
+                {'beer':"Cooper's"}
+                )
+        else:
+            self.fail('Invalid paramstyle')
+        self.failUnless(cur.rowcount in (-1,1))
+
+        cur.execute('select name from %sbooze' % self.table_prefix)
+        res = cur.fetchall()
+        self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
+        beers = [res[0][0],res[1][0]]
+        beers.sort()
+        self.assertEqual(beers[0],"Cooper's",
+            'cursor.fetchall retrieved incorrect data, or data inserted '
+            'incorrectly'
+            )
+        self.assertEqual(beers[1],"Victoria Bitter",
+            'cursor.fetchall retrieved incorrect data, or data inserted '
+            'incorrectly'
+            )
+
+    def test_executemany(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self.executeDDL1(cur)
+            largs = [ ("Cooper's",) , ("Boag's",) ]
+            margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
+            if self.driver.paramstyle == 'qmark':
+                cur.executemany(
+                    'insert into %sbooze values (?)' % self.table_prefix,
+                    largs
+                    )
+            elif self.driver.paramstyle == 'numeric':
+                cur.executemany(
+                    'insert into %sbooze values (:1)' % self.table_prefix,
+                    largs
+                    )
+            elif self.driver.paramstyle == 'named':
+                cur.executemany(
+                    'insert into %sbooze values (:beer)' % self.table_prefix,
+                    margs
+                    )
+            elif self.driver.paramstyle == 'format':
+                cur.executemany(
+                    'insert into %sbooze values (%%s)' % self.table_prefix,
+                    largs
+                    )
+            elif self.driver.paramstyle == 'pyformat':
+                cur.executemany(
+                    'insert into %sbooze values (%%(beer)s)' % (
+                        self.table_prefix
+                        ),
+                    margs
+                    )
+            else:
+                self.fail('Unknown paramstyle')
+            self.failUnless(cur.rowcount in (-1,2),
+                'insert using cursor.executemany set cursor.rowcount to '
+                'incorrect value %r' % cur.rowcount
+                )
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            res = cur.fetchall()
+            self.assertEqual(len(res),2,
+                'cursor.fetchall retrieved incorrect number of rows'
+                )
+            beers = [res[0][0],res[1][0]]
+            beers.sort()
+            self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
+            self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
+        finally:
+            con.close()
+
+    def test_fetchone(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+
+            # cursor.fetchone should raise an Error if called before
+            # executing a select-type query
+            self.assertRaises(self.driver.Error,cur.fetchone)
+
+            # cursor.fetchone should raise an Error if called after
+            # executing a query that cannnot return rows
+            self.executeDDL1(cur)
+            self.assertRaises(self.driver.Error,cur.fetchone)
+
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            self.assertEqual(cur.fetchone(),None,
+                'cursor.fetchone should return None if a query retrieves '
+                'no rows'
+                )
+            self.failUnless(cur.rowcount in (-1,0))
+
+            # cursor.fetchone should raise an Error if called after
+            # executing a query that cannnot return rows
+            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
+                self.table_prefix
+                ))
+            self.assertRaises(self.driver.Error,cur.fetchone)
+
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            r = cur.fetchone()
+            self.assertEqual(len(r),1,
+                'cursor.fetchone should have retrieved a single row'
+                )
+            self.assertEqual(r[0],'Victoria Bitter',
+                'cursor.fetchone retrieved incorrect data'
+                )
+            self.assertEqual(cur.fetchone(),None,
+                'cursor.fetchone should return None if no more rows available'
+                )
+            self.failUnless(cur.rowcount in (-1,1))
+        finally:
+            con.close()
+
+    samples = [
+        'Carlton Cold',
+        'Carlton Draft',
+        'Mountain Goat',
+        'Redback',
+        'Victoria Bitter',
+        'XXXX'
+        ]
+
+    def _populate(self):
+        ''' Return a list of sql commands to setup the DB for the fetch
+            tests.
+        '''
+        populate = [
+            "insert into %sbooze values ('%s')" % (self.table_prefix,s) 
+                for s in self.samples
+            ]
+        return populate
+
+    def test_fetchmany(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+
+            # cursor.fetchmany should raise an Error if called without
+            #issuing a query
+            self.assertRaises(self.driver.Error,cur.fetchmany,4)
+
+            self.executeDDL1(cur)
+            for sql in self._populate():
+                cur.execute(sql)
+
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            r = cur.fetchmany()
+            self.assertEqual(len(r),1,
+                'cursor.fetchmany retrieved incorrect number of rows, '
+                'default of arraysize is one.'
+                )
+            cur.arraysize=10
+            r = cur.fetchmany(3) # Should get 3 rows
+            self.assertEqual(len(r),3,
+                'cursor.fetchmany retrieved incorrect number of rows'
+                )
+            r = cur.fetchmany(4) # Should get 2 more
+            self.assertEqual(len(r),2,
+                'cursor.fetchmany retrieved incorrect number of rows'
+                )
+            r = cur.fetchmany(4) # Should be an empty sequence
+            self.assertEqual(len(r),0,
+                'cursor.fetchmany should return an empty sequence after '
+                'results are exhausted'
+            )
+            self.failUnless(cur.rowcount in (-1,6))
+
+            # Same as above, using cursor.arraysize
+            cur.arraysize=4
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            r = cur.fetchmany() # Should get 4 rows
+            self.assertEqual(len(r),4,
+                'cursor.arraysize not being honoured by fetchmany'
+                )
+            r = cur.fetchmany() # Should get 2 more
+            self.assertEqual(len(r),2)
+            r = cur.fetchmany() # Should be an empty sequence
+            self.assertEqual(len(r),0)
+            self.failUnless(cur.rowcount in (-1,6))
+
+            cur.arraysize=6
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            rows = cur.fetchmany() # Should get all rows
+            self.failUnless(cur.rowcount in (-1,6))
+            self.assertEqual(len(rows),6)
+            self.assertEqual(len(rows),6)
+            rows = [r[0] for r in rows]
+            rows.sort()
+          
+            # Make sure we get the right data back out
+            for i in range(0,6):
+                self.assertEqual(rows[i],self.samples[i],
+                    'incorrect data retrieved by cursor.fetchmany'
+                    )
+
+            rows = cur.fetchmany() # Should return an empty list
+            self.assertEqual(len(rows),0,
+                'cursor.fetchmany should return an empty sequence if '
+                'called after the whole result set has been fetched'
+                )
+            self.failUnless(cur.rowcount in (-1,6))
+
+            self.executeDDL2(cur)
+            cur.execute('select name from %sbarflys' % self.table_prefix)
+            r = cur.fetchmany() # Should get empty sequence
+            self.assertEqual(len(r),0,
+                'cursor.fetchmany should return an empty sequence if '
+                'query retrieved no rows'
+                )
+            self.failUnless(cur.rowcount in (-1,0))
+
+        finally:
+            con.close()
+
+    def test_fetchall(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            # cursor.fetchall should raise an Error if called
+            # without executing a query that may return rows (such
+            # as a select)
+            self.assertRaises(self.driver.Error, cur.fetchall)
+
+            self.executeDDL1(cur)
+            for sql in self._populate():
+                cur.execute(sql)
+
+            # cursor.fetchall should raise an Error if called
+            # after executing a a statement that cannot return rows
+            self.assertRaises(self.driver.Error,cur.fetchall)
+
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            rows = cur.fetchall()
+            self.failUnless(cur.rowcount in (-1,len(self.samples)))
+            self.assertEqual(len(rows),len(self.samples),
+                'cursor.fetchall did not retrieve all rows'
+                )
+            rows = [r[0] for r in rows]
+            rows.sort()
+            for i in range(0,len(self.samples)):
+                self.assertEqual(rows[i],self.samples[i],
+                'cursor.fetchall retrieved incorrect rows'
+                )
+            rows = cur.fetchall()
+            self.assertEqual(
+                len(rows),0,
+                'cursor.fetchall should return an empty list if called '
+                'after the whole result set has been fetched'
+                )
+            self.failUnless(cur.rowcount in (-1,len(self.samples)))
+
+            self.executeDDL2(cur)
+            cur.execute('select name from %sbarflys' % self.table_prefix)
+            rows = cur.fetchall()
+            self.failUnless(cur.rowcount in (-1,0))
+            self.assertEqual(len(rows),0,
+                'cursor.fetchall should return an empty list if '
+                'a select query returns no rows'
+                )
+            
+        finally:
+            con.close()
+    
+    def test_mixedfetch(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self.executeDDL1(cur)
+            for sql in self._populate():
+                cur.execute(sql)
+
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            rows1  = cur.fetchone()
+            rows23 = cur.fetchmany(2)
+            rows4  = cur.fetchone()
+            rows56 = cur.fetchall()
+            self.failUnless(cur.rowcount in (-1,6))
+            self.assertEqual(len(rows23),2,
+                'fetchmany returned incorrect number of rows'
+                )
+            self.assertEqual(len(rows56),2,
+                'fetchall returned incorrect number of rows'
+                )
+
+            rows = [rows1[0]]
+            rows.extend([rows23[0][0],rows23[1][0]])
+            rows.append(rows4[0])
+            rows.extend([rows56[0][0],rows56[1][0]])
+            rows.sort()
+            for i in range(0,len(self.samples)):
+                self.assertEqual(rows[i],self.samples[i],
+                    'incorrect data retrieved or inserted'
+                    )
+        finally:
+            con.close()
+
+    def help_nextset_setUp(self,cur):
+        ''' Should create a procedure called deleteme
+            that returns two result sets, first the 
+	    number of rows in booze then "name from booze"
+        '''
+        raise NotImplementedError,'Helper not implemented'
+
+    def help_nextset_tearDown(self,cur):
+        'If cleaning up is needed after nextSetTest'
+        raise NotImplementedError,'Helper not implemented'
+
+    def test_nextset(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            if not hasattr(cur,'nextset'):
+                return
+
+            try:
+                self.executeDDL1(cur)
+                sql=self._populate()
+                for sql in self._populate():
+                    cur.execute(sql)
+
+                self.help_nextset_setUp(cur)
+
+                cur.callproc('more_than_one')
+                values = cur.fetchone()
+
+                assert cur.nextset()
+                values = cur.fetchall()
+
+                s = cur.nextset()
+                assert s == None,'No more return sets, should return None'
+            finally:
+                self.help_nextset_tearDown(cur)
+
+        finally:
+            con.close()
+
+    def test_arraysize(self):
+        # Not much here - rest of the tests for this are in test_fetchmany
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self.failUnless(hasattr(cur,'arraysize'),
+                'cursor.arraysize must be defined'
+                )
+        finally:
+            con.close()
+
+    def test_setinputsizes(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            cur.setinputsizes( (25,) )
+            self._paraminsert(cur) # Make sure cursor still works
+        finally:
+            con.close()
+
+    def test_setoutputsize_basic(self):
+        # Basic test is to make sure setoutputsize doesn't blow up
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            cur.setoutputsize(1000)
+            cur.setoutputsize(2000,0)
+            self._paraminsert(cur) # Make sure the cursor still works
+        finally:
+            con.close()
+
+    def test_setoutputsize(self):
+        # Real test for setoutputsize is driver dependant
+        raise NotImplementedError,'Driver need to override this test'
+
+    def test_None(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self.executeDDL1(cur)
+            cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
+            cur.execute('select name from %sbooze' % self.table_prefix)
+            r = cur.fetchall()
+            self.assertEqual(len(r),1)
+            self.assertEqual(len(r[0]),1)
+            self.assertEqual(r[0][0],None,'NULL value not returned as None')
+        finally:
+            con.close()
+
+    def test_Date(self):
+        d1 = self.driver.Date(2002,12,25)
+        d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
+        # Can we assume this? API doesn't specify, but it seems implied
+        # self.assertEqual(str(d1),str(d2))
+
+    def test_Time(self):
+        t1 = self.driver.Time(13,45,30)
+        t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
+        # Can we assume this? API doesn't specify, but it seems implied
+        # self.assertEqual(str(t1),str(t2))
+
+    def test_Timestamp(self):
+        t1 = self.driver.Timestamp(2002,12,25,13,45,30)
+        t2 = self.driver.TimestampFromTicks(time.mktime((2002,12,25,13,45,30,0,0,0)))
+        # Can we assume this? API doesn't specify, but it seems implied
+        # self.assertEqual(str(t1),str(t2))
+
+    def test_Binary(self):
+        b = self.driver.Binary('Something')
+        b = self.driver.Binary('')
+
+    def test_STRING(self):
+        self.failUnless(hasattr(self.driver,'STRING'),
+            'module.STRING must be defined'
+            )
+
+    def test_BINARY(self):
+        self.failUnless(hasattr(self.driver,'BINARY'),
+            'module.BINARY must be defined.'
+            )
+
+    def test_NUMBER(self):
+        self.failUnless(hasattr(self.driver,'NUMBER'),
+            'module.NUMBER must be defined.'
+            )
+
+    def test_DATETIME(self):
+        self.failUnless(hasattr(self.driver,'DATETIME'),
+            'module.DATETIME must be defined.'
+            )
+
+    def test_ROWID(self):
+        self.failUnless(hasattr(self.driver,'ROWID'),
+            'module.ROWID must be defined.'
+            )

File tests/test_main/apitest/models.py

+from django.db import models
+
+# Create your models here.

File tests/test_main/apitest/tests.py

+# Base is used to get connection string using Django settings
+from sqlserver_ado import base
+# Internal dbapi module
+from sqlserver_ado import dbapi
+
+# Base unit test
+import dbapi20
+
+class test_dbapi(dbapi20.DatabaseAPI20Test):
+    driver = dbapi
+    connect_args = [ base.connection_string_from_settings() ]
+    
+#    def _connect(self):
+#        return connection
+    
+    def _try_run(self, *args):
+        con = self._connect()
+        cur = None
+        try:
+            cur = con.cursor()
+            for arg in args:
+                cur.execute(arg)
+        finally:
+            try:
+                if cur is not None:
+                    cur.close()
+            except: pass
+            con.close()
+
+    def _try_run2(self, cur, *args):
+        for arg in args:
+            cur.execute(arg)
+    
+    # This should create the "lower" sproc.
+    def _callproc_setup(self, cur):
+        self._try_run2(cur,
+            """IF OBJECT_ID(N'[dbo].[to_lower]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[to_lower]""",
+            """
+CREATE PROCEDURE to_lower
+    @input nvarchar(max)
+AS
+BEGIN
+    select LOWER(@input)
+END
+""",
+            )
+    
+    # This should create a sproc with a return value.
+    def _retval_setup(self, cur):
+        self._try_run2(cur,
+            """IF OBJECT_ID(N'[dbo].[add_one]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[add_one]""",
+            """
+CREATE PROCEDURE add_one (@input int)
+AS
+BEGIN
+    return @input+1
+END
+""",
+            )
+
+    def test_retval(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self._retval_setup(cur)
+            values = cur.callproc('add_one',(1,))
+            self.assertEqual(values[0], 1, 'input parameter should be left unchanged: %s' % (values[0],))
+            
+            self.assertEqual(cur.description, None,"No resultset was expected.")
+            self.assertEqual(cur.return_value, 2, "Invalid return value: %s" % (cur.return_value,))
+
+        finally:
+            con.close()
+
+    # This should create a sproc with an output parameter.
+    def _outparam_setup(self, cur):
+        self._try_run2(cur,
+            """IF OBJECT_ID(N'[dbo].[add_one_out]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[add_one_out]""",
+            """
+CREATE PROCEDURE add_one_out (@input int, @output int OUTPUT)
+AS
+BEGIN
+    SET @output = @input+1
+END
+""",
+            )
+
+    def test_outparam(self):
+        con = self._connect()
+        try:
+            cur = con.cursor()
+            self._outparam_setup(cur)
+            values = cur.callproc('add_one_out',(1,None))
+            self.assertEqual(len(values), 2, 'expected 2 parameters')
+            self.assertEqual(values[0], 1, 'input parameter should be unchanged')
+            self.assertEqual(values[1], 2, 'output parameter should get new values')
+        finally:
+            con.close()            
+    
+    # Don't need setoutputsize tests.
+    def test_setoutputsize(self): 
+        pass
+        
+    def help_nextset_setUp(self,cur):
+        self._try_run2(cur,
+            """IF OBJECT_ID(N'[dbo].[more_than_one]', N'P') IS NOT NULL DROP PROCEDURE [dbo].[more_than_one]""",
+            """
+create procedure more_than_one
+as
+begin
+    select 1,2,3
+    select 4,5,6
+end
+""",
+            )
+
+    def help_nextset_tearDown(self,cur):
+        pass

File tests/test_main/apitest/views.py

+# Create your views here.

File tests/test_main/settings.py

 )
 
 INSTALLED_APPS = (
+    'apitest',
     'regressiontests',
 	'slicing',
 	'nulls',