Commits

Jajcus  committed 36af809

Certificate decoding with pyasn1

SubjectAltName decoding not finished yet

  • Participants
  • Parent commits 35f4122

Comments (0)

Files changed (1)

File pyxmpp2/cert.py

 from datetime import datetime
 
 try:
-    from pyasn1_modules.rfc2459 import Certificate
+    from pyasn1_modules.rfc2459 import Certificate, DirectoryString, MAX, Name
+    from pyasn1_modules import pem
+    from pyasn1.codec.der import decoder as der_decoder
+    from pyasn1.type.char import BMPString, IA5String
+    from pyasn1.type.univ import Sequence, SequenceOf, Choice
+    from pyasn1.type.univ import Any, ObjectIdentifier
+    from pyasn1.type.namedtype import NamedTypes, NamedType
+    from pyasn1.type.useful import GeneralizedTime
+    from pyasn1.type.constraint import ValueSizeConstraint
+    from pyasn1.type import tag
     HAVE_PYASN1 = True
 except ImportError:
     HAVE_PYASN1 = False
                 cert.alt_names[name].append(value)
         if 'notAfter' in data:
             tstamp = ssl.cert_time_to_seconds(data['notAfter'])
-            cert.not_after = datetime.fromtimestamp(tstamp)
+            cert.not_after = datetime.utcfromtimestamp(tstamp)
         if sys.version_info.major < 3:
             cert._decode_names() # pylint: disable=W0212
         cert.common_names = []
                 new.append(name)
             self.alt_names[key] = new
 
+dn_oids = {
+        (2, 5, 4, 41): u"Name",
+        (2, 5, 4, 4): u"Surname",
+        (2, 5, 4, 42): u"GivenName",
+        (2, 5, 4, 43): u"Initials",
+        (2, 5, 4, 3): u"CommonName",
+        (2, 5, 4, 7): u"LocalityName",
+        (2, 5, 4, 8): u"StateOrProvinceName",
+        (2, 5, 4, 10): u"OrganizationName",
+        (2, 5, 4, 11): u"OrganizationalUnitName",
+        (2, 5, 4, 12): u"Title",
+        (2, 5, 4, 6): u"CountryName",
+}
+
+def _decode_asn1_string(data):
+    if isinstance(data, BMPString):
+        return bytes(data).decode("utf-16")
+    else:
+        return bytes(data).decode("utf-8")
+
+if HAVE_PYASN1:
+    class OtherName(Sequence):
+        componentType = NamedTypes(
+                NamedType('type-id', ObjectIdentifier()),
+                NamedType('value', Any().subtype(explicitTag = tag.Tag(
+                                tag.tagClassContext, tag.tagFormatSimple, 0)))
+                )
+
+    class GeneralName(Choice):
+        componentType = NamedTypes(
+                NamedType('otherName',
+                    OtherName().subtype(implicitTag = tag.Tag(
+                        tag.tagClassContext, tag.tagFormatSimple, 0))),
+                NamedType('rfc822Name',
+                    IA5String().subtype(implicitTag = tag.Tag(
+                        tag.tagClassContext, tag.tagFormatSimple, 1))),
+                NamedType('dNSName',
+                    IA5String().subtype(implicitTag = tag.Tag(
+                        tag.tagClassContext, tag.tagFormatSimple, 2))),
+                NamedType('x400Address',
+                    Any().subtype(implicitTag = tag.Tag(
+                        tag.tagClassContext, tag.tagFormatSimple, 3))),
+                NamedType('directoryName',
+                    Name().subtype(implicitTag = tag.Tag(
+                        tag.tagClassContext, tag.tagFormatSimple, 4))),
+                )
+
+    class GeneralNames(SequenceOf):                                              
+        componentType = GeneralName()                                                 
+        sizeSpec = SequenceOf.sizeSpec + ValueSizeConstraint(1, MAX)
+
+
+class ASN1CertificateData(CertificateData):
+    """Certificate information interface.
+    
+    This class actually decodes the certificate, providing all the
+    names there.
+    """
+    _cert_asn1_type = None
+    @classmethod
+    def from_ssl_socket(cls, ssl_socket):
+        """Load certificate data from an SSL socket.
+        """
+        try:
+            data = ssl_socket.getpeercert(True)
+        except AttributeError:
+            # PyPy doesn't have .getpeercert
+            data = None
+        if not data:
+            logger.debug("No certificate infromation")
+            return cls()
+        result = cls.from_der_data(data)
+        result.validated = bool(ssl_socket.getpeercert())
+        return result
+
+    @classmethod
+    def from_der_data(cls, data):
+        logger.debug("Decoding DER certificate: {0!r}".format(data))
+        if cls._cert_asn1_type is None:
+            cls._cert_asn1_type = Certificate()
+        cert = der_decoder.decode(data, asn1Spec = cls._cert_asn1_type)[0]
+        result = cls()
+        tbs_cert = cert.getComponentByName('tbsCertificate')
+        subject = tbs_cert.getComponentByName('subject')
+        logger.debug("Subject: {0!r}".format(subject))
+        result.common_names = []
+        subject_name = []
+        for rdnss in subject:
+            for rdns in rdnss:
+                rdnss_list = []
+                for nameval in rdns:
+                    val_type = nameval.getComponentByName('type')
+                    value = nameval.getComponentByName('value')
+                    if val_type not in dn_oids:
+                        logger.debug("OID {0} not supported".format(val_type))
+                        continue
+                    val_type = dn_oids[val_type]
+                    value = der_decoder.decode(value, 
+                                            asn1Spec = DirectoryString())[0]
+                    value = value.getComponent()
+                    try:
+                        value = _decode_asn1_string(value)
+                    except UnicodeError:
+                        logger.debug("Cannot decode value: {0!r}".format(value))
+                        continue
+                    if val_type == u"CommonName":
+                        result.common_names.append(value)
+                    rdnss_list.append((val_type, value))
+                subject_name.append(tuple(rdnss_list))
+        result.subject_name = tuple(subject_name)
+        validity = tbs_cert.getComponentByName('validity')
+        not_after = validity.getComponentByName('notAfter')
+        not_after = str(not_after.getComponent())
+        if isinstance(not_after, GeneralizedTime):
+            result.not_after = datetime.strptime(not_after, "%Y%m%d%H%M%SZ")
+        else:
+            result.not_after = datetime.strptime(not_after, "%y%m%d%H%M%SZ")
+        extensions = tbs_cert.getComponentByName('extensions')
+        if extensions:
+            for extension in extensions:
+                logger.debug("Extension: {0!r}".format(extension))
+                oid = extension.getComponentByName('extnID')
+                logger.debug("OID: {0!r}".format(oid))
+                if oid != ObjectIdentifier('2.5.29.17'):
+                    continue
+                value = extension.getComponentByName('extnValue')
+                logger.debug("Value: {0!r}".format(value))
+                #alt_name = der_decoder.decode(value, 
+                #                            asn1Spec = GeneralNames())[0]
+                #logger.debug("SubjectAltName: {0!r}".format(alt_name))
+        return result
+
     @classmethod
     def from_file(cls, filename):
         """Load certificate from a file.
         """
-        # pylint: disable=W0613
-        return cls()
+        with open(filename, "r") as pem_file:
+            data = pem.readPemFromFile(pem_file)
+        return cls.from_der_data(data)
 
-def get_certificate_from_ssl_socket(ssl_socket):
-    """Get certificate data from an SSL socket.
-    """
-    return BasicCertificateData.from_ssl_socket(ssl_socket)
+if HAVE_PYASN1:
+    def get_certificate_from_ssl_socket(ssl_socket):
+        """Get certificate data from an SSL socket.
+        """
+        return ASN1CertificateData.from_ssl_socket(ssl_socket)
+else:
+    def get_certificate_from_ssl_socket(ssl_socket):
+        """Get certificate data from an SSL socket.
+        """
+        return BasicCertificateData.from_ssl_socket(ssl_socket)
 
 def get_certificate_from_file(filename):
     """Get certificate data from a PEM file.
     """
-    return BasicCertificateData.from_file(filename)
-
-
+    return ASN1CertificateData.from_file(filename)