Commits

Jajcus committed e4de81c

pylinted

  • Participants
  • Parent commits e241528

Comments (0)

Files changed (1)

 from datetime import datetime
 
 try:
-    from pyasn1_modules.rfc2459 import Certificate, DirectoryString, MAX, Name
-    from pyasn1_modules import pem
-    from pyasn1.codec.der import decoder as der_decoder
-    from pyasn1.type.char import BMPString, IA5String, UTF8String
-    from pyasn1.type.univ import Sequence, SequenceOf, Choice
-    from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
-    from pyasn1.type.namedtype import NamedTypes, NamedType
-    from pyasn1.type.useful import GeneralizedTime
-    from pyasn1.type.constraint import ValueSizeConstraint
-    from pyasn1.type import tag
+    import pyasn1  # pylint: disable=W0611
+    import pyasn1_modules.rfc2459  # pylint: disable=W0611
     HAVE_PYASN1 = True
 except ImportError:
     HAVE_PYASN1 = False
 
 class CertificateData(object):
     """Certificate information interface.
-    
+
     This class provides only that information from the certificate, which
     is provided by the python API.
     """
 
     @property
     def display_name(self):
-        """Get human-readable subject name derived from the SubjectName 
+        """Get human-readable subject name derived from the SubjectName
         or SubjectAltName field.
         """
         if self.subject_name:
             return u", ".join( [ u", ".join(
-                        [ u"{0}={1}".format(k,v) for k, v in dn_tuple ] ) 
+                        [ u"{0}={1}".format(k,v) for k, v in dn_tuple ] )
                             for dn_tuple in self.subject_name ])
         for name_type in ("XmppAddr", "DNS", "SRV"):
             names = self.alt_names.get(name_type)
     def get_jids(self):
         """Return JIDs for which this certificate is valid (except the domain
         wildcards).
-        
+
         :Returtype: `list` of `JID`
         """
         result = []
             addrs =  self.alt_names.get("XmppAddr", []) + self.alt_names.get(
                                                                     "DNS", [])
         elif self.common_names:
-            addrs = [addr for addr in self.common_names 
+            addrs = [addr for addr in self.common_names
                                 if "@" not in addr and "/" not in addr]
         else:
             return []
 
 class BasicCertificateData(CertificateData):
     """Certificate information interface.
-    
+
     This class provides only that information from the certificate, which
     is provided by the python API.
     """
                 new.append(name)
             self.alt_names[key] = new
 
-dn_oids = {
+DN_OIDS = {
         (2, 5, 4, 41): u"Name",
         (2, 5, 4, 4): u"Surname",
         (2, 5, 4, 42): u"GivenName",
 }
 
 def _decode_asn1_string(data):
+    """Convert ASN.1 string to a Unicode string.
+    """
     if isinstance(data, BMPString):
         return bytes(data).decode("utf-16-be")
     else:
         return bytes(data).decode("utf-8")
 
 if HAVE_PYASN1:
+    from pyasn1_modules.rfc2459 import Certificate, DirectoryString, MAX, Name
+    from pyasn1_modules import pem
+    from pyasn1.codec.der import decoder as der_decoder
+    from pyasn1.type.char import BMPString, IA5String, UTF8String
+    from pyasn1.type.univ import Sequence, SequenceOf, Choice
+    from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
+    from pyasn1.type.namedtype import NamedTypes, NamedType
+    from pyasn1.type.useful import GeneralizedTime
+    from pyasn1.type.constraint import ValueSizeConstraint
+    from pyasn1.type import tag
+
     XMPPADDR_OID = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
     SRVNAME_OID = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
     SUBJECT_ALT_NAME_OID = ObjectIdentifier('2.5.29.17')
+
     class OtherName(Sequence):
+        # pylint: disable=C0111,R0903
         componentType = NamedTypes(
                 NamedType('type-id', ObjectIdentifier()),
                 NamedType('value', Any().subtype(explicitTag = tag.Tag(
                 )
 
     class GeneralName(Choice):
+        # pylint: disable=C0111,R0903
         componentType = NamedTypes(
                 NamedType('otherName',
                     OtherName().subtype(implicitTag = tag.Tag(
                         tag.tagClassContext, tag.tagFormatSimple, 8))),
                 )
 
-    class GeneralNames(SequenceOf):                                              
-        componentType = GeneralName()                                                 
+    class GeneralNames(SequenceOf):
+        # pylint: disable=C0111,R0903
+        componentType = GeneralName()
         sizeSpec = SequenceOf.sizeSpec + ValueSizeConstraint(1, MAX)
 
 
 class ASN1CertificateData(CertificateData):
     """Certificate information interface.
-    
+
     This class actually decodes the certificate, providing all the
     names there.
     """
 
     @classmethod
     def from_der_data(cls, data):
+        """Decode DER-encoded certificate.
+
+        :Parameters:
+            - `data`: the encoded certificate
+        :Types:
+            - `data`: `bytes`
+
+        :Return: decoded certificate data
+        :Returntype: ASN1CertificateData
+        """
+        # pylint: disable=W0212
         logger.debug("Decoding DER certificate: {0!r}".format(data))
         if cls._cert_asn1_type is None:
             cls._cert_asn1_type = Certificate()
         tbs_cert = cert.getComponentByName('tbsCertificate')
         subject = tbs_cert.getComponentByName('subject')
         logger.debug("Subject: {0!r}".format(subject))
-        result.common_names = []
-        subject_name = []
-        for rdnss in subject:
-            for rdns in rdnss:
-                rdnss_list = []
-                for nameval in rdns:
-                    val_type = nameval.getComponentByName('type')
-                    value = nameval.getComponentByName('value')
-                    if val_type not in dn_oids:
-                        logger.debug("OID {0} not supported".format(val_type))
-                        continue
-                    val_type = dn_oids[val_type]
-                    value = der_decoder.decode(value, 
-                                            asn1Spec = DirectoryString())[0]
-                    value = value.getComponent()
-                    try:
-                        value = _decode_asn1_string(value)
-                    except UnicodeError:
-                        logger.debug("Cannot decode value: {0!r}".format(value))
-                        continue
-                    if val_type == u"CommonName":
-                        result.common_names.append(value)
-                    rdnss_list.append((val_type, value))
-                subject_name.append(tuple(rdnss_list))
-        result.subject_name = tuple(subject_name)
+        result._decode_subject(subject)
         validity = tbs_cert.getComponentByName('validity')
-        not_after = validity.getComponentByName('notAfter')
-        not_after = str(not_after.getComponent())
-        if isinstance(not_after, GeneralizedTime):
-            result.not_after = datetime.strptime(not_after, "%Y%m%d%H%M%SZ")
-        else:
-            result.not_after = datetime.strptime(not_after, "%y%m%d%H%M%SZ")
-        result.alt_names = defaultdict(list)
+        result._decode_validity(validity)
         extensions = tbs_cert.getComponentByName('extensions')
         if extensions:
             for extension in extensions:
                 value = extension.getComponentByName('extnValue')
                 logger.debug("Value: {0!r}".format(value))
                 if isinstance(value, Any):
-                    # should be OctetString, but is Any 
+                    # should be OctetString, but is Any
                     # in pyasn1_modules-0.0.1a
-                    value = der_decoder.decode(value, 
+                    value = der_decoder.decode(value,
                                                 asn1Spec = OctetString())[0]
-                alt_names = der_decoder.decode(value, 
+                alt_names = der_decoder.decode(value,
                                             asn1Spec = GeneralNames())[0]
                 logger.debug("SubjectAltName: {0!r}".format(alt_names))
-                for alt_name in alt_names:
-                    tname = alt_name.getName()
-                    comp = alt_name.getComponent()
-                    if tname == "dNSName":
-                        key = "DNS"
-                        value = _decode_asn1_string(comp)
-                    elif tname == "uniformResourceIdentifier":
-                        key = "URI"
-                        value = _decode_asn1_string(comp)
-                    elif tname == "otherName":
-                        oid = comp.getComponentByName("type-id")
-                        value = comp.getComponentByName("value")
-                        if oid == XMPPADDR_OID:
-                            key = "XmppAddr"
-                            value = der_decoder.decode(value,
-                                                    asn1Spec = UTF8String())[0]
-                            value = _decode_asn1_string(value)
-                        elif oid == SRVNAME_OID:
-                            key = "SRVName"
-                            value = der_decoder.decode(value,
-                                                    asn1Spec = IA5String())[0]
-                            value = _decode_asn1_string(value)
-                        else:
-                            logger.debug("Unknown other name: {0}".format(oid))
-                            continue
-                    else:
-                        logger.debug("Unsupported general name: {0}"
-                                                                .format(tname))
+                result._decode_alt_names(alt_names)
+        return result
+
+    def _decode_subject(self, subject):
+        """Load data from a ASN.1 subject.
+        """
+        self.common_names = []
+        subject_name = []
+        for rdnss in subject:
+            for rdns in rdnss:
+                rdnss_list = []
+                for nameval in rdns:
+                    val_type = nameval.getComponentByName('type')
+                    value = nameval.getComponentByName('value')
+                    if val_type not in DN_OIDS:
+                        logger.debug("OID {0} not supported".format(val_type))
                         continue
-                    result.alt_names[key].append(value)
-        return result
+                    val_type = DN_OIDS[val_type]
+                    value = der_decoder.decode(value,
+                                            asn1Spec = DirectoryString())[0]
+                    value = value.getComponent()
+                    try:
+                        value = _decode_asn1_string(value)
+                    except UnicodeError:
+                        logger.debug("Cannot decode value: {0!r}".format(value))
+                        continue
+                    if val_type == u"CommonName":
+                        self.common_names.append(value)
+                    rdnss_list.append((val_type, value))
+                subject_name.append(tuple(rdnss_list))
+        self.subject_name = tuple(subject_name)
+
+    def _decode_validity(self, validity):
+        """Load data from a ASN.1 validity value.
+        """
+        not_after = validity.getComponentByName('notAfter')
+        not_after = str(not_after.getComponent())
+        if isinstance(not_after, GeneralizedTime):
+            self.not_after = datetime.strptime(not_after, "%Y%m%d%H%M%SZ")
+        else:
+            self.not_after = datetime.strptime(not_after, "%y%m%d%H%M%SZ")
+        self.alt_names = defaultdict(list)
+
+    def _decode_alt_names(self, alt_names):
+        """Load SubjectAltName from a ASN.1 GeneralNames value.
+
+        :Values:
+            - `alt_names`: the SubjectAltNama extension value
+        :Types:
+            - `alt_name`: `GeneralNames`
+        """
+        for alt_name in alt_names:
+            tname = alt_name.getName()
+            comp = alt_name.getComponent()
+            if tname == "dNSName":
+                key = "DNS"
+                value = _decode_asn1_string(comp)
+            elif tname == "uniformResourceIdentifier":
+                key = "URI"
+                value = _decode_asn1_string(comp)
+            elif tname == "otherName":
+                oid = comp.getComponentByName("type-id")
+                value = comp.getComponentByName("value")
+                if oid == XMPPADDR_OID:
+                    key = "XmppAddr"
+                    value = der_decoder.decode(value,
+                                            asn1Spec = UTF8String())[0]
+                    value = _decode_asn1_string(value)
+                elif oid == SRVNAME_OID:
+                    key = "SRVName"
+                    value = der_decoder.decode(value,
+                                            asn1Spec = IA5String())[0]
+                    value = _decode_asn1_string(value)
+                else:
+                    logger.debug("Unknown other name: {0}".format(oid))
+                    continue
+            else:
+                logger.debug("Unsupported general name: {0}"
+                                                        .format(tname))
+                continue
+            self.alt_names[key].append(value)
 
     @classmethod
     def from_file(cls, filename):