1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import datetime
8import operator
9import warnings
10
11from cryptography import utils, x509
12from cryptography.exceptions import UnsupportedAlgorithm
13from cryptography.hazmat.backends.openssl.decode_asn1 import (
14    _CERTIFICATE_EXTENSION_PARSER, _CERTIFICATE_EXTENSION_PARSER_NO_SCT,
15    _CRL_EXTENSION_PARSER, _CSR_EXTENSION_PARSER,
16    _REVOKED_CERTIFICATE_EXTENSION_PARSER, _asn1_integer_to_int,
17    _asn1_string_to_bytes, _decode_x509_name, _obj2txt, _parse_asn1_time
18)
19from cryptography.hazmat.backends.openssl.encode_asn1 import (
20    _encode_asn1_int_gc
21)
22from cryptography.hazmat.primitives import hashes, serialization
23from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
24
25
26@utils.register_interface(x509.Certificate)
27class _Certificate(object):
28    def __init__(self, backend, x509):
29        self._backend = backend
30        self._x509 = x509
31
32    def __repr__(self):
33        return "<Certificate(subject={0}, ...)>".format(self.subject)
34
35    def __eq__(self, other):
36        if not isinstance(other, x509.Certificate):
37            return NotImplemented
38
39        res = self._backend._lib.X509_cmp(self._x509, other._x509)
40        return res == 0
41
42    def __ne__(self, other):
43        return not self == other
44
45    def __hash__(self):
46        return hash(self.public_bytes(serialization.Encoding.DER))
47
48    def fingerprint(self, algorithm):
49        h = hashes.Hash(algorithm, self._backend)
50        h.update(self.public_bytes(serialization.Encoding.DER))
51        return h.finalize()
52
53    @property
54    def version(self):
55        version = self._backend._lib.X509_get_version(self._x509)
56        if version == 0:
57            return x509.Version.v1
58        elif version == 2:
59            return x509.Version.v3
60        else:
61            raise x509.InvalidVersion(
62                "{0} is not a valid X509 version".format(version), version
63            )
64
65    @property
66    def serial(self):
67        warnings.warn(
68            "Certificate serial is deprecated, use serial_number instead.",
69            utils.PersistentlyDeprecated,
70            stacklevel=2
71        )
72        return self.serial_number
73
74    @property
75    def serial_number(self):
76        asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
77        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
78        return _asn1_integer_to_int(self._backend, asn1_int)
79
80    def public_key(self):
81        pkey = self._backend._lib.X509_get_pubkey(self._x509)
82        if pkey == self._backend._ffi.NULL:
83            # Remove errors from the stack.
84            self._backend._consume_errors()
85            raise ValueError("Certificate public key is of an unknown type")
86
87        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
88
89        return self._backend._evp_pkey_to_public_key(pkey)
90
91    @property
92    def not_valid_before(self):
93        asn1_time = self._backend._lib.X509_get_notBefore(self._x509)
94        return _parse_asn1_time(self._backend, asn1_time)
95
96    @property
97    def not_valid_after(self):
98        asn1_time = self._backend._lib.X509_get_notAfter(self._x509)
99        return _parse_asn1_time(self._backend, asn1_time)
100
101    @property
102    def issuer(self):
103        issuer = self._backend._lib.X509_get_issuer_name(self._x509)
104        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
105        return _decode_x509_name(self._backend, issuer)
106
107    @property
108    def subject(self):
109        subject = self._backend._lib.X509_get_subject_name(self._x509)
110        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
111        return _decode_x509_name(self._backend, subject)
112
113    @property
114    def signature_hash_algorithm(self):
115        oid = self.signature_algorithm_oid
116        try:
117            return x509._SIG_OIDS_TO_HASH[oid]
118        except KeyError:
119            raise UnsupportedAlgorithm(
120                "Signature algorithm OID:{0} not recognized".format(oid)
121            )
122
123    @property
124    def signature_algorithm_oid(self):
125        alg = self._backend._ffi.new("X509_ALGOR **")
126        self._backend._lib.X509_get0_signature(
127            self._backend._ffi.NULL, alg, self._x509
128        )
129        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
130        oid = _obj2txt(self._backend, alg[0].algorithm)
131        return x509.ObjectIdentifier(oid)
132
133    @utils.cached_property
134    def extensions(self):
135        if self._backend._lib.CRYPTOGRAPHY_OPENSSL_110_OR_GREATER:
136            return _CERTIFICATE_EXTENSION_PARSER.parse(
137                self._backend, self._x509
138            )
139        else:
140            return _CERTIFICATE_EXTENSION_PARSER_NO_SCT.parse(
141                self._backend, self._x509
142            )
143
144    @property
145    def signature(self):
146        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
147        self._backend._lib.X509_get0_signature(
148            sig, self._backend._ffi.NULL, self._x509
149        )
150        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
151        return _asn1_string_to_bytes(self._backend, sig[0])
152
153    @property
154    def tbs_certificate_bytes(self):
155        pp = self._backend._ffi.new("unsigned char **")
156        res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
157        self._backend.openssl_assert(res > 0)
158        pp = self._backend._ffi.gc(
159            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
160        )
161        return self._backend._ffi.buffer(pp[0], res)[:]
162
163    def public_bytes(self, encoding):
164        bio = self._backend._create_mem_bio_gc()
165        if encoding is serialization.Encoding.PEM:
166            res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
167        elif encoding is serialization.Encoding.DER:
168            res = self._backend._lib.i2d_X509_bio(bio, self._x509)
169        else:
170            raise TypeError("encoding must be an item from the Encoding enum")
171
172        self._backend.openssl_assert(res == 1)
173        return self._backend._read_mem_bio(bio)
174
175
176@utils.register_interface(x509.RevokedCertificate)
177class _RevokedCertificate(object):
178    def __init__(self, backend, crl, x509_revoked):
179        self._backend = backend
180        # The X509_REVOKED_value is a X509_REVOKED * that has
181        # no reference counting. This means when X509_CRL_free is
182        # called then the CRL and all X509_REVOKED * are freed. Since
183        # you can retain a reference to a single revoked certificate
184        # and let the CRL fall out of scope we need to retain a
185        # private reference to the CRL inside the RevokedCertificate
186        # object to prevent the gc from being called inappropriately.
187        self._crl = crl
188        self._x509_revoked = x509_revoked
189
190    @property
191    def serial_number(self):
192        asn1_int = self._backend._lib.X509_REVOKED_get0_serialNumber(
193            self._x509_revoked
194        )
195        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
196        return _asn1_integer_to_int(self._backend, asn1_int)
197
198    @property
199    def revocation_date(self):
200        return _parse_asn1_time(
201            self._backend,
202            self._backend._lib.X509_REVOKED_get0_revocationDate(
203                self._x509_revoked
204            )
205        )
206
207    @utils.cached_property
208    def extensions(self):
209        return _REVOKED_CERTIFICATE_EXTENSION_PARSER.parse(
210            self._backend, self._x509_revoked
211        )
212
213
214@utils.register_interface(x509.CertificateRevocationList)
215class _CertificateRevocationList(object):
216    def __init__(self, backend, x509_crl):
217        self._backend = backend
218        self._x509_crl = x509_crl
219
220    def __eq__(self, other):
221        if not isinstance(other, x509.CertificateRevocationList):
222            return NotImplemented
223
224        res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
225        return res == 0
226
227    def __ne__(self, other):
228        return not self == other
229
230    def fingerprint(self, algorithm):
231        h = hashes.Hash(algorithm, self._backend)
232        bio = self._backend._create_mem_bio_gc()
233        res = self._backend._lib.i2d_X509_CRL_bio(
234            bio, self._x509_crl
235        )
236        self._backend.openssl_assert(res == 1)
237        der = self._backend._read_mem_bio(bio)
238        h.update(der)
239        return h.finalize()
240
241    @utils.cached_property
242    def _sorted_crl(self):
243        # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
244        # things we don't want to break (like iteration and the signature).
245        # Let's dupe it and sort that instead.
246        dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
247        self._backend.openssl_assert(dup != self._backend._ffi.NULL)
248        dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
249        return dup
250
251    def get_revoked_certificate_by_serial_number(self, serial_number):
252        revoked = self._backend._ffi.new("X509_REVOKED **")
253        asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
254        res = self._backend._lib.X509_CRL_get0_by_serial(
255            self._sorted_crl, revoked, asn1_int
256        )
257        if res == 0:
258            return None
259        else:
260            self._backend.openssl_assert(
261                revoked[0] != self._backend._ffi.NULL
262            )
263            return _RevokedCertificate(
264                self._backend, self._sorted_crl, revoked[0]
265            )
266
267    @property
268    def signature_hash_algorithm(self):
269        oid = self.signature_algorithm_oid
270        try:
271            return x509._SIG_OIDS_TO_HASH[oid]
272        except KeyError:
273            raise UnsupportedAlgorithm(
274                "Signature algorithm OID:{0} not recognized".format(oid)
275            )
276
277    @property
278    def signature_algorithm_oid(self):
279        alg = self._backend._ffi.new("X509_ALGOR **")
280        self._backend._lib.X509_CRL_get0_signature(
281            self._x509_crl, self._backend._ffi.NULL, alg
282        )
283        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
284        oid = _obj2txt(self._backend, alg[0].algorithm)
285        return x509.ObjectIdentifier(oid)
286
287    @property
288    def issuer(self):
289        issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
290        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
291        return _decode_x509_name(self._backend, issuer)
292
293    @property
294    def next_update(self):
295        nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
296        self._backend.openssl_assert(nu != self._backend._ffi.NULL)
297        return _parse_asn1_time(self._backend, nu)
298
299    @property
300    def last_update(self):
301        lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
302        self._backend.openssl_assert(lu != self._backend._ffi.NULL)
303        return _parse_asn1_time(self._backend, lu)
304
305    @property
306    def signature(self):
307        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
308        self._backend._lib.X509_CRL_get0_signature(
309            self._x509_crl, sig, self._backend._ffi.NULL
310        )
311        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
312        return _asn1_string_to_bytes(self._backend, sig[0])
313
314    @property
315    def tbs_certlist_bytes(self):
316        pp = self._backend._ffi.new("unsigned char **")
317        res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
318        self._backend.openssl_assert(res > 0)
319        pp = self._backend._ffi.gc(
320            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
321        )
322        return self._backend._ffi.buffer(pp[0], res)[:]
323
324    def public_bytes(self, encoding):
325        bio = self._backend._create_mem_bio_gc()
326        if encoding is serialization.Encoding.PEM:
327            res = self._backend._lib.PEM_write_bio_X509_CRL(
328                bio, self._x509_crl
329            )
330        elif encoding is serialization.Encoding.DER:
331            res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
332        else:
333            raise TypeError("encoding must be an item from the Encoding enum")
334
335        self._backend.openssl_assert(res == 1)
336        return self._backend._read_mem_bio(bio)
337
338    def _revoked_cert(self, idx):
339        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
340        r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
341        self._backend.openssl_assert(r != self._backend._ffi.NULL)
342        return _RevokedCertificate(self._backend, self, r)
343
344    def __iter__(self):
345        for i in range(len(self)):
346            yield self._revoked_cert(i)
347
348    def __getitem__(self, idx):
349        if isinstance(idx, slice):
350            start, stop, step = idx.indices(len(self))
351            return [self._revoked_cert(i) for i in range(start, stop, step)]
352        else:
353            idx = operator.index(idx)
354            if idx < 0:
355                idx += len(self)
356            if not 0 <= idx < len(self):
357                raise IndexError
358            return self._revoked_cert(idx)
359
360    def __len__(self):
361        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
362        if revoked == self._backend._ffi.NULL:
363            return 0
364        else:
365            return self._backend._lib.sk_X509_REVOKED_num(revoked)
366
367    @utils.cached_property
368    def extensions(self):
369        return _CRL_EXTENSION_PARSER.parse(self._backend, self._x509_crl)
370
371    def is_signature_valid(self, public_key):
372        if not isinstance(public_key, (dsa.DSAPublicKey, rsa.RSAPublicKey,
373                                       ec.EllipticCurvePublicKey)):
374            raise TypeError('Expecting one of DSAPublicKey, RSAPublicKey,'
375                            ' or EllipticCurvePublicKey.')
376        res = self._backend._lib.X509_CRL_verify(
377            self._x509_crl, public_key._evp_pkey
378        )
379
380        if res != 1:
381            self._backend._consume_errors()
382            return False
383
384        return True
385
386
387@utils.register_interface(x509.CertificateSigningRequest)
388class _CertificateSigningRequest(object):
389    def __init__(self, backend, x509_req):
390        self._backend = backend
391        self._x509_req = x509_req
392
393    def __eq__(self, other):
394        if not isinstance(other, _CertificateSigningRequest):
395            return NotImplemented
396
397        self_bytes = self.public_bytes(serialization.Encoding.DER)
398        other_bytes = other.public_bytes(serialization.Encoding.DER)
399        return self_bytes == other_bytes
400
401    def __ne__(self, other):
402        return not self == other
403
404    def __hash__(self):
405        return hash(self.public_bytes(serialization.Encoding.DER))
406
407    def public_key(self):
408        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
409        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
410        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
411        return self._backend._evp_pkey_to_public_key(pkey)
412
413    @property
414    def subject(self):
415        subject = self._backend._lib.X509_REQ_get_subject_name(self._x509_req)
416        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
417        return _decode_x509_name(self._backend, subject)
418
419    @property
420    def signature_hash_algorithm(self):
421        oid = self.signature_algorithm_oid
422        try:
423            return x509._SIG_OIDS_TO_HASH[oid]
424        except KeyError:
425            raise UnsupportedAlgorithm(
426                "Signature algorithm OID:{0} not recognized".format(oid)
427            )
428
429    @property
430    def signature_algorithm_oid(self):
431        alg = self._backend._ffi.new("X509_ALGOR **")
432        self._backend._lib.X509_REQ_get0_signature(
433            self._x509_req, self._backend._ffi.NULL, alg
434        )
435        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
436        oid = _obj2txt(self._backend, alg[0].algorithm)
437        return x509.ObjectIdentifier(oid)
438
439    @utils.cached_property
440    def extensions(self):
441        x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
442        x509_exts = self._backend._ffi.gc(
443            x509_exts,
444            lambda x: self._backend._lib.sk_X509_EXTENSION_pop_free(
445                x, self._backend._ffi.addressof(
446                    self._backend._lib._original_lib, "X509_EXTENSION_free"
447                )
448            )
449        )
450        return _CSR_EXTENSION_PARSER.parse(self._backend, x509_exts)
451
452    def public_bytes(self, encoding):
453        bio = self._backend._create_mem_bio_gc()
454        if encoding is serialization.Encoding.PEM:
455            res = self._backend._lib.PEM_write_bio_X509_REQ(
456                bio, self._x509_req
457            )
458        elif encoding is serialization.Encoding.DER:
459            res = self._backend._lib.i2d_X509_REQ_bio(bio, self._x509_req)
460        else:
461            raise TypeError("encoding must be an item from the Encoding enum")
462
463        self._backend.openssl_assert(res == 1)
464        return self._backend._read_mem_bio(bio)
465
466    @property
467    def tbs_certrequest_bytes(self):
468        pp = self._backend._ffi.new("unsigned char **")
469        res = self._backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, pp)
470        self._backend.openssl_assert(res > 0)
471        pp = self._backend._ffi.gc(
472            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
473        )
474        return self._backend._ffi.buffer(pp[0], res)[:]
475
476    @property
477    def signature(self):
478        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
479        self._backend._lib.X509_REQ_get0_signature(
480            self._x509_req, sig, self._backend._ffi.NULL
481        )
482        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
483        return _asn1_string_to_bytes(self._backend, sig[0])
484
485    @property
486    def is_signature_valid(self):
487        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
488        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
489        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
490        res = self._backend._lib.X509_REQ_verify(self._x509_req, pkey)
491
492        if res != 1:
493            self._backend._consume_errors()
494            return False
495
496        return True
497
498
499@utils.register_interface(
500    x509.certificate_transparency.SignedCertificateTimestamp
501)
502class _SignedCertificateTimestamp(object):
503    def __init__(self, backend, sct_list, sct):
504        self._backend = backend
505        # Keep the SCT_LIST that this SCT came from alive.
506        self._sct_list = sct_list
507        self._sct = sct
508
509    @property
510    def version(self):
511        version = self._backend._lib.SCT_get_version(self._sct)
512        assert version == self._backend._lib.SCT_VERSION_V1
513        return x509.certificate_transparency.Version.v1
514
515    @property
516    def log_id(self):
517        out = self._backend._ffi.new("unsigned char **")
518        log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
519        assert log_id_length >= 0
520        return self._backend._ffi.buffer(out[0], log_id_length)[:]
521
522    @property
523    def timestamp(self):
524        timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
525        milliseconds = timestamp % 1000
526        return datetime.datetime.utcfromtimestamp(
527            timestamp // 1000
528        ).replace(microsecond=milliseconds * 1000)
529
530    @property
531    def entry_type(self):
532        entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
533        # We currently only support loading SCTs from the X.509 extension, so
534        # we only have precerts.
535        assert entry_type == self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT
536        return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE
537
538    @property
539    def _signature(self):
540        ptrptr = self._backend._ffi.new("unsigned char **")
541        res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
542        self._backend.openssl_assert(res > 0)
543        self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
544        return self._backend._ffi.buffer(ptrptr[0], res)[:]
545
546    def __hash__(self):
547        return hash(self._signature)
548
549    def __eq__(self, other):
550        if not isinstance(other, _SignedCertificateTimestamp):
551            return NotImplemented
552
553        return self._signature == other._signature
554
555    def __ne__(self, other):
556        return not self == other
557