Mike Bayer avatar Mike Bayer committed 2b45cd9

- tightened down conditions used to locate "relation direction", associating
the "foreignkey" of the relationship with the "primaryjoin". the column match now
must be exact, not just "corresponding". this enables self-referential relationships on a
polymorphic mapper.
- a little bit of improvement to the concept of a "concrete" inheritance mapping, though that concept
is not well fleshed out yet (added test case to support concrete mappers on top of a polymorphic base).

Comments (0)

Files changed (9)

   actual primary key in "someinstance".
   - some deeper error checking when compiling relations, to detect an ambiguous "primaryjoin"
   in the case that both sides of the relationship have foreign key references in the primary
-  join condition
+  join condition.  also tightened down conditions used to locate "relation direction", associating
+  the "foreignkey" of the relationship with the "primaryjoin"
+  - a little bit of improvement to the concept of a "concrete" inheritance mapping, though that concept
+  is not well fleshed out yet (added test case to support concrete mappers on top of a polymorphic base).
   - fix to "proxy=True" behavior on synonym()
   - fixed bug where delete-orphan basically didn't work with many-to-many relationships [ticket:427],
   backref presence generally hid the symptom

lib/sqlalchemy/orm/mapper.py

 
         if self.inherits is not None:
             for key, prop in self.inherits.__props.iteritems():
-                if not self.__props.has_key(key):
+                if not self.__props.has_key(key) and (not self.concrete or not isinstance(prop, ColumnProperty)):
                     prop.adapt_to_inherited(key, self)
 
         # load properties from the main table object,
         the columns of select_table should encompass all the columns of the mapped_table either directly
         or through proxying relationships."""
         if self.select_table is not self.mapped_table:
+            if self.polymorphic_identity is None:
+                raise exceptions.ArgumentError("Could not locate a polymorphic_identity field for mapper '%s'.  This field is required for polymorphic mappers" % str(self))
             props = {}
             if self.properties is not None:
                 for key, prop in self.properties.iteritems():

lib/sqlalchemy/orm/properties.py

                 else:
                     return sync.MANYTOONE
         else:
-            onetomany = len([c for c in self.foreignkey if self.mapper.unjoined_table.corresponding_column(c, False) is not None])
-            manytoone = len([c for c in self.foreignkey if self.parent.unjoined_table.corresponding_column(c, False) is not None])
+            onetomany = len([c for c in self.foreignkey if self.mapper.unjoined_table.corresponding_column(c, False, require_exact=True) is not None])
+            manytoone = len([c for c in self.foreignkey if self.parent.unjoined_table.corresponding_column(c, False, require_exact=True) is not None])
             if not onetomany and not manytoone:
                 raise exceptions.ArgumentError("Cant determine relation direction for '%s' on mapper '%s' with primary join '%s' - foreign key columns are not present in neither the parent nor the child's mapped tables" %(self.key, str(self.parent), str(self.primaryjoin)))
             elif onetomany and manytoone:

lib/sqlalchemy/orm/strategies.py

         binds = {}
         reverse = {}
         def column_in_table(table, column):
-            return table.corresponding_column(column, raiseerr=False, keys_ok=False) is not None
+            return table.corresponding_column(column, raiseerr=False, keys_ok=False, require_exact=True) is not None
 
         if remote_side is None or len(remote_side) == 0:
             remote_side = foreignkey

lib/sqlalchemy/sql.py

         if not hasattr(self, '_oid_column'):
             self._oid_column = self._locate_oid_column()
         return self._oid_column
-    def corresponding_column(self, column, raiseerr=True, keys_ok=False):
+    def corresponding_column(self, column, raiseerr=True, keys_ok=False, require_exact=False):
         """given a ColumnElement, return the ColumnElement object from this 
         Selectable which corresponds to that original Column via a proxy relationship."""
+        if require_exact:
+            if self.columns.get(column.key) is column:
+                return column
+            else:
+                if not raiseerr:
+                    return None
+                else:
+                    raise exceptions.InvalidRequestError("Column instance '%s' is not directly present in table '%s'" % (str(column), str(column.table)))
         for c in column.orig_set:
             try:
                 return self.original_columns[c]

test/orm/alltests.py

         'orm.inheritance',
         'orm.inheritance2',
         'orm.inheritance3',
+        'orm.inheritance4',
+        'orm.inheritance5',
         'orm.single',
         'orm.polymorph'        
         )

test/orm/inheritance4.py

+from sqlalchemy import *
+import testbase
+
+class ConcreteTest1(testbase.AssertMixin):
+    def setUpAll(self):
+        global managers_table, engineers_table, metadata
+        metadata = BoundMetaData(testbase.db)
+        managers_table = Table('managers', metadata, 
+            Column('employee_id', Integer, primary_key=True),
+            Column('name', String(50)),
+            Column('manager_data', String(50)),
+        )
+
+        engineers_table = Table('engineers', metadata, 
+            Column('employee_id', Integer, primary_key=True),
+            Column('name', String(50)),
+            Column('engineer_info', String(50)),
+        )
+
+        metadata.create_all()
+    def tearDownAll(self):
+        metadata.drop_all()
+        
+    def testbasic(self):
+        class Employee(object):
+            def __init__(self, name):
+                self.name = name
+            def __repr__(self):
+                return self.__class__.__name__ + " " + self.name
+
+        class Manager(Employee):
+            def __init__(self, name, manager_data):
+                self.name = name
+                self.manager_data = manager_data
+            def __repr__(self):
+                return self.__class__.__name__ + " " + self.name + " " +  self.manager_data
+
+        class Engineer(Employee):
+            def __init__(self, name, engineer_info):
+                self.name = name
+                self.engineer_info = engineer_info
+            def __repr__(self):
+                return self.__class__.__name__ + " " + self.name + " " +  self.engineer_info
+
+        pjoin = polymorphic_union({
+            'manager':managers_table,
+            'engineer':engineers_table
+        }, 'type', 'pjoin')
+
+        employee_mapper = mapper(Employee, pjoin, polymorphic_on=pjoin.c.type)
+        manager_mapper = mapper(Manager, managers_table, inherits=employee_mapper, concrete=True, polymorphic_identity='manager')
+        engineer_mapper = mapper(Engineer, engineers_table, inherits=employee_mapper, concrete=True, polymorphic_identity='engineer')
+
+        session = create_session()
+        session.save(Manager('Tom', 'knows how to manage things'))
+        session.save(Engineer('Kurt', 'knows how to hack'))
+        session.flush()
+        session.clear()
+
+        assert set([repr(x) for x in session.query(Employee).select()]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
+        assert set([repr(x) for x in session.query(Manager).select()]) == set(["Manager Tom knows how to manage things"])
+        assert set([repr(x) for x in session.query(Engineer).select()]) == set(["Engineer Kurt knows how to hack"])
+
+    def testwithrelation(self):
+        pass
+        
+        # TODO: test a self-referential relationship on a concrete polymorphic mapping
+
+
+if __name__ == '__main__':
+    testbase.main()

test/orm/inheritance5.py

+from sqlalchemy import *
+import testbase
+
+class AttrSettable(object):
+    def __init__(self, **kwargs):
+        [setattr(self, k, v) for k, v in kwargs.iteritems()]
+    def __repr__(self):
+        return self.__class__.__name__ + ' ' + ','.join(["%s=%s" % (k,v) for k, v in self.__dict__.iteritems() if k[0] != '_'])
+
+
+class RelationTest1(testbase.PersistTest):
+    """test self-referential relationships on polymorphic mappers"""
+    def setUpAll(self):
+        global people, managers, metadata
+        metadata = BoundMetaData(testbase.db)
+
+        people = Table('people', metadata, 
+           Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+           Column('manager_id', Integer, ForeignKey('managers.person_id', use_alter=True, name="mpid_fq")),
+           Column('name', String(50)),
+           Column('type', String(30)))
+
+        managers = Table('managers', metadata, 
+           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+           Column('status', String(30)),
+           Column('manager_name', String(50))
+           )
+
+        metadata.create_all()
+
+    def tearDownAll(self):
+        metadata.drop_all()
+
+    def tearDown(self):
+        clear_mappers()
+        people.update().execute(manager_id=None)
+        for t in metadata.table_iterator(reverse=True):
+            t.delete().execute()
+
+    def testbasic(self):
+        class Person(AttrSettable):
+            pass
+        class Manager(Person):
+            pass
+
+        mapper(Person, people, properties={
+            'manager':relation(Manager, primaryjoin=people.c.manager_id==managers.c.person_id, uselist=False)
+        })
+        mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id)
+        try:
+            compile_mappers()
+        except exceptions.ArgumentError, ar:
+            assert str(ar) == "Cant determine relation direction for 'manager' on mapper 'Mapper|Person|people' with primary join 'people.manager_id = managers.person_id' - foreign key columns are present in both the parent and the child's mapped tables.  Specify 'foreignkey' argument."
+
+        clear_mappers()
+
+        mapper(Person, people, properties={
+            'manager':relation(Manager, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
+        })
+        mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id)
+
+        session = create_session()
+        p = Person(name='some person')
+        m = Manager(name='some manager')
+        p.manager = m
+        session.save(p)
+        session.flush()
+        session.clear()
+
+        p = session.query(Person).get(p.person_id)
+        m = session.query(Manager).get(m.person_id)
+        print p, m, p.manager
+        assert p.manager is m
+            
+class RelationTest2(testbase.AssertMixin):
+    """test self-referential relationships on polymorphic mappers"""
+    def setUpAll(self):
+        global people, managers, metadata
+        metadata = BoundMetaData(testbase.db)
+
+        people = Table('people', metadata, 
+           Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+           Column('name', String(50)),
+           Column('type', String(30)))
+
+        managers = Table('managers', metadata, 
+           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+           Column('manager_id', Integer, ForeignKey('people.person_id')),
+           Column('status', String(30)),
+           )
+
+        metadata.create_all()
+
+    def tearDownAll(self):
+        metadata.drop_all()
+
+    def tearDown(self):
+        clear_mappers()
+        for t in metadata.table_iterator(reverse=True):
+            t.delete().execute()
+
+    def testbasic(self):
+        class Person(AttrSettable):
+            pass
+        class Manager(Person):
+            pass
+
+        poly_union = polymorphic_union({
+            'person':people.select(people.c.type=='person'),
+            'manager':managers.join(people, people.c.person_id==managers.c.person_id)
+        }, None)
+
+        mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type)
+        mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager',
+              properties={
+                'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, uselist=False)
+        })
+        
+        sess = create_session()
+        p = Person(name='person1')
+        m = Manager(name='manager1')
+        m.colleague = p
+        sess.save(m)
+        sess.flush()
+        
+        sess.clear()
+        p = sess.query(Person).get(p.person_id)
+        m = sess.query(Manager).get(m.person_id)
+        print p
+        print m
+        assert m.colleague is p
+
+if __name__ == "__main__":    
+    testbase.main()
+        

test/orm/relationships.py

         s.delete(j)
         s.flush()
         
-class CyclicalRelationInheritTest(testbase.PersistTest):
-    """testing a mapper with an inheriting mapper, where the base mapper also has a relation to the inheriting mapper,
-    as well as using unexpected columns to create the join conditions which do not indicate the usual "cyclical" relationship
-    this represents.  test that proper error messages are raised guiding the user to a reasonably working setup, and that 
-    the ultimate setup works."""
-    def setUpAll(self):
-        global people, managers, metadata
-        metadata = BoundMetaData(testbase.db)
-
-        people = Table('people', metadata, 
-           Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
-           Column('manager_id', Integer, ForeignKey('managers.person_id', use_alter=True, name="mpid_fq")),
-           Column('name', String(50)),
-           Column('type', String(30)))
-
-        managers = Table('managers', metadata, 
-           Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
-           Column('status', String(30)),
-           Column('manager_name', String(50))
-           )
-
-        metadata.create_all()
-
-    def tearDownAll(self):
-        metadata.drop_all()
-
-    def tearDown(self):
-        clear_mappers()
-        people.update().execute(manager_id=None)
-        for t in metadata.table_iterator(reverse=True):
-            t.delete().execute()
-
-    def testbasic(self):
-        class Person(object):
-            pass
-        class Manager(Person):
-            pass
-
-        mapper(Person, people, properties={
-            'manager':relation(Manager, primaryjoin=people.c.manager_id==managers.c.person_id, uselist=False)
-        })
-        mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id)
-        try:
-            compile_mappers()
-        except exceptions.ArgumentError, ar:
-            assert str(ar) == "Cant determine relation direction for 'manager' on mapper 'Mapper|Person|people' with primary join 'people.manager_id = managers.person_id' - foreign key columns are present in both the parent and the child's mapped tables.  Specify 'foreignkey' argument."
-
-        clear_mappers()
-
-        mapper(Person, people, properties={
-            'manager':relation(Manager, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
-        })
-        mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id)
-
-        session = create_session()
-        p = Person()
-        p.name = 'some person'
-        m = Manager()
-        m.name = 'some manager'
-        p.manager = m
-        session.save(p)
-        session.flush()
-        session.clear()
-
-        p = session.query(Person).get(p.person_id)
-        m = session.query(Manager).get(m.person_id)
-        print p, m, p.manager
-        assert p.manager is m
-        
         
         
 if __name__ == "__main__":
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.