Andy Mikhailenko avatar Andy Mikhailenko committed 0c032a5

Improved document structure validation algo. Added tests.

Comments (0)

Files changed (2)

     return dict(generate())
 
 
+
 class StructureSpecificationError(Exception):
     pass
 
             yield (key,), value
 
 
-def walk_structure_spec(spec, only_leaves=True):
-    """ Walks given document structure specification dictionary and yields
-    pairs ``(keys, value)`` where `keys` is a tuple of keys.
-
+def validate_structure_spec(spec):
+    """ Checks whether given document structure specification dictionary if
+    defined correctly.
     Raises `StructureSpecificationError` if the specification is malformed.
-
-    :param only_leaves:
-        if ``False``, keys with assigned container types (`dict` and `list`)
-        are included in results (but value is replaced with type). Default
-        is ``True`` so only terminal nodes are yielded. For example::
-
-            >>> list(walk_structure_spec({'a': {'b': int}}, only_leaves=True)
-            [(('a','b'), int)]
-            >>> list(walk_structure_spec({'a': {'b': int}}, only_leaves=False)
-            [(('a',), dict), (('a','b'), int)]
-
     """
     stack = deque(walk_dict(spec))
     while stack:
         keys, value = stack.pop()
         if isinstance(value, list):
+            # accepted: list of values of given type
+            # e.g.: [unicode] -> [u'foo', u'bar']
             if len(value) == 1:
                 stack.append((keys, value[0]))
             else:
                 raise StructureSpecificationError(
                     '{path}: list must contain exactly 1 item (got {count})'
                          .format(path='.'.join(keys), count=len(value)))
-            if not only_leaves:
-                yield keys, list
         elif isinstance(value, dict):
+            # accepted: nested dictionary (a spec on its own)
+            # e.g.: {...} -> {...}
             for subkeys, subvalue in walk_dict(value):
                 stack.append((keys+subkeys, subvalue))
-            if not only_leaves:
-                yield keys, dict
-        elif isinstance(value, type) or value is None:
-            yield keys, value
+        elif value is None:
+            # accepted: any value
+            # e.g.: None -> 123
+            pass
+        elif isinstance(value, type):
+            # accepted: given type
+            # e.g.: unicode -> u'foo'   or   dict -> {'a': 123}   or whatever.
+            pass
         else:
             raise StructureSpecificationError(
                 '{path}: expected dict, list, type or None (got {value!r})'
                     .format(path='.'.join(keys), value=value))
 
 
-def validate_structure_spec(spec):
-    "Validates given document structure specification dictionary."
-    for keys, value in walk_structure_spec(spec):
-        pass
+def check_type(typespec, value, keys_tuple):
+    if typespec is None:
+        return
+    if not isinstance(typespec, type):
+        key = '.'.join(keys_tuple)
+        raise StructureSpecificationError(
+            '{path}: expected dict, list, type or None (got {value!r})'
+                .format(path=key, value=value))
+    if not isinstance(value, typespec):
+        key = '.'.join(keys_tuple)
+        raise TypeError('{key}: expected {typespec.__name__}, got '
+                        '{valtype.__name__} {value!r}'.format(key=key,
+                        typespec=typespec, valtype=type(value), value=value))
 
 
-def validate_structure(spec, data, skip_unknown=False, skip_missing=False):
+def validate_structure(spec, data, skip_missing=False, skip_unknown=False):
     """ Validates given document against given structure specification.
     """
-    plain_spec = dict(walk_structure_spec(spec, only_leaves=False))
-    seen = []
-    for keys, value in walk_dict(data):
-        seen.append(keys)
+    # flatten the structures so that nested dictionaries are moved to the root
+    # level and {'a': {'b': 1}} becomes {('a','b'): 1}
+    flat_spec = dict(walk_dict(spec))
+    flat_data = dict(walk_dict(data))
+
+    # compare the two structures; nested dictionaries are included in the
+    # comparison but nested lists are opaque and will be dealt with later on.
+    spec_keys = set(spec.iterkeys())
+    data_keys = set(data.iterkeys())
+    missing = spec_keys - data_keys
+    unknown = data_keys - spec_keys
+
+    if missing and not skip_missing:
+        raise KeyError('Missing keys: {0}'.format(', '.join(missing)))
+
+    if unknown and not skip_unknown:
+        raise KeyError('Unknown keys: {0}'.format(', '.join(unknown)))
+
+    # check types and deal with nested lists
+    for keys, value in flat_data.iteritems():
+        typespec = flat_spec.get(keys)
         if value is None:
+            # empty value, ok unless required
             continue
-        if keys not in plain_spec:
-            if skip_unknown:
-                continue
-            raise KeyError('{key} is not in spec'.format(key='.'.join(keys)))
-        typespec = plain_spec[keys]
-        if not isinstance(value, typespec):
-            raise TypeError('{key}: expected {typespec}, got {value!r}'
-                            .format(key='.'.join(keys), typespec=typespec,
-                                    value=value))
-    missing = set(plain_spec) - set(seen)
-    if missing:
-        if skip_missing:
-            return
-        dotkeys = ('.'.join(k) for k in missing)
-        raise KeyError('Missing keys: {0}'.format(', '.join(dotkeys)))
+        elif typespec is None:
+            # any value is acceptable
+            continue
+        elif isinstance(typespec, list) and value:
+            # nested list
+            item_spec = typespec[0]
+            for item in value:
+                if item_spec == dict or isinstance(item, dict):
+                    # validate each value in the list as a separate document
+                    # and fix error message to include outer key
+                    try:
+                        validate_structure(item_spec, item,
+                                           skip_missing=skip_missing,
+                                           skip_unknown=skip_unknown)
+                    except (KeyError, TypeError) as e:
+                        raise type(e)('{k}: {e}'.format(k='.'.join(keys), e=e))
+                else:
+                    check_type(item_spec, item, keys)
+        else:
+            check_type(typespec, value, keys)
 # -*- coding: utf-8 -*-
+#
+#    Monk is a lightweight schema/query framework for document databases.
+#    Copyright © 2011  Andrey Mikhaylenko
+#
+#    This file is part of Monk.
+#
+#    Monk is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU Lesser General Public License as published
+#    by the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    Monk is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU Lesser General Public License for more details.
+#
+#    You should have received a copy of the GNU Lesser General Public License
+#    along with Monk.  If not, see <http://gnu.org/licenses/>.
 """
 Unit Tests
 ==========
 
 
 class StructureSpecTestCase(unittest2.TestCase):
-    def test_walk(self):
-        spec = {
+
+    def test_walk_dict(self):
+        data = {
             'a': {
-                'b': [
-                    { 'c': int },
+                'b': {
+                    'c': 'C',
+                },
+                'd': [
+                    { 'e': 123 },
                 ],
-                'd': str
             },
-            'e': list
+            'f': ['F'],
+            'g': dict,
+            'h': list,
+            'i': None,
         }
         paths = [
-            (('a', 'b', 'c'), int),
-            (('a', 'd',), str),
-            (('e',), list)
+            # value is a dictionary, not yielded, queued for unwrapping
+            (('a',), None),
+            # nested dict unwrapped; value is a dict; queued for unwrapping
+            (('a', 'b',), None),
+            # nested dict unwrapped; value is a string; yielded as is
+            (('a', 'b', 'c'), 'C'),
+            # nested dict unwrapped; next value is a list which in opaque for
+            # this function, so yielded as is, even if there are dicts inside
+            (('a', 'd'), [{'e': 123}]),
+            # value is a list again; yielded as is
+            (('f',), ['F']),
+            # a couple of type definitions
+            (('g',), dict),
+            (('h',), list),
+            (('i',), None),
         ]
-        all_paths = [
-            (('a',), None),
-            (('a', 'b'), None),
-            (('a', 'b', 'c'), int),
-            (('a', 'd',), str),
-            (('e',), list)
-        ]
-        self.assertEqual(set(monk.walk_structure_spec(spec)), set(paths))
-
-
-        print list(monk.walk_structure_spec(spec, all_keys=True))
-
-
-        self.assertEqual(set(monk.walk_structure_spec(spec, all_keys=True)),
-                         set(all_paths))
+        self.assertEqual(sorted(monk.walk_dict(data)), sorted(paths))
 
     def test_correct_types(self):
         """ `None` stands for "any value". """
             monk.validate_structure({'a': bool}, {'a': u''})
 
     def test_missing(self):
+        monk.validate_structure({'a': unicode}, {}, skip_missing=True)
         with self.assertRaises(KeyError):
             monk.validate_structure({'a': unicode}, {})
         with self.assertRaises(KeyError):
             monk.validate_structure({}, {'x': 123})
         with self.assertRaises(KeyError):
             monk.validate_structure({'a': unicode}, {'a': u'A', 'x': 123})
+        with self.assertRaisesRegexp(TypeError, "a: b: expected int, got str 'bad'"):
+            monk.validate_structure({'a': [{'b': [int]}]}, {'a': [{'b': ['bad']}]})
 
     def test_bool(self):
         monk.validate_structure({'a': bool}, {'a': None})
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.