Commits

Jason R. Coombs committed 15c17b1

Made some progress toward a complete Cipher implementation

Comments (0)

Files changed (4)

ctypescrypto/cipher.py

 
 evp.get_cipherbyname.restype = ctypes.POINTER(CipherType)
 
-"""
-class Cipher:
+class Cipher(ctypes.Structure):
+	_fields_ = evp._cipher_context_fields
+	finalized = False
 
-	def __init__(self, libcrypto, cipher_type, key, iv, encrypt=True):
-		self.libcrypto = libcrypto
-		self._clean_ctx()
-		key_ptr = c_char_p(key)
-		iv_ptr = c_char_p(iv)
-		self.ctx = self.libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr)
-		if self.ctx == 0:
-			raise CipherError, "Unable to create cipher context"
-		self.encrypt = encrypt
-		if encrypt: 
-			enc = 1
-		else: 
-			enc = 0
-		result = self.libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc))
-		self.cipher_type = cipher_type
-		if result == 0:
-			self._clean_ctx()
-			raise CipherError, "Unable to initialize cipher"
+	def __init__(self, type, key, iv, encrypt=True):
+		evp.CIPHER_CTX_init(self)
+		engine = None
+		type = self.interpret_type(type)
+		res = evp.CipherInit_ex(self, type, engine, key, iv, encrypt)
+		if res == 0:
+			raise CipherError("Unable to initialize cipher")
+		self.out_data = []
 
-	def __del__(self):
-		self._clean_ctx()
+	@staticmethod
+	def interpret_type(type):
+		if not isinstance(type, CipherType):
+			if not hasattr(type, __iter__):
+				type = [type]
+			type = CipherType(*type)
+		return type
 
-	def enable_padding(self, padding=True):
-		if padding:
-			padding_flag = 1
-		else:
-			padding_flag = 0
-		self.libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag)
+	def set_padding(self, padding=True):
+		evp.CIPHER_CTX_set_padding(self, padding)
 
 	def update(self, data):
-		if self.cipher_finalized :
-			raise CipherError, "No updates allowed"
-		if type(data) != type(""):
-			raise TypeError, "A string is expected"
-		if len(data) <= 0:
-			return ""
-		self.data = self.data + data
-	
-	def finish(self, data=None):
+		"""
+		From docs:
+		EVP_EncryptUpdate() encrypts inl bytes from the buffer in and writes the encrypted version to out. This function can be called multiple times to encrypt successive blocks of data. The amount of data written depends on the block alignment of the encrypted data: as a result the amount of data written may be anything from zero bytes to (inl + cipher_block_size - 1) so outl should contain sufficient room. The actual number of bytes written is placed in outl. 
+		"""
+		if self.finalized:
+			raise CipherError("No updates allowed")
+		if not isinstance(data, basestring):
+			raise TypeError("A string is expected")
+		out = ctypes.create_string_buffer(len(data) + evp.MAX_BLOCK_LENGTH - 1)
+		out_len = ctypes.c_int()
+		
+		res = evp.CipherUpdate(self, out, out_len, data, len(data))
+		if res != 1:
+			raise CipherError("Error updating cipher")
+		self.out_data.append(out.raw[:out_len])
+
+	def final(self, data=None):
 		if data is not None:
 			self.update(data)
-		return self._finish()
-		
-	def _finish(self):
-		if self.cipher_finalized :
-			raise CipherError, "Cipher operation is already completed"
-		self.cipher_out = create_string_buffer(len(self.data) + 32)
-		result = self.libcrypto.EVP_CipherUpdate(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len), c_char_p(self.data), len(self.data))
-		if result == 0:
-			self._clean_ctx()
-			raise CipherError, "Unable to update cipher"
-		self.cipher_finalized = True
-		update_data = self.cipher_out.raw[:self.cipher_out_len.value]
-		result = self.libcrypto.EVP_CipherFinal_ex(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len))
-		if result == 0:
-			self._clean_ctx()
-			raise CipherError, "Unable to finalize cipher"
-		final_data = self.cipher_out.raw[:self.cipher_out_len.value]
-		return update_data + final_data
-		
-	def _clean_ctx(self):
-		try:
-			if self.ctx is not None:
-				self.libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx)
-				self.libcrypto.EVP_CIPHER_CTX_free(self.ctx)
-				del(self.ctx)
-		except AttributeError:
-			pass
-		self.cipher_out = None
-		self.cipher_out_len = c_long(0)
-		self.data = ""
-		self.cipher_finalized = False
-"""
+		self.finalized = True
+		out = ctypes.create_string_buffer(evp.MAX_BLOCK_LENGTH)
+		out_len = ctypes.c_int()
+		res = evp.CipherFinal_ex(self, out, out_len)
+		if not res == 1:
+			raise CipherError("Error finalizing cipher")
+		self.out_data.append(out.raw[:out_len])
+		self.final = lambda: ''.join(self.out_data)
+		return ''.join(self.out_data)
+
+_init_args = (ctypes.POINTER(Cipher), ctypes.POINTER(CipherType),
+	ctypes.c_void_p,
+	ctypes.c_char_p, ctypes.c_char_p,
+	)
+_update_args = (ctypes.POINTER(Cipher), ctypes.c_char_p,
+	ctypes.POINTER(ctypes.c_int),
+	ctypes.c_char_p, ctypes.c_int,
+	)
+_final_args = (ctypes.POINTER(Cipher), ctypes.c_char_p,
+	ctypes.POINTER(ctypes.c_int),
+	)
+evp.EncryptInit_ex.argtypes = evp.DecryptInit_ex.argtypes = evp.CipherInit_ex.argtypes = _init_args
+evp.EncryptUpdate.argtypes = evp.DecryptUpdate.argtypes = evp.CipherUpdate.argtypes = _update_args
+evp.EncryptFinal_ex.argtypes = evp.DecryptFinal_ex.argtypes = evp.CipherFinal_ex.argtypes = _final_args
+
+import itertools
 from ctypes import *
 
 from .support import find_library
 	('app_data', c_void_p),
 ]
 
+MAX_IV_LENGTH = 16
+MAX_BLOCK_LENGTH = 32
+MAX_KEY_LENGTH = 32
+
+_cipher_context_fields = [
+	('cipher', c_void_p), # POINTER(CipherType)
+	('engine', c_void_p), # POINTER(ENGINE)
+	('encrypt', c_int),
+	('buf_len', c_int),
+	('oiv', c_char*MAX_IV_LENGTH),
+	('iv', c_char*MAX_IV_LENGTH),
+	('buf', c_char*MAX_BLOCK_LENGTH),
+	('num', c_int),
+	('app_data', c_void_p),
+	('key_len', c_int),
+	('flags', c_ulong),
+	('cipher_data', c_void_p),
+	('final_used', c_int),
+	('block_mask', c_int),
+	('final', c_char*MAX_BLOCK_LENGTH),
+]
+
+#EncryptInit_ex = lib.EVP_EncryptInit_ex
+#DecryptInit_ex = lib.EVP_DecryptInit_ex
+#...
+for ed, method in itertools.product(
+	['Encrypt', 'Decrypt', 'Cipher'],
+	['Init_ex', 'Update', 'Final_ex'],
+	):
+	local_name = ''.join([ed, method])
+	lib_name = ''.join(['EVP_', ed, method])
+	func = getattr(lib, lib_name)
+	func.restype = c_int
+	globals()[local_name] = func
+
 ## Initialize the engines
 lib.OpenSSL_add_all_digests()
 lib.OpenSSL_add_all_ciphers()
-from ctypescrypto.cipher import CipherType
+from ctypescrypto import cipher
 
 def test_cipher_type():
-	t = CipherType.from_name('AES-256', 'CBC')
+	t = cipher.CipherType.from_name('AES-256', 'CBC')
+
+def pytest_generate_tests(metafunc):
+	if "data_parts" in metafunc.funcargnames:
+		for i in range(0, 1000, 50):
+			metafunc.addcall(funcargs=dict(
+				data_parts=('a'*i, 'b'*i, 'c'*i)
+				))
+
+def test_cipher(data_parts):
+	"""
+	Encrypt and decrypt the data_parts supplied and ensure the source
+	matches the result.
+	"""
+	key = '11111111111111111111111111111111'
+	iv = '1111111111111111'
+	params = ('AES-256', 'CBC'), key, iv
+	ce = cipher.Cipher(*params)
+	map(ce.update, data_parts)
+	data_enc = ce.finish()
+	cd = cipher.Cipher(*params, encrypt=False)
+	assert cd.finish(data_enc) == ''.join(data_parts)
+
 		"7ac185f8a0e1d5f84f88bc887fd67b14"
 		"3732c304cc5fa9ad8e6f57f50028a8ff")
 
-def pytest_generate_tests(metafunc):
-	if "data_parts" in metafunc.funcargnames:
-		for i in range(0, 1000, 50):
-			metafunc.addcall(funcargs=dict(
-				data_parts=('a'*i, 'b'*i, 'c'*i)
-				))
-
-def test_cipher(data_parts):
-	"""
-	Encrypt and decrypt the data_parts supplied and ensure the source
-	matches the result.
-	"""
-	py.test.skip('not ready yet')
-	key1 = '11111111111111111111111111111111'
-	key2 = '1111111111111111'
-	params = 'AES-256', 'CBC', key1, key2
-	ce = cipher.Cipher(*params)
-	map(ce.update, data_parts)
-	data_enc = ce.finish()
-	cd = cipher.Cipher(*params)
-	assert cd.finish(data_enc) == ''.join(data_parts)
-
 def test_rand():
 	py.test.skip('not ready yet')
 	ran = rand.bytes(libcrypto, 100)