Commits

Ralph Bean  committed 8bb5544 Merge

Merging.

  • Participants
  • Parent commits 8857fee, 61da7ca

Comments (0)

Files changed (5)

 
 setup(
     name='tw2.sqla',
-    version='2.0a5',
+    version='2.0a7',
     description='SQLAlchemy database layer for ToscaWidgets 2',
     author='Paul Johnston',
     author_email='paj@pajhome.org.uk',

File tests/test_utils.py

         # When updating a DBTestCls1 object, it should only be possible to modify
         # a DBTestCls2 object that is related to that object.
         prev_nick = self.DBTestCls2.query.get(1).nick
-        twsu.from_dict(self.DBTestCls1(), {'others': [{'id':1, 'nick':prev_nick+'_fred'}]})
-        assert(self.DBTestCls2.query.get(1).nick == prev_nick)
+        prev_id = self.DBTestCls2.query.get(1).id
+        prev_count = self.DBTestCls2.query.count()
+        twsu.from_dict(self.DBTestCls1(), {'others': [
+            {'id':prev_id, 'nick':prev_nick+'_fred'}]})
+        obj = self.DBTestCls2.query.get(1)
+        count = self.DBTestCls2.query.count()
+        assert(prev_nick == obj.nick)
+        assert(prev_id == obj.id)
+        assert(count == prev_count+1)
 
     def test_update_or_create(self):
         d = { 'name' : 'winboat' }

File tests/test_widgets.py

         'child': twf.Form(
             child=twf.GridLayout(
                 children=[
-                    twf.HiddenField(id='id'),
+                    twf.HiddenField(id='id', validator=twc.IntValidator),
                     twf.TextField(id='name'),
                 ])
             ),
 
         self.mw.config.debug = True
         r = self.widget().request(req)
-        assert r.body == """Form posted successfully [{'id': u'1', 'name': u'a'}]""", r.body
+        assert r.body == """Form posted successfully [{'id': 1, 'name': u'a'}]""", r.body
 
     def test_request_post_counts_new(self):
         environ = {'wsgi.input': StringIO('')}

File tw2/sqla/utils.py

 import sqlalchemy as sa
+things = ['new', 'dirty', 'deleted']
 
-def from_dict(obj, data):
+def from_dict(obj, data, protect_prm_tamp=True):
     """
     Update a mapped object with data from a JSON-style nested dict/list
     structure.
             if not record:
                 record = mapper.get_property(key).mapper.class_()
                 setattr(obj, key, record)
-            from_dict(record, value)
+            from_dict(record, value, protect_prm_tamp)
         elif isinstance(value, list) and \
              value and isinstance(value[0], dict):
-            from_list(mapper.get_property(key).mapper.class_, getattr(obj, key), value)
+            from_list(
+                mapper.get_property(key).mapper.class_,
+                getattr(obj, key),
+                value,
+                protect_prm_tamp=protect_prm_tamp
+            )
         elif key not in pk_props:
             setattr(obj, key, value)
     return obj
 
 
-def from_list(entity, objects, data):
+def from_list(entity, objects, data, force_delete=False, protect_prm_tamp=True):
     """
     Update a list of mapped objects with data from a JSON-style nested dict/list
     structure.
     To protect against parameter tampering attacks, if the primary key field(s) 
     for a row do not exactly match an existing object then a new object is created.
     """
+
     mapper = sa.orm.class_mapper(entity)    
     pkey_fields = [f.key for f in mapper.primary_key]
     obj_map = dict((tuple(mapper.primary_key_from_instance(o)), o) for o in objects)
                     'to list relationships in from_dict data.')
         pkey = tuple(row.get(f) for f in pkey_fields)
         obj = obj_map.pop(pkey, None)
-        if not obj:
+        if not obj and protect_prm_tamp:
             obj = entity()
+            from_dict(obj, row, protect_prm_tamp)
             obj.query.session.add(obj)
             objects.append(obj)
-        from_dict(obj, row)
+        elif not obj:
+            obj = update_or_create(entity, row)
+            obj.query.session.add(obj)
+            objects.append(obj)
+        else:
+            from_dict(obj, row, protect_prm_tamp)
+
     for d in obj_map.values():
         objects.remove(d)
-        d.query.session.delete(d)
+        if force_delete:
+            # Only fully delete 'unreferenced' objects if explicitly told to do
+            # so.  You would *not* want to do this in a database of friends
+            # where sally and suzie stop being friends but you do not want suzie
+            # deleted from the database alltogether.
+            d.query.session.delete(d)
 
 
-def update_or_create(cls, data):
+def update_or_create(cls, data, protect_prm_tamp=True):
 
     try:
         session = cls.query.session
         record = cls()
         session.add(record)
 
-    record = from_dict(record, data)
+    record = from_dict(record, data, protect_prm_tamp)
     return record

File tw2/sqla/widgets.py

     _no_autoid = True
 
     def fetch_data(self, req):
-        self.value = req.GET and self.entity.query.filter_by(**req.GET.mixed()).first() or None
+        filter = dict([
+            (key.__str__(), value) for key, value in req.GET.mixed().items()
+        ])
+        self.value = req.GET and self.entity.query.filter_by(**filter).first() or None
 
     @classmethod
-    def validated_request(cls, req, data):
-        utils.update_or_create(cls.entity, data)
-        transaction.commit()
+    def validated_request(cls, req, data, protect_prm_tamp=True, do_commit=True):
+        utils.update_or_create(cls.entity, data,
+                               protect_prm_tamp=protect_prm_tamp)
+        if do_commit:
+            transaction.commit()
+
         if hasattr(cls, 'redirect'):
             return webob.Response(request=req, status=302, location=cls.redirect)
         else:
         self.value = self.entity.query.all()
         
     @classmethod
-    def validated_request(cls, req, data):
-        utils.from_list(cls.entity, cls.entity.query.all(), data)
-        transaction.commit()
+    def validated_request(cls, req, data, protect_prm_tamp=True, do_commit=True):
+        utils.from_list(cls.entity, cls.entity.query.all(), data,
+                        force_delete=True, protect_prm_tamp=protect_prm_tamp)
+        if do_commit:
+            transaction.commit()
+
         if hasattr(cls, 'redirect'):
             return webob.Response(request=req, status=302, location=cls.redirect)
         else: