Commits

Mike Bayer committed d80189e

- really start making postgis example slick.

  • Participants
  • Parent commits 76034c1

Comments (0)

Files changed (1)

File examples/postgis/postgis.py

-from sqlalchemy.types import UserDefinedType
-from sqlalchemy.sql import expression
+from sqlalchemy.types import UserDefinedType, _Binary, TypeDecorator
+from sqlalchemy.sql import expression, type_coerce
 from sqlalchemy import event, Table
+import binascii
 
 # Python datatypes
 
 class GisElement(object):
     """Represents a geometry value."""
 
-    @property
-    def wkt(self):
-        return func.ST_AsText(literal(self, Geometry))
-
-    @property
-    def wkb(self):
-        return func.ST_AsBinary(literal(self, Geometry))
-
     def __str__(self):
         return self.desc
 
         return "<%s at 0x%x; %r>" % (self.__class__.__name__,
                                     id(self), self.desc)
 
-class PersistentGisElement(GisElement):
-    """Represents a Geometry value as loaded from the database."""
+class BinaryGisElement(GisElement, expression.Function):
+    """Represents a Geometry value expressed as binary."""
 
-    def __init__(self, desc):
-        self.desc = desc
+    def __init__(self, data):
+        self.data = data
+        expression.Function.__init__(self, "ST_GeomFromEWKB", data,
+                                    type_=Geometry(coerce_="binary"))
+
+    @property
+    def desc(self):
+        return self.as_hex
+
+    @property
+    def as_hex(self):
+        return binascii.hexlify(self.data)
 
 class TextualGisElement(GisElement, expression.Function):
-    """Represents a Geometry value as expressed within application code;
-    i.e. in wkt format.
-
-    Extends expression.Function so that the value is interpreted as
-    GeomFromText(value) in a SQL expression context.
-
-    """
+    """Represents a Geometry value expressed as text."""
 
     def __init__(self, desc, srid=-1):
         self.desc = desc
-        expression.Function.__init__(self, "ST_GeomFromText", desc, srid)
+        expression.Function.__init__(self, "ST_GeomFromText", desc, srid,
+                                    type_=Geometry)
 
 
 # SQL datatypes.
 
 class Geometry(UserDefinedType):
-    """Base PostGIS Geometry column type.
-
-    Converts bind/result values to/from a PersistentGisElement.
-
-    """
+    """Base PostGIS Geometry column type."""
 
     name = "GEOMETRY"
 
-    def __init__(self, dimension=None, srid=-1):
+    def __init__(self, dimension=None, srid=-1,
+                coerce_="text"):
         self.dimension = dimension
         self.srid = srid
+        self.coerce = coerce_
 
     class comparator_factory(UserDefinedType.Comparator):
         """Define custom operations for geometry types."""
         return self.name
 
     def bind_expression(self, bindvalue):
-        return TextualGisElement(bindvalue)
+        if self.coerce == "text":
+            return TextualGisElement(bindvalue)
+        elif self.coerce == "binary":
+            return BinaryGisElement(bindvalue)
+        else:
+            assert False
 
     def column_expression(self, col):
-        return func.ST_AsText(col, type_=self)
+        if self.coerce == "text":
+            return func.ST_AsText(col, type_=self)
+        elif self.coerce == "binary":
+            return func.ST_AsBinary(col, type_=self)
+        else:
+            assert False
 
     def bind_processor(self, dialect):
         def process(value):
         return process
 
     def result_processor(self, dialect, coltype):
+        if self.coerce == "text":
+            fac = TextualGisElement
+        elif self.coerce == "binary":
+            fac = BinaryGisElement
+        else:
+            assert False
         def process(value):
             if value is not None:
-                return PersistentGisElement(value)
+                return fac(value)
             else:
                 return value
         return process
 
+    def adapt(self, impltype):
+        return impltype(dimension=self.dimension,
+                srid=self.srid, coerce_=self.coerce)
+
 # other datatypes can be added as needed.
 
 class Point(Geometry):
             table.columns = table.info.pop('_saved_columns')
 setup_ddl_events()
 
-def _to_postgis(value):
-    """Interpret a value as a GIS-compatible construct.
-
-
-    TODO.  I'd like to make this unnecessary also,
-    and see if the Geometry type can do the coersion.
-    This would require [ticket:1534].
-
-    """
-
-    if hasattr(value, '__clause_element__'):
-        return value.__clause_element__()
-    elif isinstance(value, (expression.ClauseElement, GisElement)):
-        return value
-    elif isinstance(value, basestring):
-        return TextualGisElement(value)
-    elif value is None:
-        return None
-    else:
-        raise Exception("Invalid type")
-
 
 
 # illustrate usage
 if __name__ == '__main__':
     from sqlalchemy import (create_engine, MetaData, Column, Integer, String,
-        func, literal, select)
+        func, select)
     from sqlalchemy.orm import sessionmaker
     from sqlalchemy.ext.declarative import declarative_base
 
 
     # pre flush, the TextualGisElement represents the string we sent.
     assert str(r.road_geom) == 'LINESTRING(198231 263418,198213 268322)'
-    assert session.scalar(r.road_geom.wkt) == 'LINESTRING(198231 263418,198213 268322)'
 
     session.commit()
 
     # after flush and/or commit, all the TextualGisElements become PersistentGisElements.
     assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)"
 
-    r1 = session.query(Road).filter(Road.road_name=='Graeme Ave').one()
+    r1 = session.query(Road).filter(Road.road_name == 'Graeme Ave').one()
 
     # illustrate the overridden __eq__() operator.
 
     # strings come in as TextualGisElements
     r2 = session.query(Road).filter(Road.road_geom == 'LINESTRING(189412 252431,189631 259122)').one()
 
-    # PersistentGisElements work directly
     r3 = session.query(Road).filter(Road.road_geom == r1.road_geom).one()
 
     assert r1 is r2 is r3
 
-    # illustrate the "intersects" operator
-    print session.query(Road).filter(Road.road_geom.intersects(r1.road_geom)).all()
+    # core usage just fine:
 
-    # illustrate usage of the "wkt" accessor. this requires a DB
-    # execution to call the AsText() function so we keep this explicit.
-    assert session.scalar(r1.road_geom.wkt) == 'LINESTRING(189412 252431,189631 259122)'
+    road_table = Road.__table__
+    stmt = select([road_table]).where(road_table.c.road_geom.intersects(r1.road_geom))
+    print session.execute(stmt).fetchall()
+
+    # look up the hex binary version, using SQLAlchemy casts
+    as_binary = session.scalar(select([type_coerce(r.road_geom, Geometry(coerce_="binary"))]))
+    assert as_binary.as_hex == \
+        '01020000000200000000000000b832084100000000e813104100000000283208410000000088601041'
+
+    # back again, same method !
+    as_text = session.scalar(select([type_coerce(as_binary, Geometry(coerce_="text"))]))
+    assert as_text.desc == "LINESTRING(198231 263418,198213 268322)"
+
 
     session.rollback()