1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import datetime
8import ipaddress
9
10import asn1crypto.core
11
12import six
13
14from cryptography import x509
15from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
16from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
17from cryptography.x509.oid import (
18    CRLEntryExtensionOID, CertificatePoliciesOID, ExtensionOID,
19    OCSPExtensionOID,
20)
21
22
23class _Integers(asn1crypto.core.SequenceOf):
24    _child_spec = asn1crypto.core.Integer
25
26
27def _obj2txt(backend, obj):
28    # Set to 80 on the recommendation of
29    # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
30    #
31    # But OIDs longer than this occur in real life (e.g. Active
32    # Directory makes some very long OIDs).  So we need to detect
33    # and properly handle the case where the default buffer is not
34    # big enough.
35    #
36    buf_len = 80
37    buf = backend._ffi.new("char[]", buf_len)
38
39    # 'res' is the number of bytes that *would* be written if the
40    # buffer is large enough.  If 'res' > buf_len - 1, we need to
41    # alloc a big-enough buffer and go again.
42    res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
43    if res > buf_len - 1:  # account for terminating null byte
44        buf_len = res + 1
45        buf = backend._ffi.new("char[]", buf_len)
46        res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
47    backend.openssl_assert(res > 0)
48    return backend._ffi.buffer(buf, res)[:].decode()
49
50
51def _decode_x509_name_entry(backend, x509_name_entry):
52    obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
53    backend.openssl_assert(obj != backend._ffi.NULL)
54    data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
55    backend.openssl_assert(data != backend._ffi.NULL)
56    value = _asn1_string_to_utf8(backend, data)
57    oid = _obj2txt(backend, obj)
58    type = _ASN1_TYPE_TO_ENUM[data.type]
59
60    return x509.NameAttribute(x509.ObjectIdentifier(oid), value, type)
61
62
63def _decode_x509_name(backend, x509_name):
64    count = backend._lib.X509_NAME_entry_count(x509_name)
65    attributes = []
66    prev_set_id = -1
67    for x in range(count):
68        entry = backend._lib.X509_NAME_get_entry(x509_name, x)
69        attribute = _decode_x509_name_entry(backend, entry)
70        set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
71        if set_id != prev_set_id:
72            attributes.append(set([attribute]))
73        else:
74            # is in the same RDN a previous entry
75            attributes[-1].add(attribute)
76        prev_set_id = set_id
77
78    return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)
79
80
81def _decode_general_names(backend, gns):
82    num = backend._lib.sk_GENERAL_NAME_num(gns)
83    names = []
84    for i in range(num):
85        gn = backend._lib.sk_GENERAL_NAME_value(gns, i)
86        backend.openssl_assert(gn != backend._ffi.NULL)
87        names.append(_decode_general_name(backend, gn))
88
89    return names
90
91
92def _decode_general_name(backend, gn):
93    if gn.type == backend._lib.GEN_DNS:
94        # Convert to bytes and then decode to utf8. We don't use
95        # asn1_string_to_utf8 here because it doesn't properly convert
96        # utf8 from ia5strings.
97        data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8")
98        # We don't use the constructor for DNSName so we can bypass validation
99        # This allows us to create DNSName objects that have unicode chars
100        # when a certificate (against the RFC) contains them.
101        return x509.DNSName._init_without_validation(data)
102    elif gn.type == backend._lib.GEN_URI:
103        # Convert to bytes and then decode to utf8. We don't use
104        # asn1_string_to_utf8 here because it doesn't properly convert
105        # utf8 from ia5strings.
106        data = _asn1_string_to_bytes(
107            backend, gn.d.uniformResourceIdentifier
108        ).decode("utf8")
109        # We don't use the constructor for URI so we can bypass validation
110        # This allows us to create URI objects that have unicode chars
111        # when a certificate (against the RFC) contains them.
112        return x509.UniformResourceIdentifier._init_without_validation(data)
113    elif gn.type == backend._lib.GEN_RID:
114        oid = _obj2txt(backend, gn.d.registeredID)
115        return x509.RegisteredID(x509.ObjectIdentifier(oid))
116    elif gn.type == backend._lib.GEN_IPADD:
117        data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
118        data_len = len(data)
119        if data_len == 8 or data_len == 32:
120            # This is an IPv4 or IPv6 Network and not a single IP. This
121            # type of data appears in Name Constraints. Unfortunately,
122            # ipaddress doesn't support packed bytes + netmask. Additionally,
123            # IPv6Network can only handle CIDR rather than the full 16 byte
124            # netmask. To handle this we convert the netmask to integer, then
125            # find the first 0 bit, which will be the prefix. If another 1
126            # bit is present after that the netmask is invalid.
127            base = ipaddress.ip_address(data[:data_len // 2])
128            netmask = ipaddress.ip_address(data[data_len // 2:])
129            bits = bin(int(netmask))[2:]
130            prefix = bits.find('0')
131            # If no 0 bits are found it is a /32 or /128
132            if prefix == -1:
133                prefix = len(bits)
134
135            if "1" in bits[prefix:]:
136                raise ValueError("Invalid netmask")
137
138            ip = ipaddress.ip_network(base.exploded + u"/{0}".format(prefix))
139        else:
140            ip = ipaddress.ip_address(data)
141
142        return x509.IPAddress(ip)
143    elif gn.type == backend._lib.GEN_DIRNAME:
144        return x509.DirectoryName(
145            _decode_x509_name(backend, gn.d.directoryName)
146        )
147    elif gn.type == backend._lib.GEN_EMAIL:
148        # Convert to bytes and then decode to utf8. We don't use
149        # asn1_string_to_utf8 here because it doesn't properly convert
150        # utf8 from ia5strings.
151        data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8")
152        # We don't use the constructor for RFC822Name so we can bypass
153        # validation. This allows us to create RFC822Name objects that have
154        # unicode chars when a certificate (against the RFC) contains them.
155        return x509.RFC822Name._init_without_validation(data)
156    elif gn.type == backend._lib.GEN_OTHERNAME:
157        type_id = _obj2txt(backend, gn.d.otherName.type_id)
158        value = _asn1_to_der(backend, gn.d.otherName.value)
159        return x509.OtherName(x509.ObjectIdentifier(type_id), value)
160    else:
161        # x400Address or ediPartyName
162        raise x509.UnsupportedGeneralNameType(
163            "{0} is not a supported type".format(
164                x509._GENERAL_NAMES.get(gn.type, gn.type)
165            ),
166            gn.type
167        )
168
169
170def _decode_ocsp_no_check(backend, ext):
171    return x509.OCSPNoCheck()
172
173
174def _decode_crl_number(backend, ext):
175    asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
176    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
177    return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int))
178
179
180def _decode_delta_crl_indicator(backend, ext):
181    asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
182    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
183    return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, asn1_int))
184
185
186class _X509ExtensionParser(object):
187    def __init__(self, ext_count, get_ext, handlers):
188        self.ext_count = ext_count
189        self.get_ext = get_ext
190        self.handlers = handlers
191
192    def parse(self, backend, x509_obj):
193        extensions = []
194        seen_oids = set()
195        for i in range(self.ext_count(backend, x509_obj)):
196            ext = self.get_ext(backend, x509_obj, i)
197            backend.openssl_assert(ext != backend._ffi.NULL)
198            crit = backend._lib.X509_EXTENSION_get_critical(ext)
199            critical = crit == 1
200            oid = x509.ObjectIdentifier(
201                _obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
202            )
203            if oid in seen_oids:
204                raise x509.DuplicateExtension(
205                    "Duplicate {0} extension found".format(oid), oid
206                )
207
208            # These OIDs are only supported in OpenSSL 1.1.0+ but we want
209            # to support them in all versions of OpenSSL so we decode them
210            # ourselves.
211            if oid == ExtensionOID.TLS_FEATURE:
212                data = backend._lib.X509_EXTENSION_get_data(ext)
213                parsed = _Integers.load(_asn1_string_to_bytes(backend, data))
214                value = x509.TLSFeature(
215                    [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed]
216                )
217                extensions.append(x509.Extension(oid, critical, value))
218                seen_oids.add(oid)
219                continue
220            elif oid == ExtensionOID.PRECERT_POISON:
221                data = backend._lib.X509_EXTENSION_get_data(ext)
222                parsed = asn1crypto.core.Null.load(
223                    _asn1_string_to_bytes(backend, data)
224                )
225                assert parsed == asn1crypto.core.Null()
226                extensions.append(x509.Extension(
227                    oid, critical, x509.PrecertPoison()
228                ))
229                seen_oids.add(oid)
230                continue
231
232            try:
233                handler = self.handlers[oid]
234            except KeyError:
235                # Dump the DER payload into an UnrecognizedExtension object
236                data = backend._lib.X509_EXTENSION_get_data(ext)
237                backend.openssl_assert(data != backend._ffi.NULL)
238                der = backend._ffi.buffer(data.data, data.length)[:]
239                unrecognized = x509.UnrecognizedExtension(oid, der)
240                extensions.append(
241                    x509.Extension(oid, critical, unrecognized)
242                )
243            else:
244                ext_data = backend._lib.X509V3_EXT_d2i(ext)
245                if ext_data == backend._ffi.NULL:
246                    backend._consume_errors()
247                    raise ValueError(
248                        "The {0} extension is invalid and can't be "
249                        "parsed".format(oid)
250                    )
251
252                value = handler(backend, ext_data)
253                extensions.append(x509.Extension(oid, critical, value))
254
255            seen_oids.add(oid)
256
257        return x509.Extensions(extensions)
258
259
260def _decode_certificate_policies(backend, cp):
261    cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
262    cp = backend._ffi.gc(cp, backend._lib.CERTIFICATEPOLICIES_free)
263
264    num = backend._lib.sk_POLICYINFO_num(cp)
265    certificate_policies = []
266    for i in range(num):
267        qualifiers = None
268        pi = backend._lib.sk_POLICYINFO_value(cp, i)
269        oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
270        if pi.qualifiers != backend._ffi.NULL:
271            qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
272            qualifiers = []
273            for j in range(qnum):
274                pqi = backend._lib.sk_POLICYQUALINFO_value(
275                    pi.qualifiers, j
276                )
277                pqualid = x509.ObjectIdentifier(
278                    _obj2txt(backend, pqi.pqualid)
279                )
280                if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
281                    cpsuri = backend._ffi.buffer(
282                        pqi.d.cpsuri.data, pqi.d.cpsuri.length
283                    )[:].decode('ascii')
284                    qualifiers.append(cpsuri)
285                else:
286                    assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
287                    user_notice = _decode_user_notice(
288                        backend, pqi.d.usernotice
289                    )
290                    qualifiers.append(user_notice)
291
292        certificate_policies.append(
293            x509.PolicyInformation(oid, qualifiers)
294        )
295
296    return x509.CertificatePolicies(certificate_policies)
297
298
299def _decode_user_notice(backend, un):
300    explicit_text = None
301    notice_reference = None
302
303    if un.exptext != backend._ffi.NULL:
304        explicit_text = _asn1_string_to_utf8(backend, un.exptext)
305
306    if un.noticeref != backend._ffi.NULL:
307        organization = _asn1_string_to_utf8(
308            backend, un.noticeref.organization
309        )
310
311        num = backend._lib.sk_ASN1_INTEGER_num(
312            un.noticeref.noticenos
313        )
314        notice_numbers = []
315        for i in range(num):
316            asn1_int = backend._lib.sk_ASN1_INTEGER_value(
317                un.noticeref.noticenos, i
318            )
319            notice_num = _asn1_integer_to_int(backend, asn1_int)
320            notice_numbers.append(notice_num)
321
322        notice_reference = x509.NoticeReference(
323            organization, notice_numbers
324        )
325
326    return x509.UserNotice(notice_reference, explicit_text)
327
328
329def _decode_basic_constraints(backend, bc_st):
330    basic_constraints = backend._ffi.cast("BASIC_CONSTRAINTS *", bc_st)
331    basic_constraints = backend._ffi.gc(
332        basic_constraints, backend._lib.BASIC_CONSTRAINTS_free
333    )
334    # The byte representation of an ASN.1 boolean true is \xff. OpenSSL
335    # chooses to just map this to its ordinal value, so true is 255 and
336    # false is 0.
337    ca = basic_constraints.ca == 255
338    path_length = _asn1_integer_to_int_or_none(
339        backend, basic_constraints.pathlen
340    )
341
342    return x509.BasicConstraints(ca, path_length)
343
344
345def _decode_subject_key_identifier(backend, asn1_string):
346    asn1_string = backend._ffi.cast("ASN1_OCTET_STRING *", asn1_string)
347    asn1_string = backend._ffi.gc(
348        asn1_string, backend._lib.ASN1_OCTET_STRING_free
349    )
350    return x509.SubjectKeyIdentifier(
351        backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
352    )
353
354
355def _decode_authority_key_identifier(backend, akid):
356    akid = backend._ffi.cast("AUTHORITY_KEYID *", akid)
357    akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free)
358    key_identifier = None
359    authority_cert_issuer = None
360
361    if akid.keyid != backend._ffi.NULL:
362        key_identifier = backend._ffi.buffer(
363            akid.keyid.data, akid.keyid.length
364        )[:]
365
366    if akid.issuer != backend._ffi.NULL:
367        authority_cert_issuer = _decode_general_names(
368            backend, akid.issuer
369        )
370
371    authority_cert_serial_number = _asn1_integer_to_int_or_none(
372        backend, akid.serial
373    )
374
375    return x509.AuthorityKeyIdentifier(
376        key_identifier, authority_cert_issuer, authority_cert_serial_number
377    )
378
379
380def _decode_authority_information_access(backend, aia):
381    aia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", aia)
382    aia = backend._ffi.gc(aia, backend._lib.sk_ACCESS_DESCRIPTION_free)
383    num = backend._lib.sk_ACCESS_DESCRIPTION_num(aia)
384    access_descriptions = []
385    for i in range(num):
386        ad = backend._lib.sk_ACCESS_DESCRIPTION_value(aia, i)
387        backend.openssl_assert(ad.method != backend._ffi.NULL)
388        oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method))
389        backend.openssl_assert(ad.location != backend._ffi.NULL)
390        gn = _decode_general_name(backend, ad.location)
391        access_descriptions.append(x509.AccessDescription(oid, gn))
392
393    return x509.AuthorityInformationAccess(access_descriptions)
394
395
396def _decode_key_usage(backend, bit_string):
397    bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
398    bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
399    get_bit = backend._lib.ASN1_BIT_STRING_get_bit
400    digital_signature = get_bit(bit_string, 0) == 1
401    content_commitment = get_bit(bit_string, 1) == 1
402    key_encipherment = get_bit(bit_string, 2) == 1
403    data_encipherment = get_bit(bit_string, 3) == 1
404    key_agreement = get_bit(bit_string, 4) == 1
405    key_cert_sign = get_bit(bit_string, 5) == 1
406    crl_sign = get_bit(bit_string, 6) == 1
407    encipher_only = get_bit(bit_string, 7) == 1
408    decipher_only = get_bit(bit_string, 8) == 1
409    return x509.KeyUsage(
410        digital_signature,
411        content_commitment,
412        key_encipherment,
413        data_encipherment,
414        key_agreement,
415        key_cert_sign,
416        crl_sign,
417        encipher_only,
418        decipher_only
419    )
420
421
422def _decode_general_names_extension(backend, gns):
423    gns = backend._ffi.cast("GENERAL_NAMES *", gns)
424    gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
425    general_names = _decode_general_names(backend, gns)
426    return general_names
427
428
429def _decode_subject_alt_name(backend, ext):
430    return x509.SubjectAlternativeName(
431        _decode_general_names_extension(backend, ext)
432    )
433
434
435def _decode_issuer_alt_name(backend, ext):
436    return x509.IssuerAlternativeName(
437        _decode_general_names_extension(backend, ext)
438    )
439
440
441def _decode_name_constraints(backend, nc):
442    nc = backend._ffi.cast("NAME_CONSTRAINTS *", nc)
443    nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free)
444    permitted = _decode_general_subtrees(backend, nc.permittedSubtrees)
445    excluded = _decode_general_subtrees(backend, nc.excludedSubtrees)
446    return x509.NameConstraints(
447        permitted_subtrees=permitted, excluded_subtrees=excluded
448    )
449
450
451def _decode_general_subtrees(backend, stack_subtrees):
452    if stack_subtrees == backend._ffi.NULL:
453        return None
454
455    num = backend._lib.sk_GENERAL_SUBTREE_num(stack_subtrees)
456    subtrees = []
457
458    for i in range(num):
459        obj = backend._lib.sk_GENERAL_SUBTREE_value(stack_subtrees, i)
460        backend.openssl_assert(obj != backend._ffi.NULL)
461        name = _decode_general_name(backend, obj.base)
462        subtrees.append(name)
463
464    return subtrees
465
466
467def _decode_issuing_dist_point(backend, idp):
468    idp = backend._ffi.cast("ISSUING_DIST_POINT *", idp)
469    idp = backend._ffi.gc(idp, backend._lib.ISSUING_DIST_POINT_free)
470    if idp.distpoint != backend._ffi.NULL:
471        full_name, relative_name = _decode_distpoint(backend, idp.distpoint)
472    else:
473        full_name = None
474        relative_name = None
475
476    only_user = idp.onlyuser == 255
477    only_ca = idp.onlyCA == 255
478    indirect_crl = idp.indirectCRL == 255
479    only_attr = idp.onlyattr == 255
480    if idp.onlysomereasons != backend._ffi.NULL:
481        only_some_reasons = _decode_reasons(backend, idp.onlysomereasons)
482    else:
483        only_some_reasons = None
484
485    return x509.IssuingDistributionPoint(
486        full_name, relative_name, only_user, only_ca, only_some_reasons,
487        indirect_crl, only_attr
488    )
489
490
491def _decode_policy_constraints(backend, pc):
492    pc = backend._ffi.cast("POLICY_CONSTRAINTS *", pc)
493    pc = backend._ffi.gc(pc, backend._lib.POLICY_CONSTRAINTS_free)
494
495    require_explicit_policy = _asn1_integer_to_int_or_none(
496        backend, pc.requireExplicitPolicy
497    )
498    inhibit_policy_mapping = _asn1_integer_to_int_or_none(
499        backend, pc.inhibitPolicyMapping
500    )
501
502    return x509.PolicyConstraints(
503        require_explicit_policy, inhibit_policy_mapping
504    )
505
506
507def _decode_extended_key_usage(backend, sk):
508    sk = backend._ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk)
509    sk = backend._ffi.gc(sk, backend._lib.sk_ASN1_OBJECT_free)
510    num = backend._lib.sk_ASN1_OBJECT_num(sk)
511    ekus = []
512
513    for i in range(num):
514        obj = backend._lib.sk_ASN1_OBJECT_value(sk, i)
515        backend.openssl_assert(obj != backend._ffi.NULL)
516        oid = x509.ObjectIdentifier(_obj2txt(backend, obj))
517        ekus.append(oid)
518
519    return x509.ExtendedKeyUsage(ekus)
520
521
522_DISTPOINT_TYPE_FULLNAME = 0
523_DISTPOINT_TYPE_RELATIVENAME = 1
524
525
526def _decode_dist_points(backend, cdps):
527    cdps = backend._ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps)
528    cdps = backend._ffi.gc(cdps, backend._lib.CRL_DIST_POINTS_free)
529
530    num = backend._lib.sk_DIST_POINT_num(cdps)
531    dist_points = []
532    for i in range(num):
533        full_name = None
534        relative_name = None
535        crl_issuer = None
536        reasons = None
537        cdp = backend._lib.sk_DIST_POINT_value(cdps, i)
538        if cdp.reasons != backend._ffi.NULL:
539            reasons = _decode_reasons(backend, cdp.reasons)
540
541        if cdp.CRLissuer != backend._ffi.NULL:
542            crl_issuer = _decode_general_names(backend, cdp.CRLissuer)
543
544        # Certificates may have a crl_issuer/reasons and no distribution
545        # point so make sure it's not null.
546        if cdp.distpoint != backend._ffi.NULL:
547            full_name, relative_name = _decode_distpoint(
548                backend, cdp.distpoint
549            )
550
551        dist_points.append(
552            x509.DistributionPoint(
553                full_name, relative_name, reasons, crl_issuer
554            )
555        )
556
557    return dist_points
558
559
560# ReasonFlags ::= BIT STRING {
561#      unused                  (0),
562#      keyCompromise           (1),
563#      cACompromise            (2),
564#      affiliationChanged      (3),
565#      superseded              (4),
566#      cessationOfOperation    (5),
567#      certificateHold         (6),
568#      privilegeWithdrawn      (7),
569#      aACompromise            (8) }
570_REASON_BIT_MAPPING = {
571    1: x509.ReasonFlags.key_compromise,
572    2: x509.ReasonFlags.ca_compromise,
573    3: x509.ReasonFlags.affiliation_changed,
574    4: x509.ReasonFlags.superseded,
575    5: x509.ReasonFlags.cessation_of_operation,
576    6: x509.ReasonFlags.certificate_hold,
577    7: x509.ReasonFlags.privilege_withdrawn,
578    8: x509.ReasonFlags.aa_compromise,
579}
580
581
582def _decode_reasons(backend, reasons):
583    # We will check each bit from RFC 5280
584    enum_reasons = []
585    for bit_position, reason in six.iteritems(_REASON_BIT_MAPPING):
586        if backend._lib.ASN1_BIT_STRING_get_bit(reasons, bit_position):
587            enum_reasons.append(reason)
588
589    return frozenset(enum_reasons)
590
591
592def _decode_distpoint(backend, distpoint):
593    if distpoint.type == _DISTPOINT_TYPE_FULLNAME:
594        full_name = _decode_general_names(backend, distpoint.name.fullname)
595        return full_name, None
596
597    # OpenSSL code doesn't test for a specific type for
598    # relativename, everything that isn't fullname is considered
599    # relativename.  Per RFC 5280:
600    #
601    # DistributionPointName ::= CHOICE {
602    #      fullName                [0]      GeneralNames,
603    #      nameRelativeToCRLIssuer [1]      RelativeDistinguishedName }
604    rns = distpoint.name.relativename
605    rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns)
606    attributes = set()
607    for i in range(rnum):
608        rn = backend._lib.sk_X509_NAME_ENTRY_value(
609            rns, i
610        )
611        backend.openssl_assert(rn != backend._ffi.NULL)
612        attributes.add(
613            _decode_x509_name_entry(backend, rn)
614        )
615
616    relative_name = x509.RelativeDistinguishedName(attributes)
617
618    return None, relative_name
619
620
621def _decode_crl_distribution_points(backend, cdps):
622    dist_points = _decode_dist_points(backend, cdps)
623    return x509.CRLDistributionPoints(dist_points)
624
625
626def _decode_freshest_crl(backend, cdps):
627    dist_points = _decode_dist_points(backend, cdps)
628    return x509.FreshestCRL(dist_points)
629
630
631def _decode_inhibit_any_policy(backend, asn1_int):
632    asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int)
633    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
634    skip_certs = _asn1_integer_to_int(backend, asn1_int)
635    return x509.InhibitAnyPolicy(skip_certs)
636
637
638def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
639    from cryptography.hazmat.backends.openssl.x509 import (
640        _SignedCertificateTimestamp
641    )
642    asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts)
643    asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free)
644
645    scts = []
646    for i in range(backend._lib.sk_SCT_num(asn1_scts)):
647        sct = backend._lib.sk_SCT_value(asn1_scts, i)
648
649        scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct))
650    return x509.PrecertificateSignedCertificateTimestamps(scts)
651
652
653#    CRLReason ::= ENUMERATED {
654#        unspecified             (0),
655#        keyCompromise           (1),
656#        cACompromise            (2),
657#        affiliationChanged      (3),
658#        superseded              (4),
659#        cessationOfOperation    (5),
660#        certificateHold         (6),
661#             -- value 7 is not used
662#        removeFromCRL           (8),
663#        privilegeWithdrawn      (9),
664#        aACompromise           (10) }
665_CRL_ENTRY_REASON_CODE_TO_ENUM = {
666    0: x509.ReasonFlags.unspecified,
667    1: x509.ReasonFlags.key_compromise,
668    2: x509.ReasonFlags.ca_compromise,
669    3: x509.ReasonFlags.affiliation_changed,
670    4: x509.ReasonFlags.superseded,
671    5: x509.ReasonFlags.cessation_of_operation,
672    6: x509.ReasonFlags.certificate_hold,
673    8: x509.ReasonFlags.remove_from_crl,
674    9: x509.ReasonFlags.privilege_withdrawn,
675    10: x509.ReasonFlags.aa_compromise,
676}
677
678
679_CRL_ENTRY_REASON_ENUM_TO_CODE = {
680    x509.ReasonFlags.unspecified: 0,
681    x509.ReasonFlags.key_compromise: 1,
682    x509.ReasonFlags.ca_compromise: 2,
683    x509.ReasonFlags.affiliation_changed: 3,
684    x509.ReasonFlags.superseded: 4,
685    x509.ReasonFlags.cessation_of_operation: 5,
686    x509.ReasonFlags.certificate_hold: 6,
687    x509.ReasonFlags.remove_from_crl: 8,
688    x509.ReasonFlags.privilege_withdrawn: 9,
689    x509.ReasonFlags.aa_compromise: 10
690}
691
692
693def _decode_crl_reason(backend, enum):
694    enum = backend._ffi.cast("ASN1_ENUMERATED *", enum)
695    enum = backend._ffi.gc(enum, backend._lib.ASN1_ENUMERATED_free)
696    code = backend._lib.ASN1_ENUMERATED_get(enum)
697
698    try:
699        return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code])
700    except KeyError:
701        raise ValueError("Unsupported reason code: {0}".format(code))
702
703
704def _decode_invalidity_date(backend, inv_date):
705    generalized_time = backend._ffi.cast(
706        "ASN1_GENERALIZEDTIME *", inv_date
707    )
708    generalized_time = backend._ffi.gc(
709        generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
710    )
711    return x509.InvalidityDate(
712        _parse_asn1_generalized_time(backend, generalized_time)
713    )
714
715
716def _decode_cert_issuer(backend, gns):
717    gns = backend._ffi.cast("GENERAL_NAMES *", gns)
718    gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
719    general_names = _decode_general_names(backend, gns)
720    return x509.CertificateIssuer(general_names)
721
722
723def _asn1_to_der(backend, asn1_type):
724    buf = backend._ffi.new("unsigned char **")
725    res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf)
726    backend.openssl_assert(res >= 0)
727    backend.openssl_assert(buf[0] != backend._ffi.NULL)
728    buf = backend._ffi.gc(
729        buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
730    )
731    return backend._ffi.buffer(buf[0], res)[:]
732
733
734def _asn1_integer_to_int(backend, asn1_int):
735    bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL)
736    backend.openssl_assert(bn != backend._ffi.NULL)
737    bn = backend._ffi.gc(bn, backend._lib.BN_free)
738    return backend._bn_to_int(bn)
739
740
741def _asn1_integer_to_int_or_none(backend, asn1_int):
742    if asn1_int == backend._ffi.NULL:
743        return None
744    else:
745        return _asn1_integer_to_int(backend, asn1_int)
746
747
748def _asn1_string_to_bytes(backend, asn1_string):
749    return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
750
751
752def _asn1_string_to_ascii(backend, asn1_string):
753    return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
754
755
756def _asn1_string_to_utf8(backend, asn1_string):
757    buf = backend._ffi.new("unsigned char **")
758    res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
759    if res == -1:
760        raise ValueError(
761            "Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
762        )
763
764    backend.openssl_assert(buf[0] != backend._ffi.NULL)
765    buf = backend._ffi.gc(
766        buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
767    )
768    return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
769
770
771def _parse_asn1_time(backend, asn1_time):
772    backend.openssl_assert(asn1_time != backend._ffi.NULL)
773    generalized_time = backend._lib.ASN1_TIME_to_generalizedtime(
774        asn1_time, backend._ffi.NULL
775    )
776    if generalized_time == backend._ffi.NULL:
777        raise ValueError(
778            "Couldn't parse ASN.1 time as generalizedtime {!r}".format(
779                _asn1_string_to_bytes(backend, asn1_time)
780            )
781        )
782
783    generalized_time = backend._ffi.gc(
784        generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
785    )
786    return _parse_asn1_generalized_time(backend, generalized_time)
787
788
789def _parse_asn1_generalized_time(backend, generalized_time):
790    time = _asn1_string_to_ascii(
791        backend, backend._ffi.cast("ASN1_STRING *", generalized_time)
792    )
793    return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
794
795
796def _decode_nonce(backend, nonce):
797    nonce = backend._ffi.cast("ASN1_OCTET_STRING *", nonce)
798    nonce = backend._ffi.gc(nonce, backend._lib.ASN1_OCTET_STRING_free)
799    return x509.OCSPNonce(_asn1_string_to_bytes(backend, nonce))
800
801
802_EXTENSION_HANDLERS_NO_SCT = {
803    ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
804    ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
805    ExtensionOID.KEY_USAGE: _decode_key_usage,
806    ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name,
807    ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage,
808    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
809    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
810        _decode_authority_information_access
811    ),
812    ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies,
813    ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points,
814    ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
815    ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check,
816    ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy,
817    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
818    ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
819    ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints,
820}
821_EXTENSION_HANDLERS = _EXTENSION_HANDLERS_NO_SCT.copy()
822_EXTENSION_HANDLERS[
823    ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS
824] = _decode_precert_signed_certificate_timestamps
825
826
827_REVOKED_EXTENSION_HANDLERS = {
828    CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason,
829    CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date,
830    CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer,
831}
832
833_CRL_EXTENSION_HANDLERS = {
834    ExtensionOID.CRL_NUMBER: _decode_crl_number,
835    ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator,
836    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
837    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
838    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
839        _decode_authority_information_access
840    ),
841    ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point,
842}
843
844_OCSP_REQ_EXTENSION_HANDLERS = {
845    OCSPExtensionOID.NONCE: _decode_nonce,
846}
847
848_OCSP_BASICRESP_EXTENSION_HANDLERS = {
849    OCSPExtensionOID.NONCE: _decode_nonce,
850}
851
852_CERTIFICATE_EXTENSION_PARSER_NO_SCT = _X509ExtensionParser(
853    ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
854    get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i),
855    handlers=_EXTENSION_HANDLERS_NO_SCT
856)
857
858_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
859    ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x),
860    get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i),
861    handlers=_EXTENSION_HANDLERS
862)
863
864_CSR_EXTENSION_PARSER = _X509ExtensionParser(
865    ext_count=lambda backend, x: backend._lib.sk_X509_EXTENSION_num(x),
866    get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i),
867    handlers=_EXTENSION_HANDLERS
868)
869
870_REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser(
871    ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x),
872    get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i),
873    handlers=_REVOKED_EXTENSION_HANDLERS,
874)
875
876_CRL_EXTENSION_PARSER = _X509ExtensionParser(
877    ext_count=lambda backend, x: backend._lib.X509_CRL_get_ext_count(x),
878    get_ext=lambda backend, x, i: backend._lib.X509_CRL_get_ext(x, i),
879    handlers=_CRL_EXTENSION_HANDLERS,
880)
881
882_OCSP_REQ_EXT_PARSER = _X509ExtensionParser(
883    ext_count=lambda backend, x: backend._lib.OCSP_REQUEST_get_ext_count(x),
884    get_ext=lambda backend, x, i: backend._lib.OCSP_REQUEST_get_ext(x, i),
885    handlers=_OCSP_REQ_EXTENSION_HANDLERS,
886)
887
888_OCSP_BASICRESP_EXT_PARSER = _X509ExtensionParser(
889    ext_count=lambda backend, x: backend._lib.OCSP_BASICRESP_get_ext_count(x),
890    get_ext=lambda backend, x, i: backend._lib.OCSP_BASICRESP_get_ext(x, i),
891    handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS,
892)
893