Commits

Hong Minhee  committed 55e103e

enable timezone in sqlite datetime type [ticket:1985]

  • Participants
  • Parent commits aaf4381

Comments (0)

Files changed (2)

File lib/sqlalchemy/dialects/sqlite/base.py

 from sqlalchemy.types import BLOB, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL,\
     FLOAT, REAL, INTEGER, NUMERIC, SMALLINT, TEXT, TIME, TIMESTAMP, VARCHAR
 
+
+class _UTC(datetime.tzinfo):
+    _delta = datetime.timedelta(0)
+    _value = None
+
+    def __new__(cls):
+        if cls._value is None:
+            cls._value = datetime.tzinfo.__new__(cls)
+        return cls._value
+
+    def utcoffset(self, dt):
+        return self._delta
+
+    def tzname(self, dt):
+        return "UTC"
+
+    def dst(self, dt):
+        return self._delta
+
+    def __repr__(self):
+        return "<_UTC>"
+
 class _DateTimeMixin(object):
     _reg = None
     _storage_format = None
             if value is None:
                 return None
             elif isinstance(value, datetime_datetime):
+                if self.timezone:
+                    tzinfo = value.tzinfo
+                    if tzinfo is not None and tzinfo.utcoffset(value) is not None:
+                        value = value.astimezone(_UTC())
                 return format % (value.year, value.month, value.day,
                                  value.hour, value.minute, value.second,
                                  value.microsecond)
 
     def result_processor(self, dialect, coltype):
         if self._reg:
-            return processors.str_to_datetime_processor_factory(
+            process = processors.str_to_datetime_processor_factory(
                 self._reg, datetime.datetime)
         else:
-            return processors.str_to_datetime
+            process = processors.str_to_datetime
+        if self.timezone:
+            return lambda value: value.replace(tzinfo=_UTC())
+        return process
 
 class DATE(_DateTimeMixin, sqltypes.Date):
     """Represent a Python date object in SQLite using a string.
             if value is None:
                 return None
             elif isinstance(value, datetime_time):
+                if self.timezone:
+                    tzinfo = value.tzinfo
+                    if tzinfo is not None and tzinfo.utcoffset(value) is not None:
+                        dt = datetime.datetime.combine(datetime.date.today(), value)
+                        value = dt.astimezone(_UTC()).time()
                 return format % (value.hour, value.minute, value.second,
                                  value.microsecond)
             else:
 
     def result_processor(self, dialect, coltype):
         if self._reg:
-            return processors.str_to_datetime_processor_factory(
+            process = processors.str_to_datetime_processor_factory(
                 self._reg, datetime.time)
         else:
-            return processors.str_to_time
+            process = processors.str_to_time
+        if self.timezone:
+            return lambda value: value.replace(tzinfo=_UTC())
+        return process
 
 colspecs = {
     sqltypes.Date: DATE,

File test/dialect/test_sqlite.py

             t.drop(engine)
             engine.dispose()
 
+    def test_timezone(self):
+        from sqlalchemy.dialects.sqlite.base import _UTC
+        class _KST(datetime.tzinfo):
+            def utcoffset(self, dt):
+                return datetime.timedelta(hours=9)
+            def tzname(self, dt):
+                return "KST"
+            def dst(self, dt):
+                return datetime.timedelta(0)
+        # DATETIME
+        dt_naive = datetime.datetime(2011, 4, 12, 15, 14, 30, 000001)
+        dt_aware = dt_naive.replace(tzinfo=_KST())
+        dt_utc = dt_aware.astimezone(_UTC())
+        dt_utc_naive = dt_utc.replace(tzinfo=None)
+        sqldt = sqlite.DATETIME(timezone=True)
+        bp = sqldt.bind_processor(None)
+        eq_(bp(dt_naive), str(dt_naive))
+        eq_(bp(dt_aware), str(dt_utc_naive))
+        # TIME
+        t_naive = dt_naive.time()
+        t_aware = t_naive.replace(tzinfo=_KST())
+        t_utc_naive = dt_utc_naive.time()
+        sqldt = sqlite.TIME(timezone=True)
+        bp = sqldt.bind_processor(None)
+        eq_(bp(t_naive), str(t_naive))
+        eq_(bp(t_aware), str(t_utc_naive))
+
     def test_no_convert_unicode(self):
         """test no utf-8 encoding occurs"""