Anonymous avatar Anonymous committed 3d8d32f

Fix DN case-sensitivity on SCOPE_BASE searches.

Fixes #5.

Comments (0)

Files changed (3)

src/mockldap/__init__.py

 from collections import defaultdict
 
+from ldap.cidict import cidict
+
 from .ldapobject import LDAPObject
 from .recording import SeedRequired  # noqa
 
         if self.ldap_objects is not None:
             raise Exception("You can't add a directory after calling start().")
 
-        self.directories[uri] = directory
+        self.directories[uri] = cidict(map_keys(lambda s: s.lower(), directory))
 
     def start(self, path='ldap.initialize'):
         """
         return ldap_object
 
 
-# Map a dictionary by applying a function to each value.
-map_values = lambda f, d: dict((k, f(v)) for k, v in d.items())
+# Map a dictionary by applying a function to each key/value.
+map_keys = lambda f, d: dict((f(k), v) for k, v in d.iteritems())
+map_values = lambda f, d: dict((k, f(v)) for k, v in d.iteritems())

src/mockldap/ldapobject.py

 from copy import deepcopy
 
 import ldap
+from ldap.cidict import cidict
 import ldap.dn
 
 try:
 class LDAPObject(RecordableMethods):
     """
     :param directory: The initial content of this LDAP connection.
-    :type directory: ``{dn: {attr: [values]}}``
+    :type directory: :class:`ldap.cidict.cidict`: ``{dn: {attr: [values]}}``
 
     Our mock replacement for :class:`ldap.LDAPObject`. This exports selected
     LDAP operations and allows you to set return values in advance as well as
         *string*: DN of the last successful bind. None if unbound.
     """
     def __init__(self, directory):
-        self.directory = ldap.cidict.cidict(deepcopy(directory))
+        if not isinstance(directory, ldap.cidict.cidict):
+            from . import map_keys
+            directory = cidict(map_keys(lambda s: s.lower(), directory))
+
+        self.directory = deepcopy(directory)
         self.async_results = []
         self.options = {}
         self.tls_enabled = False
         """
         Supports many, but not all, filter strings.
 
-        Tests of the form ``'(foo=bar)'`` and ``'(foo=*)'`` are supported, as
-        are the &, |, and !  operators. attrlist and attrsonly are also
+        Tests of the form ``'(foo=bar)'`` and ``'(foo=\*)'`` are supported, as
+        are the &, \|, and !  operators. attrlist and attrsonly are also
         supported. Beyond that, this method must be seeded.
         """
         return self._search_s(base, scope, filterstr, attrlist, attrsonly)
                         return 1
                 except (NameError, ValueError):
                     pass
-        return (value in self.directory[dn][attr]) and 1 or 0
+
+        return (1 if (value in self.directory[dn][attr]) else 0)
 
     def _search_s(self, base, scope, filterstr, attrlist, attrsonly):
         from .filter import parse, UnsupportedOp
             raise ldap.NO_SUCH_OBJECT
 
         # Find directory entries within the requested scope
-        base_parts = ldap.dn.explode_dn(base)
+        base_parts = ldap.dn.explode_dn(base.lower())
         base_len = len(base_parts)
         dn_parts = dict((dn, ldap.dn.explode_dn(dn)) for dn in self.directory.iterkeys())
 
         if scope == ldap.SCOPE_BASE:
-            dns = iter([base])
+            dns = (dn for dn, parts in dn_parts.iteritems() if parts == base_parts)
         elif scope == ldap.SCOPE_ONELEVEL:
             dns = (dn for dn, parts in dn_parts.iteritems() if parts[1:] == base_parts)
         elif scope == ldap.SCOPE_SUBTREE:

src/mockldap/tests.py

     def tearDown(self):
         self.mockldap.stop()
 
+    def test_manual_ldapobject(self):
+        from .ldapobject import LDAPObject
+
+        ldapobj = LDAPObject(directory)
+
+        self.assertIsInstance(ldapobj.directory, ldap.cidict.cidict)
+
     def test_set_option(self):
         self.ldapobj.set_option(ldap.OPT_X_TLS_DEMAND, True)
         self.assertEqual(self.ldapobj.get_option(ldap.OPT_X_TLS_DEMAND), True)
 
         self.assertEqual(results, [alice])
 
+    def test_search_s_base_case_insensitive(self):
+        results = self.ldapobj.search_s('cn=ALICE,ou=Example,o=TEST', ldap.SCOPE_BASE)
+
+        self.assertEquals(results, [alice])
+
     def test_search_s_get_specific_attr(self):
         results = self.ldapobj.search_s("cn=alice,ou=example,o=test", ldap.SCOPE_BASE,
                                         attrlist=["userPassword"])
             self.ldapobj.search_s("ou=example,o=test", ldap.SCOPE_ONELEVEL,
                                   '(invalid~=bogus)')
 
+    def test_search_async(self):
+        msgid = self.ldapobj.search("cn=alice,ou=example,o=test", ldap.SCOPE_BASE)
+        results = self.ldapobj.result(msgid)
+
+        self.assertEqual(results, (ldap.RES_SEARCH_RESULT, [alice]))
+
     def test_useful_seed_required_message(self):
         filterstr = '(invalid~=bogus)'
         try:
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.