Commits

Brian Kearns committed f09e575

more cleanups for sqlite

  • Participants
  • Parent commits e06dbff

Comments (0)

Files changed (2)

lib_pypy/_sqlite3.py

         self._reset = False
         self.__locked = False
         self.__closed = False
-        self.__description = None
+        self.__lastrowid = None
         self.__rowcount = -1
 
         con._check_thread()
         try:
             if not isinstance(sql, basestring):
                 raise ValueError("operation parameter must be str or unicode")
-            self.__description = None
+            try:
+                del self.__description
+            except AttributeError:
+                pass
             self.__rowcount = -1
             self.__statement = self.__connection._statement_cache.get(sql)
 
             if self.__connection._isolation_level is not None:
-                if self.__statement._kind == Statement._DDL:
+                if self.__statement._type in ("UPDATE", "DELETE", "INSERT", "REPLACE"):
+                    if not self.__connection._in_transaction:
+                        self.__connection._begin()
+                elif self.__statement._type == "OTHER":
                     if self.__connection._in_transaction:
                         self.__connection.commit()
-                elif self.__statement._kind == Statement._DML:
-                    if not self.__connection._in_transaction:
-                        self.__connection._begin()
-
-            if multiple and self.__statement._kind != Statement._DML:
-                raise ProgrammingError("executemany is only for DML statements")
+                elif self.__statement._type == "SELECT":
+                    if multiple:
+                        raise ProgrammingError("You cannot execute SELECT "
+                                               "statements in executemany().")
 
             for params in many_params:
                 self.__statement._set_params(params)
                     self.__statement._reset()
                     raise self.__connection._get_exception(ret)
 
-                if self.__statement._kind == Statement._DML:
+                if ret == _lib.SQLITE_ROW:
+                    if multiple:
+                        raise ProgrammingError("executemany() can only execute DML statements.")
+                    self.__build_row_cast_map()
+                    self.__next_row = self.__fetch_one_row()
+                elif ret == _lib.SQLITE_DONE and not multiple:
                     self.__statement._reset()
 
-                if ret == _lib.SQLITE_ROW:
-                    self.__build_row_cast_map()
-                    self.__next_row = self.__fetch_one_row()
-
-                if self.__statement._kind == Statement._DML:
+                if self.__statement._type in ("UPDATE", "DELETE", "INSERT", "REPLACE"):
                     if self.__rowcount == -1:
                         self.__rowcount = 0
                     self.__rowcount += _lib.sqlite3_changes(self.__connection._db)
+
+                if not multiple and self.__statement._type == "INSERT":
+                    self.__lastrowid = _lib.sqlite3_last_insert_rowid(self.__connection._db)
+                else:
+                    self.__lastrowid = None
+
+                if multiple:
+                    self.__statement._reset()
         finally:
             self.__connection._in_transaction = \
                 not _lib.sqlite3_get_autocommit(self.__connection._db)
     rowcount = property(__get_rowcount)
 
     def __get_description(self):
-        if self.__description is None:
+        try:
+            return self.__description
+        except AttributeError:
             self.__description = self.__statement._get_description()
-        return self.__description
+            return self.__description
     description = property(__get_description)
 
     def __get_lastrowid(self):
-        return _lib.sqlite3_last_insert_rowid(self.__connection._db)
+        return self.__lastrowid
     lastrowid = property(__get_lastrowid)
 
     def setinputsizes(self, *args):
 
 
 class Statement(object):
-    _DML, _DQL, _DDL = range(3)
-
     _statement = None
 
     def __init__(self, connection, sql):
 
         if not isinstance(sql, basestring):
             raise Warning("SQL is of wrong type. Must be string or unicode.")
-        first_word = self._statement_kind = sql.lstrip().split(" ")[0].upper()
-        if first_word in ("INSERT", "UPDATE", "DELETE", "REPLACE"):
-            self._kind = Statement._DML
-        elif first_word in ("SELECT", "PRAGMA"):
-            self._kind = Statement._DQL
+
+        first_word = sql.lstrip().split(" ")[0].upper()
+        if first_word == "":
+            self._type = "INVALID"
+        elif first_word in ("SELECT", "INSERT", "UPDATE", "DELETE", "REPLACE"):
+            self._type = first_word
         else:
-            self._kind = Statement._DDL
+            self._type = "OTHER"
 
         if isinstance(sql, unicode):
             sql = sql.encode('utf-8')
 
         if ret == _lib.SQLITE_OK and not self._statement:
             # an empty statement, work around that, as it's the least trouble
+            self._type = "SELECT"
             c_sql = _ffi.new("char[]", b"select 42")
             ret = _lib.sqlite3_prepare_v2(self.__con._db, c_sql, -1,
                                           statement_star, next_char)
             self._statement = statement_star[0]
-            self._kind = Statement._DQL
+
         if ret != _lib.SQLITE_OK:
             raise self.__con._get_exception(ret)
 
             raise ValueError("parameters are of unsupported type")
 
     def _get_description(self):
-        if self._kind == Statement._DML:
+        if self._type in ("INSERT", "UPDATE", "DELETE", "REPLACE"):
             return None
         desc = []
         for i in xrange(_lib.sqlite3_column_count(self._statement)):

pypy/module/test_lib_pypy/test_sqlite3.py

     cur = con.cursor()
     cur.execute("select 42").fetchall()
     assert cur.description is not None
+
+def test_executemany_lastrowid():
+    con = _sqlite3.connect(':memory:')
+    cur = con.cursor()
+    cur.execute("create table test(a)")
+    cur.executemany("insert into test values (?)", [[1], [2], [3]])
+    assert cur.lastrowid is None