Commits

Anonymous committed 301fabb

Completed previously missed patches from tickets 422 and 415
get unit tests to work with pyodbc [ticket:481]
fix blank password on adodbapi [ticket:371]

Comments (0)

Files changed (8)

   - fix for non-integer relationships [ticket:473]
   - DB-API module now selectable at run-time [ticket:419]
   - preliminary support for pyodbc (Yay!) [ticket:419]
-  - now passes more unit tests [ticket:422]
+  - now passes many more unit tests [tickets:422, 481, 415]
   - better unittest compatibility with ANSI functions [ticket:479]
   - improved support for implicit sequence PK columns with auto-insert [ticket:415]
+  - fix for blank password in adodbapi [ticket:371]
+  - fixes to get unit tests working with pyodbc [ticket:481]
+
   
 0.3.4
 - general:

lib/sqlalchemy/databases/mssql.py

     import adodbapi as dbmodule
     # ADODBAPI has a non-standard Connection method
     connect = dbmodule.Connection
-    make_connect_string = lambda keys: \
-        [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % (
-            keys.get("host"), keys.get("user"), keys.get("password"), keys.get("database"))], {}]
+    def make_connect_string(keys):
+        return  [["Provider=SQLOLEDB;Data Source=%s;User Id=%s;Password=%s;Initial Catalog=%s" % (
+            keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}]        
     sane_rowcount = True
     dialect = MSSQLDialect
     colspecs[sqltypes.Unicode] = AdoMSUnicode
     global dbmodule, connect, make_connect_string, do_commit, sane_rowcount, dialect, colspecs, ischema_names
     import pyodbc as dbmodule
     connect = dbmodule.connect
-    make_connect_string = lambda keys: \
-        [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % (
-            keys.get("host"), keys.get("user"), keys.get("password"), keys.get("database"))], {}]
+    def make_connect_string(keys):
+        return [["Driver={SQL Server};Server=%s;UID=%s;PWD=%s;Database=%s" % (
+            keys.get("host"), keys.get("user"), keys.get("password", ""), keys.get("database"))], {}]        
     do_commit = True
-    sane_rowcount = True
-    dialect = MSSQLDialect # XXX - find out whether this needs to be tweaked for pyodbc
+    sane_rowcount = False
+    dialect = MSSQLDialect
     import warnings
     warnings.warn('pyodbc support in sqlalchemy.databases.mssql is extremely experimental - use at your own risk.')
-    colspecs[sqltypes.Unicode] = MSUnicode # Ado?
-    ischema_names['nvarchar'] = MSUnicode # Ado?
+    colspecs[sqltypes.Unicode] = AdoMSUnicode
+    ischema_names['nvarchar'] = AdoMSUnicode
 
 def use_default():
     import_errors = []
             use_adodbapi,
             use_pymssql,
             use_pyodbc,
-        ]:
+            ]:
         if try_use(f):
             return dbmodule # informational return, so the user knows what he's using.
     else:
 
 
 class MSSQLDialect(ansisql.ANSIDialect):
-    def __init__(self, module=None, auto_identity_insert=False, encoding=None, **params):
+    def __init__(self, module=None, auto_identity_insert=False, **params):
         self.module = module or dbmodule or use_default()
         self.auto_identity_insert = auto_identity_insert
-        ansisql.ANSIDialect.__init__(self, encoding=encoding, **params)
+        ansisql.ANSIDialect.__init__(self, **params)
         self.set_default_schema_name("dbo")
         
     def create_connect_args(self, url):
     def last_inserted_ids(self):
         return self.context.last_inserted_ids
             
+    def do_execute(self, cursor, statement, params, **kwargs):
+        if params == {}:
+            params = ()
+        super(MSSQLDialect, self).do_execute(cursor, statement, params, **kwargs)
+
     def _execute(self, c, statement, parameters):
         try:
+            if parameters == {}:
+                parameters = ()
             c.execute(statement, parameters)
             self.context.rowcount = c.rowcount
             c.DBPROP_COMMITPRESERVE = "Y"
         func.name = self.function_rewrites.get(func.name, func.name)
         super(MSSQLCompiler, self).visit_function(func)            
 
+    def for_update_clause(self, select):
+        # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
+        return ''
+
 class MSSQLSchemaGenerator(ansisql.ANSISchemaGenerator):
     def get_column_specification(self, column, **kwargs):
         colspec = self.preparer.format_column(column) + " " + column.type.engine_impl(self.engine).get_col_spec()
         #TODO: determin MSSQL's case folding rules
         return value
 
+use_default()
+

test/engine/reflection.py

         table = Table(
             'engine_multi', meta, 
             Column('multi_id', Integer, Sequence('multi_id_seq'), primary_key=True),
-            Column('multi_rev', Integer, Sequence('multi_rev_seq'), primary_key=True),
+            Column('multi_rev', Integer, primary_key=True),
             Column('name', String(50), nullable=False),
             Column('val', String(100))
         )

test/orm/inheritance5.py

     def define_tables(self, metadata):
         global table_Employee, table_Engineer, table_Manager
         table_Employee = Table( 'Employee', metadata,
-            Column( 'name', type= String, ),
+            Column( 'name', type= String(100), ),
             Column( 'id', primary_key= True, type= Integer, ),
-            Column( 'atype', type= String, ),
+            Column( 'atype', type= String(100), ),
         )
 
         table_Engineer = Table( 'Engineer', metadata,
-            Column( 'machine', type= String, ),
+            Column( 'machine', type= String(100), ),
             Column( 'id', Integer, ForeignKey( 'Employee.id', ), primary_key= True, ),
         )
 
         table_Manager = Table( 'Manager', metadata,
-            Column( 'duties', type= String, ),
+            Column( 'duties', type= String(100), ),
             Column( 'id', Integer, ForeignKey( 'Engineer.id', ), primary_key= True, ),
         )
     def test_threelevels(self):

test/orm/memusage.py

         table2 = Table("mytable2", metadata, 
             Column('col1', Integer, primary_key=True),
             Column('col2', String(30)),
-            Column('col3', String(30), ForeignKey("mytable.col1"))
+            Column('col3', Integer, ForeignKey("mytable.col1"))
             )
     
         metadata.create_all()
         clear_mappers()
     
 if __name__ == '__main__':
-    testbase.main()
+    testbase.main()

test/sql/defaults.py

             Column('col5', deftype, PassiveDefault(def2)),
             
             # preexecute + update timestamp
-            Column('col6', Date, default=currenttime, onupdate=currenttime),
+            Column('col6', DateTime, default=currenttime, onupdate=currenttime),
             
             Column('boolcol1', Boolean, default=True),
             Column('boolcol2', Boolean, default=False)
         finally:
             table.drop()    
 
+    def testfetchid(self):
+        meta = BoundMetaData(testbase.db)
+        table = Table("aitest", meta, 
+            Column('id', Integer, primary_key=True),
+            Column('data', String(20)))
+        table.create()
+
+        try:
+            # simulate working on a table that doesn't already exist
+            meta2 = BoundMetaData(testbase.db)
+            table2 = Table("aitest", meta2,
+                Column('id', Integer, primary_key=True),
+                Column('data', String(20)))
+            class AiTest(object):
+                pass
+            mapper(AiTest, table2)
+        
+            s = create_session()
+            u = AiTest()
+            s.save(u)
+            s.flush()
+            assert u.id is not None
+            s.clear()
+        finally:
+            table.drop()
+        
+
 class SequenceTest(PersistTest):
     @testbase.supported('postgres', 'oracle')
     def setUpAll(self):

test/sql/query.py

         self.users.insert().execute(user_id=7, user_name='fido')
         r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
         self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
+        
+    @testbase.unsupported('mssql')
+    def testselectlimitoffset(self):
+        self.users.insert().execute(user_id=1, user_name='john')
+        self.users.insert().execute(user_id=2, user_name='jack')
+        self.users.insert().execute(user_id=3, user_name='ed')
+        self.users.insert().execute(user_id=4, user_name='wendy')
+        self.users.insert().execute(user_id=5, user_name='laura')
+        self.users.insert().execute(user_id=6, user_name='ralph')
+        self.users.insert().execute(user_id=7, user_name='fido')
         r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
         self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
         r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()

test/sql/testtypes.py

         self.echo(repr(x['plain_data']))
         self.assert_(isinstance(x['unicode_data'], unicode) and x['unicode_data'] == unicodedata)
         if isinstance(x['plain_data'], unicode):
-            # SQLLite returns even non-unicode data as unicode
-            self.assert_(db.name == 'sqlite')
+            # SQLLite and MSSQL return non-unicode data as unicode
+            self.assert_(db.name in ('sqlite', 'mssql'))
             self.assert_(x['plain_data'] == unicodedata)
-            self.echo("its sqlite !")
+            self.echo("it's %s!" % db.name)
         else:
             self.assert_(not isinstance(x['plain_data'], unicode) and x['plain_data'] == rawdata)
     def testengineparam(self):