Manuel de la Pena avatar Manuel de la Pena committed a8062f0 Merge

Merged with osx work.

Comments (0)

Files changed (5)

conversa/passwords/__init__.py

+#-*- coding: utf-8 -*-
+# Copyright (c) 2012 Conversa and other contributors,
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+# 
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""Main password conversa package."""

conversa/passwords/osx.py

+#-*- coding: utf-8 -*-
+# Copyright (c) 2012 Conversa and other contributors,
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+# 
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""Conversa osx keyring module."""
+
+from ctypes import (cdll, c_uint32, c_void_p, byref, memmove,
+                    create_string_buffer)
+from ctypes.util import find_library
+
+_security = cdll.LoadLibrary(find_library('Security'))
+_core = cdll.LoadLibrary(find_library('CoreService'))
+
+#Constants
+ERRSECSUCCESS = 0
+ERRSECUNIMPLEMENTED = -4
+ERRSECITEMNOTFOUND = -25300
+ERRSECDUPLICATEITEM = -25299
+
+
+def args_not_none(fn):
+    """Decorator that checks for not None parameters."""
+    def _fn_call(*args, **kwargs):
+        """Check the arguments than call the function."""
+        if any(arg is None for arg in args):
+            raise ValueError("Arguments can't be None")
+        return fn(*args, **kwargs)
+    return _fn_call
+
+
+class OsxKeychain(object):
+    """Represent a Mac OSX KeyChain."""
+    def __init__(self, name='login.keychain'):
+        """Arguments:
+        - name: The keychain name (it's a POSIX path)
+        """
+        self.kc = c_void_p()
+        if _security.SecKeychainOpen(name, byref(self.kc)) != ERRSECSUCCESS:
+            raise OSError("Can't open %s keychain", name)
+
+    @args_not_none
+    def get_password(self, service, username):
+        """
+        Retrieve the password from the keychain for
+        the given service, username.
+        """
+        p_length = c_uint32()
+        tmp = c_void_p()
+        res = _security.SecKeychainFindGenericPassword(self.kc, len(service),
+                                                       service, len(username),
+                                                       username,
+                                                       byref(p_length),
+                                                       byref(tmp), None)
+        #No entry for service, username
+        if res == ERRSECITEMNOTFOUND:
+            return None
+        #Not implemented error
+        if res == ERRSECUNIMPLEMENTED:
+            raise OSError("Not Implemented")
+        #Password is present
+        res = create_string_buffer(p_length.value)
+        memmove(res, tmp.value, p_length.value)
+        _security.SecKeychainItemFreeContent(None, tmp)
+        return res.raw
+
+    @args_not_none
+    def set_password(self, service, username, password):
+        """
+        Set password for service and username in the keychain,
+        raise if the password is already set.
+        """
+        p_length = c_uint32()
+        tmp = c_void_p()
+        #Check if the password is present
+        res = _security.SecKeychainFindGenericPassword(self.kc, len(service),
+                                                       service, len(username),
+                                                       username,
+                                                       byref(p_length),
+                                                       byref(tmp), None)
+        #The password is already in the keychain
+        if res != ERRSECITEMNOTFOUND:
+            _security.SecKeychainItemFreeContent(None, tmp)
+            raise OSError("Password is already present")
+
+        res = _security.SecKeychainAddGenericPassword(self.kc,
+                                                      len(service), service,
+                                                      len(username), username,
+                                                      len(password), password,
+                                                      None)
+        if res == ERRSECDUPLICATEITEM:
+            raise OSError("Password is already present")
+
+    @args_not_none
+    def delete_password(self, service, username):
+        """
+        Delete the entry for service, username and raises if not present.
+        """
+        tmp = c_void_p()
+        #Check if the password is present
+        res = _security.SecKeychainFindGenericPassword(self.kc, len(service),
+                                                       service, len(username),
+                                                       username, None,
+                                                       None,
+                                                       byref(tmp))
+        #Password not present in the KeyChain
+        if res == ERRSECITEMNOTFOUND:
+            raise OSError("Password not present in keychain")
+        _security.SecKeychainItemDelete(tmp)
+        _core.CFRelease(tmp)

conversa/passwords/test/__init__.py

+#-*- coding: utf-8 -*-
+# Copyright (c) 2012 Conversa and other contributors,
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+# 
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""Conversa password tests."""

conversa/passwords/test/test_osx.py

+#-*- coding: utf-8 -*-
+# Copyright (c) 2012 Conversa and other contributors,
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+# 
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""Osx keyring tests."""
+
+import uuid
+
+from twisted.internet import defer
+from twisted.trial.unittest import TestCase
+
+from conversa.passwords import osx
+
+from ubuntuone.devtools.testcases import skipTest
+
+
+class FakeCtypesValue(object):
+    """Represent a fake keyring."""
+
+    def __init__(self):
+        """Create a new instance."""
+        self.value = None
+
+
+class FakeRawResult(object):
+    """Represent a fake raw result."""
+
+    def __init__(self):
+        """Create a new instance."""
+        self.raw = None
+
+
+class FakeCoreService(object):
+    """Fake apple core service interface."""
+    def __init__(self):
+        self.called = []
+
+    def _get_function_name(self):
+        """Get the calling function name."""
+        import sys
+        return sys._getframe(1).f_code.co_name
+
+    def CFRelease(self, data):
+        """Release the memory allocated for data."""
+        self.called.append((self._get_function_name(), data))
+
+
+class FakeKeyring(object):
+    """Fake security interface."""
+
+    def __init__(self):
+        """Create a new instance."""
+        self.opened = False
+        self.called = []  # used to record the called functions
+        self.passwords = {}  # used to store the passwords
+
+    def _get_function_name(self):
+        """Get the calling function name."""
+        import sys
+        return sys._getframe(1).f_code.co_name
+
+    def SecKeychainOpen(self, name, pointer):
+        """Open the key chain."""
+        self.called.append((self._get_function_name(), name, pointer))
+        self.opened = True
+        return osx.ERRSECSUCCESS
+
+    def SecKeychainFindGenericPassword(self, pointer, service_length,
+                                       service, username_length, username,
+                                       pswd_length, pswd_data, item):
+        """Return an item."""
+        self.called.append((self._get_function_name(), pointer, service_length,
+                            service, username_length, username, pswd_length,
+                            pswd_data, item))
+        if (service, username) in self.passwords:
+            pswd = self.passwords[(service, username)]
+            if pswd_length:
+                pswd_length.value = len(pswd)
+            if pswd_data:
+                pswd_data.value = pswd
+            if item:
+                item.value = pswd
+            return True
+        return osx.ERRSECITEMNOTFOUND
+
+    def SecKeychainAddGenericPassword(self, pointer, service_length,
+                                      service, username_length, username,
+                                      pswd_length, pswd_data, item):
+        """Add an item"""
+        self.called.append((self._get_function_name(), pointer, service_length,
+                            service, username_length, username, pswd_length,
+                            pswd_data, item))
+        if (service, username) in self.passwords:
+            return osx.ERRSECDUPLICATEITEM
+        self.passwords[(service, username)] = pswd_data
+        return 0
+
+    def SecKeychainItemDelete(self, data):
+        """Delete an item."""
+        self.called.append((self._get_function_name(), data))
+        tmp = self.passwords.get(data)
+        if tmp:
+            del self.passwords[data]
+            return 0
+        else:
+            return osx.ERRSECITEMNOTFOUND
+
+    def SecKeychainItemFreeContent(self, attrs, data):
+        """Free an item."""
+        self.called.append((self._get_function_name(), attrs, data))
+
+
+class OsxKeyChainTestCase(TestCase):
+    """Test case for conversa.passwords.osx."""
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        """Generate random (and uniques) service, username and password."""
+        yield super(OsxKeyChainTestCase, self).setUp()
+        self.service = str(uuid.uuid1())
+        self.name = str(uuid.uuid1())
+        self.password = str(uuid.uuid1())
+        self.security = FakeKeyring()
+        self.core = FakeCoreService()
+
+        # patch those functions used in the osx moduel
+        self.patch(osx, '_security', self.security)
+        self.patch(osx, '_core', self.core)
+        self.patch(osx, 'c_uint32', lambda: FakeCtypesValue())
+        self.patch(osx, 'c_void_p', lambda: FakeCtypesValue())
+        self.patch(osx, 'create_string_buffer', lambda _: FakeRawResult())
+        self.patch(osx, 'byref', lambda arg: arg)
+
+        def fake_memmove(result, value, length):
+            """Fake the memmove function."""
+            result.raw = value
+
+        self.patch(osx, 'memmove', fake_memmove)
+
+        self.keyring = osx.OsxKeychain()
+
+    def test_get_missing_password(self):
+        """Test getting a password not present."""
+        # nothing to do because the keyring is empty
+        self.assertEqual(None, self.keyring.get_password(self.service,
+                                                         self.name))
+
+    def test_get_present_password(self):
+        """Test getting a present password."""
+        self.security.passwords[(self.service, self.name)] = self.password
+        self.assertEqual(self.password, self.keyring.get_password(self.service,
+                                                                  self.name))
+        del self.security.passwords[(self.service, self.name)]
+
+    def test_set_present_password_raises(self):
+        """Test inserting a password that is already present."""
+        self.security.passwords[(self.service, self.name)] = self.password
+        self.assertRaises(OSError, self.keyring.set_password, self.service,
+                          self.name, self.password)
+        del self.security.passwords[(self.service, self.name)]
+
+    def test_set_missing_password(self):
+        """Test inserting a password that is not already in the keychain."""
+        self.keyring.set_password(self.service, self.name, self.password)
+        self.assertEqual(self.password, self.security.passwords[(self.service,
+                                                                 self.name)])
+        del self.security.passwords[(self.service, self.name)]
+
+    def test_delete_missing_password(self):
+        """Test deleting a password that is not present."""
+        if self.security.passwords.get((self.service, self.name)):
+            del self.security.passwords[(self.service, self.name)]
+
+        self.assertRaises(OSError, self.keyring.delete_password,
+                          self.service, self.name)
+
+    def test_delete_present_password(self):
+        """Test deleting a password."""
+        self.security.passwords[(self.service, self.name)] = self.password
+        self.keyring.delete_password(self.service, self.name)
+

conversa/test/test_account.py

 
     def test_server_raise(self):
         """Test case for conversa.accounts.Server ValueError."""
-        data = self.server_data
         with self.assertRaises(ValueError):
             accounts.Server(self.address, self.port, "ciao", self.username)
         with self.assertRaises(ValueError):
 
     def test_incoming_server_raise(self):
         """Test case for conversa.accounts.IncomingServer ValueError."""
-        data = self.server_data
         with self.assertRaises(ValueError):
             accounts.IncomingServer(self.username, self.port, self.security, 
                                     self.username, 5)
 
     def test_outgoing_server_raise(self):
         """Test case for conversa.accounts.OutgoingServer ValueError."""
-        data = self.out_data
         with self.assertRaises(ValueError):
             accounts.OutgoingServer(self.address, self.port, 5, self.username)
         with self.assertRaises(ValueError):
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.