Jason R. Coombs avatar Jason R. Coombs committed 4807309

Moved DigestType and Digest back to digest module

Comments (0)

Files changed (4)

ctypescrypto/digest.py

-from evp import DigestType, Digest
+import ctypes
 
+import evp
+
+class DigestError(Exception): pass
+
+class DigestType(ctypes.Structure):
+	_fields_ = evp._digest_type_fields
+
+	@classmethod
+	def from_name(cls, digest_name):
+		res = evp.get_digestbyname(digest_name)
+		if not res:
+			raise DigestError("Unknown Digest: %(digest_name)s" % vars())
+		return res.contents
+
+class Digest(ctypes.Structure):
+	_fields_ = evp._digest_context_fields
+	finalized = False
+	def __init__(self, digest_type):
+		self.digest_type = digest_type
+		evp.MD_CTX_init(self)
+		result = evp.DigestInit_ex(self, digest_type, None)
+		if result == 0:
+			raise DigestError("Unable to initialize digest")
+
+	def update(self, data):
+		if self.finalized:
+			raise DigestError("Digest is finalized; no updates allowed")
+		if not isinstance(data, basestring):
+			raise TypeError("A string is expected")
+		result = evp.DigestUpdate(self, data, len(data))
+		if result != 1:
+			raise DigestError, "Unable to update digest"
+		
+	def digest(self, data=None):
+		if data is not None:
+			self.update(data)
+		result_buffer = ctypes.create_string_buffer(evp.MAX_MD_SIZE)
+		result_length = ctypes.c_uint()
+		res_code = evp.DigestFinal_ex(self, result_buffer,
+			result_length)
+		if res_code != 1 :
+			raise DigestError, "Unable to finalize digest"
+		self.finalized = True
+		result = result_buffer.raw[:result_length.value]
+		# override self.digest to return the same result on subsequent
+		#  calls
+		self.digest = lambda: result
+		return result
+
+evp._set_digest_arg_types(DigestType, Digest)

ctypescrypto/evp.py

 
 MAX_MD_SIZE = 64
 
-class DigestError(Exception): pass
+_digest_type_fields = [
+	('type', c_int),
+	('pkey_type', c_int),
+	('md_size', c_int),
+	('flags', c_ulong),
+	('init', c_void_p),
+	('update', c_void_p),
+	('final', c_void_p),
+	('copy', c_void_p),
+	('cleanup', c_void_p),
+	('sign', c_void_p),
+	('verify', c_void_p),
+	('required_pkey_type', c_int*5),
+	('block size', c_int),
+	('ctx_size', c_int),
+	('md_ctrl', c_void_p),
+]
 
-class DigestType(Structure):
-	_fields_ = [
-		('type', c_int),
-		('pkey_type', c_int),
-		('md_size', c_int),
-		('flags', c_ulong),
-		('init', c_void_p),
-		('update', c_void_p),
-		('final', c_void_p),
-		('copy', c_void_p),
-		('cleanup', c_void_p),
-		('sign', c_void_p),
-		('verify', c_void_p),
-		('required_pkey_type', c_int*5),
-		('block size', c_int),
-		('ctx_size', c_int),
-		('md_ctrl', c_void_p),
-	]
+_digest_context_fields = [
+	('p_type', c_void_p), # POINTER(DigestType)
+	('engine', c_void_p), # todo, POINTER(ENGINE)
+	('flags', c_ulong),
+	('md_data', c_void_p),
+	('pctx', c_void_p), # todo, POINTER(EVP_PKEY_CTX)
+	('update_func', c_void_p),
+]
 
-	@classmethod
-	def from_name(cls, digest_name):
-		res = lib.EVP_get_digestbyname(digest_name)
-		if not res:
-			raise DigestError("Unknown Digest: %(digest_name)s" % vars())
-		return res.contents
-
-class DigestContext(Structure):
-	c_name = 'EVP_MD_CTX'
-	_fields_ = [
-		('digest', POINTER(DigestType)),
-		('engine', c_void_p), # todo, POINTER(ENGINE)
-		('flags', c_ulong),
-		('md_data', c_void_p),
-		('pctx', c_void_p), # todo, POINTER(EVP_PKEY_CTX)
-		('update', c_void_p),
-		]
-
-	def __init__(self):
-		lib.EVP_MD_CTX_init(self)
-
-class Digest(object):
-	finalized = False
-	def __init__(self, digest_type):
-		self.digest_type = digest_type
-		self.context = DigestContext()
-		result = lib.EVP_DigestInit_ex(self.context, digest_type, None)
-		if result == 0:
-			raise DigestError("Unable to initialize digest")
-
-	def update(self, data):
-		if self.finalized:
-			raise DigestError("Digest is finalized; no updates allowed")
-		if not isinstance(data, basestring):
-			raise TypeError("A string is expected")
-		result = lib.EVP_DigestUpdate(self.context, data, len(data))
-		if result != 1:
-			raise DigestError, "Unable to update digest"
-		
-	def digest(self, data=None):
-		if data is not None:
-			self.update(data)
-		result_buffer = create_string_buffer(MAX_MD_SIZE)
-		result_length = c_uint()
-		res_code = lib.EVP_DigestFinal_ex(self.context, result_buffer,
-			result_length)
-		if res_code != 1 :
-			raise DigestError, "Unable to finalize digest"
-		self.finalized = True
-		result = result_buffer.raw[:result_length.value]
-		# override self.digest to return the same result on subsequent
-		#  calls
-		self.digest = lambda: result
-		return result
+def _reg(name):
+	"""
+	Copy the function by the given name from the EVP library into this
+	namespace.
+	"""
+	libname = 'EVP_' + name
+	globals()[name] = getattr(lib, libname)
 
 lib = find_library('libeay32')
 ## Define the argtypes and result types for the EVP functions
-lib.EVP_get_digestbyname.argtypes = c_char_p,
-lib.EVP_get_digestbyname.restype = POINTER(DigestType)
-lib.EVP_DigestInit.argtypes = (
-	POINTER(DigestContext), POINTER(DigestType),
-	)
-lib.EVP_DigestInit_ex.argtypes = lib.EVP_DigestInit.argtypes + (c_void_p,)
-lib.EVP_DigestInit_ex.restype = c_int
-lib.EVP_MD_CTX_init.argtypes = POINTER(DigestContext),
-lib.EVP_MD_CTX_create.restype = POINTER(DigestContext)
-lib.EVP_DigestUpdate.argtypes = POINTER(DigestContext), c_char_p, c_int
-lib.EVP_DigestUpdate.restype = c_int
-lib.EVP_DigestFinal_ex.argtypes = (POINTER(DigestContext),
-	c_char_p, POINTER(c_uint),
-	)
-lib.EVP_DigestFinal_ex.restype = c_int
+map(_reg, 'get_digestbyname DigestInit DigestInit_ex MD_CTX_init '
+	'MD_CTX_create DigestUpdate DigestFinal_ex'.split())
 
-get_cipherbyname = lib.EVP_get_cipherbyname
+def _set_digest_arg_types(DigestType, Digest):
+	get_digestbyname.argtypes = c_char_p,
+	get_digestbyname.restype = POINTER(DigestType)
+	DigestInit.argtypes = (
+		POINTER(Digest), POINTER(DigestType),
+		)
+	DigestInit_ex.argtypes = lib.EVP_DigestInit.argtypes + (c_void_p,)
+	DigestInit_ex.restype = c_int
+	MD_CTX_init.argtypes = POINTER(Digest),
+	MD_CTX_create.restype = POINTER(Digest)
+	DigestUpdate.argtypes = POINTER(Digest), c_char_p, c_int
+	DigestUpdate.restype = c_int
+	DigestFinal_ex.argtypes = (POINTER(Digest),
+		c_char_p, POINTER(c_uint),
+		)
+	DigestFinal_ex.restype = c_int
+
+_reg('get_cipherbyname')
 get_cipherbyname.argtypes = c_char_p,
 
 _cipher_fields = [

tests/test_evp.py

-import binascii
-
 import py.test
 from ctypescrypto import evp, digest
 
-def test_load_valid_digest_type_by_name():
-	t = evp.DigestType.from_name('SHA256')
-
-def test_load_invalid_digest_type_by_name():
-	# dne is Does Not Exist
-	py.test.raises(evp.DigestError, evp.DigestType.from_name, 'sha-dne')
-
-def test_digest():
-	digest_type = digest.DigestType.from_name('SHA512')
-	sha512 = digest.Digest(digest_type)
-	sha512.update("test")
-	assert not sha512.finalized
-	digest_ = sha512.digest()
-	digest_str = binascii.hexlify(digest_)
-	assert len(digest_) == 64
-	assert digest_str == (
-		"ee26b0dd4af7e749aa1a8ee3c10ae992"
-		"3f618980772e473f8819a5d4940e0db2"
-		"7ac185f8a0e1d5f84f88bc887fd67b14"
-		"3732c304cc5fa9ad8e6f57f50028a8ff")
-
 def test_rand():
 	py.test.skip('not ready yet')
 	ran = rand.bytes(libcrypto, 100)

tests/test_threads.py

 
 import test_evp
 import test_cipher
+import test_digest
 
 class ThreadedTester(Thread):
 	failed = False
 	def run(self):
 		try:
-			test_evp.test_digest()
+			test_digest.test_digest()
 			test_cipher.test_cipher(['a'*1000, 'd'*1000])
 			#test_evp.test_rand()
 		except Exception, e:
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.