Commits

Mike Bayer  committed 9e8e3e4

- the TIME and TIMESTAMP types are now availble from the
postgresql dialect directly, which add the PG-specific
argument 'precision' to both. 'precision' and
'timezone' are correctly reflected for both TIME and
TIMEZONE types. [ticket:997]

  • Participants
  • Parent commits 0b905a8

Comments (0)

Files changed (3)

      "sqlalchemy.dialects.postgresql" logger name. 
      [ticket:877]
 
+   - the TIME and TIMESTAMP types are now availble from the
+     postgresql dialect directly, which add the PG-specific
+     argument 'precision' to both.   'precision' and 
+     'timezone' are correctly reflected for both TIME and 
+     TIMEZONE types. [ticket:997]
+     
 - oracle
    - The Oracle dialect will issue VARCHAR type definitions
      using character counts, i.e. VARCHAR2(50 CHAR), so that

File lib/sqlalchemy/dialects/postgresql/base.py

 
 from sqlalchemy.types import INTEGER, BIGINT, SMALLINT, VARCHAR, \
         CHAR, TEXT, FLOAT, NUMERIC, \
-        TIMESTAMP, TIME, DATE, BOOLEAN
+        DATE, BOOLEAN
 
 class REAL(sqltypes.Float):
     __visit_name__ = "REAL"
     __visit_name__ = "MACADDR"
 PGMacAddr = MACADDR
 
+class TIMESTAMP(sqltypes.TIMESTAMP):
+    def __init__(self, timezone=False, precision=None):
+        super(TIMESTAMP, self).__init__(timezone=timezone)
+        self.precision = precision
+        
+class TIME(sqltypes.TIME):
+    def __init__(self, timezone=False, precision=None):
+        super(TIME, self).__init__(timezone=timezone)
+        self.precision = precision
+    
 class INTERVAL(sqltypes.TypeEngine):
     __visit_name__ = 'INTERVAL'
     def __init__(self, precision=None):
         return self.dialect.identifier_preparer.format_type(type_)
         
     def visit_TIMESTAMP(self, type_):
-        return "TIMESTAMP " + (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
+        return "TIMESTAMP%s %s" % (
+            getattr(type_, 'precision', None) and "(%d)" % type_.precision or "",
+            (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
+        )
 
     def visit_TIME(self, type_):
-        return "TIME " + (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
+        return "TIME%s %s" % (
+            getattr(type_, 'precision', None) and "(%d)" % type_.precision or "",
+            (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE"
+        )
 
     def visit_INTERVAL(self, type_):
         if type_.precision is not None:
         # format columns
         columns = []
         for name, format_type, default, notnull, attnum, table_oid in rows:
-            ## strip (30) from character varying(30)
-            attype = re.search('([^\([]+)', format_type).group(1)
+            ## strip (5) from character varying(5), timestamp(5) with time zone, etc
+            attype = re.sub(r'\([\d,]+\)', '', format_type)
+            
+            # strip '[]' from integer[], etc.
+            attype = re.sub(r'\[\]', '', attype)
+            
             nullable = not notnull
             is_array = format_type.endswith('[]')
-            try:
-                charlen = re.search('\(([\d,]+)\)', format_type).group(1)
-            except:
-                charlen = False
-            numericprec = False
-            numericscale = False
+            charlen = re.search('\(([\d,]+)\)', format_type)
+            if charlen:
+                charlen = charlen.group(1)
+            kwargs = {}
+                
             if attype == 'numeric':
-                if charlen is False:
-                    numericprec, numericscale = (None, None)
+                if charlen:
+                    prec, scale = charlen.split(',')
+                    args = (int(prec), int(scale))
                 else:
-                    numericprec, numericscale = charlen.split(',')
-                charlen = False
+                    args = ()
             elif attype == 'double precision':
-                numericprec, numericscale = (53, False)
-                charlen = False
+                args = (53, )
             elif attype == 'integer':
-                numericprec, numericscale = (32, 0)
-                charlen = False
-            args = []
-            for a in (charlen, numericprec, numericscale):
-                if a is None:
-                    args.append(None)
-                elif a is not False:
-                    args.append(int(a))
-            kwargs = {}
-            if attype == 'timestamp with time zone':
+                args = (32, 0)
+            elif attype in ('timestamp with time zone', 'time with time zone'):
                 kwargs['timezone'] = True
-            elif attype == 'timestamp without time zone':
+                if charlen:
+                    kwargs['precision'] = int(charlen)
+                args = ()
+            elif attype in ('timestamp without time zone', 'time without time zone', 'time'):
                 kwargs['timezone'] = False
+                if charlen:
+                    kwargs['precision'] = int(charlen)
+                args = ()
+            elif attype in ('interval','interval year to month','interval day to second'):
+                if charlen:
+                    kwargs['precision'] = int(charlen)
+                args = ()
+            elif charlen:
+                args = (int(charlen),)
+            else:
+                args = ()
+            
             if attype in self.ischema_names:
                 coltype = self.ischema_names[attype]
             elif attype in enums:

File test/dialect/test_postgresql.py

         assert_raises(exception_cls, eng.execute, "show transaction isolation level")
 
 
-class TimezoneTest(TestBase, AssertsExecutionResults):
+class TimezoneTest(TestBase):
     """Test timezone-aware datetimes.
 
     psycopg will return a datetime with a tzinfo attached to it, if postgresql
         c = notztable.update(notztable.c.id==1).execute(name='newname')
         print notztable.select(tztable.c.id==1).execute().first()
 
+class TimePrecisionTest(TestBase, AssertsCompiledSQL):
+    __dialect__ = postgresql.dialect()
+    
+    def test_compile(self):
+        for (type_, expected) in [
+            (postgresql.TIME(), "TIME WITHOUT TIME ZONE"),
+            (postgresql.TIME(precision=5), "TIME(5) WITHOUT TIME ZONE"),
+            (postgresql.TIME(timezone=True, precision=5), "TIME(5) WITH TIME ZONE"),
+            (postgresql.TIMESTAMP(), "TIMESTAMP WITHOUT TIME ZONE"),
+            (postgresql.TIMESTAMP(precision=5), "TIMESTAMP(5) WITHOUT TIME ZONE"),
+            (postgresql.TIMESTAMP(timezone=True, precision=5), "TIMESTAMP(5) WITH TIME ZONE"),
+        ]:
+            self.assert_compile(type_, expected)
+    
+    @testing.only_on('postgresql', 'DB specific feature')
+    def test_reflection(self):
+        m1 = MetaData(testing.db)
+        t1 = Table('t1', m1, 
+            Column('c1', postgresql.TIME()),
+            Column('c2', postgresql.TIME(precision=5)),
+            Column('c3', postgresql.TIME(timezone=True, precision=5)), 
+            Column('c4', postgresql.TIMESTAMP()), 
+            Column('c5', postgresql.TIMESTAMP(precision=5)), 
+            Column('c6', postgresql.TIMESTAMP(timezone=True, precision=5)), 
+        
+        )
+        t1.create()
+        try:
+            m2 = MetaData(testing.db)
+            t2 = Table('t1', m2, autoload=True)
+            eq_(t2.c.c1.type.precision, None)
+            eq_(t2.c.c2.type.precision, 5)
+            eq_(t2.c.c3.type.precision, 5)
+            eq_(t2.c.c4.type.precision, None)
+            eq_(t2.c.c5.type.precision, 5)
+            eq_(t2.c.c6.type.precision, 5)
+            eq_(t2.c.c1.type.timezone, False)
+            eq_(t2.c.c2.type.timezone, False)
+            eq_(t2.c.c3.type.timezone, True)
+            eq_(t2.c.c4.type.timezone, False)
+            eq_(t2.c.c5.type.timezone, False)
+            eq_(t2.c.c6.type.timezone, True)
+        finally:
+            t1.drop()
+        
+    
+    
 class ArrayTest(TestBase, AssertsExecutionResults):
     __only_on__ = 'postgresql'