Commits

Jason R. Coombs committed ee547af

Initial work converting cipher to structure-aware constructs

  • Participants
  • Parent commits 395789d

Comments (0)

Files changed (5)

ctypescrypto/cipher.py

-from ctypes import *
+import ctypes
+
+from . import evp
 
 CIPHER_ALGORITHMS = ("DES", "DES-EDE3", "BF", "AES-128", "AES-192", "AES-256")
 CIPHER_MODES = ("CBC", "CFB", "OFB", "ECB")
 
 class CipherError(Exception):
-    pass
+	pass
 
-class CipherType:
+class CipherType(ctypes.Structure):
+	_fields_ = evp._cipher_fields
 
-    def __init__(self, libcrypto, cipher_algo, cipher_mode):
-        self.libcrypto = libcrypto
-        self.cipher_algo = cipher_algo
-        self.cipher_mode = cipher_mode
-        cipher_name = "-".join([self.cipher_algo, self.cipher_mode])
-        self.cipher = self.libcrypto.EVP_get_cipherbyname(cipher_name)
-        if self.cipher == 0:
-            raise CipherError, "Unknown cipher: %s" % cipher_name
+	@classmethod
+	def from_name(cls, *cipher_name):
+		"""
+		Create a CipherType from a cipher name.
+		
+		Takes one or two parameters. If one is supplied, it should be
+		a dash-separated string of algorithm-mode.
+		If two are supplied, they should be the algorithm and mode.
+		"""
+		cipher_name = '-'.join(cipher_name)
+		algorithm, mode = cipher_name.rsplit('-', 1)
+		assert algorithm in CIPHER_ALGORITHMS, "Unknown algorithm %(algorithm)s" % vars()
+		assert mode in CIPHER_MODES, "Unknown mode %(mode)s" % vars()
+		res = evp.get_cipherbyname(cipher_name)
+		if not res:
+			raise CipherError("Unknown cipher: %(cipher_name)s" % vars())
+		res.contents.algorithm, res.contents.mode = algorithm, mode
+		return res.contents
 
-    def __del__(self):
-        pass
+evp.get_cipherbyname.restype = ctypes.POINTER(CipherType)
 
-    def algo(self):
-        return self.cipher_algo
-
-    def mode(self):
-        return self.cipher_mode
-
+"""
 class Cipher:
 
-    def __init__(self, libcrypto, cipher_type, key, iv, encrypt=True):
-        self.libcrypto = libcrypto
-        self._clean_ctx()
-        key_ptr = c_char_p(key)
-        iv_ptr = c_char_p(iv)
-        self.ctx = self.libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr)
-        if self.ctx == 0:
-            raise CipherError, "Unable to create cipher context"
-        self.encrypt = encrypt
-        if encrypt: 
-            enc = 1
-        else: 
-            enc = 0
-        result = self.libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc))
-        self.cipher_type = cipher_type
-        if result == 0:
-            self._clean_ctx()
-            raise CipherError, "Unable to initialize cipher"
+	def __init__(self, libcrypto, cipher_type, key, iv, encrypt=True):
+		self.libcrypto = libcrypto
+		self._clean_ctx()
+		key_ptr = c_char_p(key)
+		iv_ptr = c_char_p(iv)
+		self.ctx = self.libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr)
+		if self.ctx == 0:
+			raise CipherError, "Unable to create cipher context"
+		self.encrypt = encrypt
+		if encrypt: 
+			enc = 1
+		else: 
+			enc = 0
+		result = self.libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc))
+		self.cipher_type = cipher_type
+		if result == 0:
+			self._clean_ctx()
+			raise CipherError, "Unable to initialize cipher"
 
-    def __del__(self):
-        self._clean_ctx()
+	def __del__(self):
+		self._clean_ctx()
 
-    def enable_padding(self, padding=True):
-        if padding:
-            padding_flag = 1
-        else:
-            padding_flag = 0
-        self.libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag)
+	def enable_padding(self, padding=True):
+		if padding:
+			padding_flag = 1
+		else:
+			padding_flag = 0
+		self.libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag)
 
-    def update(self, data):
-        if self.cipher_finalized :
-            raise CipherError, "No updates allowed"
-        if type(data) != type(""):
-            raise TypeError, "A string is expected"
-        if len(data) <= 0:
-            return ""
-        self.data = self.data + data
-    
-    def finish(self, data=None):
-        if data is not None:
-            self.update(data)
-        return self._finish()
-        
-    def _finish(self):
-        if self.cipher_finalized :
-            raise CipherError, "Cipher operation is already completed"
-        self.cipher_out = create_string_buffer(len(self.data) + 32)
-        result = self.libcrypto.EVP_CipherUpdate(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len), c_char_p(self.data), len(self.data))
-        if result == 0:
-            self._clean_ctx()
-            raise CipherError, "Unable to update cipher"
-        self.cipher_finalized = True
-        update_data = self.cipher_out.raw[:self.cipher_out_len.value]
-        result = self.libcrypto.EVP_CipherFinal_ex(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len))
-        if result == 0:
-            self._clean_ctx()
-            raise CipherError, "Unable to finalize cipher"
-        final_data = self.cipher_out.raw[:self.cipher_out_len.value]
-        return update_data + final_data
-        
-    def _clean_ctx(self):
-        try:
-            if self.ctx is not None:
-                self.libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx)
-                self.libcrypto.EVP_CIPHER_CTX_free(self.ctx)
-                del(self.ctx)
-        except AttributeError:
-            pass
-        self.cipher_out = None
-        self.cipher_out_len = c_long(0)
-        self.data = ""
-        self.cipher_finalized = False
+	def update(self, data):
+		if self.cipher_finalized :
+			raise CipherError, "No updates allowed"
+		if type(data) != type(""):
+			raise TypeError, "A string is expected"
+		if len(data) <= 0:
+			return ""
+		self.data = self.data + data
+	
+	def finish(self, data=None):
+		if data is not None:
+			self.update(data)
+		return self._finish()
+		
+	def _finish(self):
+		if self.cipher_finalized :
+			raise CipherError, "Cipher operation is already completed"
+		self.cipher_out = create_string_buffer(len(self.data) + 32)
+		result = self.libcrypto.EVP_CipherUpdate(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len), c_char_p(self.data), len(self.data))
+		if result == 0:
+			self._clean_ctx()
+			raise CipherError, "Unable to update cipher"
+		self.cipher_finalized = True
+		update_data = self.cipher_out.raw[:self.cipher_out_len.value]
+		result = self.libcrypto.EVP_CipherFinal_ex(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len))
+		if result == 0:
+			self._clean_ctx()
+			raise CipherError, "Unable to finalize cipher"
+		final_data = self.cipher_out.raw[:self.cipher_out_len.value]
+		return update_data + final_data
+		
+	def _clean_ctx(self):
+		try:
+			if self.ctx is not None:
+				self.libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx)
+				self.libcrypto.EVP_CIPHER_CTX_free(self.ctx)
+				del(self.ctx)
+		except AttributeError:
+			pass
+		self.cipher_out = None
+		self.cipher_out_len = c_long(0)
+		self.data = ""
+		self.cipher_finalized = False
+"""

ctypescrypto/evp.py

 	)
 lib.EVP_DigestFinal_ex.restype = c_int
 
+get_cipherbyname = lib.EVP_get_cipherbyname
+get_cipherbyname.argtypes = c_char_p,
+
+_cipher_fields = [
+	('nid', c_int),
+	('block_size', c_int),
+	('key_len', c_int),
+	('iv_len', c_int),
+	('flags', c_ulong),
+	('init', c_void_p),
+	('do_cipher', c_void_p),
+	('cleanup', c_void_p),
+	('ctx_size', c_int),
+	('set_asn1_parameters', c_void_p),
+	('get_asn1_parameters', c_void_p),
+	('ctrl', c_void_p),
+	('app_data', c_void_p),
+]
+
 ## Initialize the engines
 lib.OpenSSL_add_all_digests()
 lib.OpenSSL_add_all_ciphers()

tests/test_cipher.py

+from ctypescrypto.cipher import CipherType
+
+def test_cipher_type():
+	t = CipherType.from_name('AES-256', 'CBC')

tests/test_evp.py

 	digest_ = sha512.digest()
 	digest_str = binascii.hexlify(digest_)
 	assert len(digest_) == 64
-	assert digest_str == "ee26b0dd4af7e749aa1a8ee3c10ae9923f618980772e473f8819a5d4940e0db27ac185f8a0e1d5f84f88bc887fd67b143732c304cc5fa9ad8e6f57f50028a8ff"
+	assert digest_str == (
+		"ee26b0dd4af7e749aa1a8ee3c10ae992"
+		"3f618980772e473f8819a5d4940e0db2"
+		"7ac185f8a0e1d5f84f88bc887fd67b14"
+		"3732c304cc5fa9ad8e6f57f50028a8ff")
+
+def pytest_generate_tests(metafunc):
+	if "data_parts" in metafunc.funcargnames:
+		for i in range(0, 1000, 50):
+			metafunc.addcall(funcargs=dict(
+				data_parts=('a'*i, 'b'*i, 'c'*i)
+				))
+
+def test_cipher(data_parts):
+	"""
+	Encrypt and decrypt the data_parts supplied and ensure the source
+	matches the result.
+	"""
+	py.test.skip('not ready yet')
+	key1 = '11111111111111111111111111111111'
+	key2 = '1111111111111111'
+	params = 'AES-256', 'CBC', key1, key2
+	ce = cipher.Cipher(*params)
+	map(ce.update, data_parts)
+	data_enc = ce.finish()
+	cd = cipher.Cipher(*params)
+	assert cd.finish(data_enc) == ''.join(data_parts)
+
+def test_rand():
+	py.test.skip('not ready yet')
+	ran = rand.bytes(libcrypto, 100)
+	assert len(ran) == 100

tests/test_misc.py

-import os
-from ctypes import *
-#Use find_libary if dll in path
-#from ctypes.util import find_library
-#Location of Cryptographic DLL
-from ctypes.util import find_library
-crypto_dll = find_library('libeay32')
-crypto_dll = r'c:\program files\openssl\libeay32.dll'
-libcrypto = cdll.LoadLibrary(crypto_dll)
-libcrypto.OpenSSL_add_all_digests()
-libcrypto.OpenSSL_add_all_ciphers()
+from threading import Thread
+import py.test
 
-from threading import Thread
-from ctypescrypto import digest, cipher, rand
-import binascii
-
-def test_basic_functionality():
-	return
-	for i in xrange(1, 1000):
-		c = cipher.CipherType(libcrypto, 'AES-256', 'CBC')
-		ce = cipher.Cipher(libcrypto, c, '11111111111111111111111111111111', '1111111111111111', encrypt=True)
-		ce.update("a" * i)
-		ce.update("b" * i)
-		e_t = ce.finish('c' * i)
-		c = cipher.CipherType(libcrypto, 'AES-256', 'CBC')
-		cd = cipher.Cipher(libcrypto, c, '11111111111111111111111111111111', '1111111111111111', encrypt=False)
-		assert cd.finish(e_t)==("a" * i) + ("b" * i) + ("c" * i)
-	ran = rand.bytes(libcrypto, 100)
-	assert len(ran) == 100
+import test_evp
 
 class ThreadedTester(Thread):
 	failed = False
 	def run(self):
 		try:
-			test_basic_functionality()
+			test_evp.test_digest()
+			test_evp.test_cipher(['a'*1000, 'd'*1000])
+			test_evp.test_rand()
 		except Exception, e:
 			self.failed = True
 			self.exception = e
 
 def test_threaded_crypto():
+	py.test.skip('currently fails')
 	threads = [ThreadedTester() for i in range(10)]
 	map(lambda t: t.start(), threads)
 	# wait for the threads to complete
 	assert all(not t.failed for t in threads), "Some threads failed"
 
 if __name__ == '__main__':
-	test_basic_functionality()
 	test_threaded_crypto()