1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import json
8import os
9import subprocess
10import sys
11import textwrap
12
13import pytest
14
15from cryptography.hazmat.bindings.openssl.binding import Binding
16
17
# Source of the child-process half of the leak check. assert_no_memory_leaks
# appends it to a caller-supplied snippet that must define ``func``. The
# combined program routes OpenSSL's malloc/realloc/free through Python
# callbacks that record every live allocation in ``heap``, takes a baseline
# snapshot after backend initialization, runs ``func(*argv[1:])``, and -- if
# any allocation made after the baseline is still live -- writes a JSON report
# (size, path, line, optional backtrace per pointer) to stdout and exits with
# status 255. Backtrace capture is off unless BACKTRACE_ENABLED is flipped.
MEMORY_LEAK_SCRIPT = """
import sys


def main(argv):
    import gc
    import json

    import cffi

    from cryptography.hazmat.bindings._openssl import ffi, lib

    heap = {}

    BACKTRACE_ENABLED = False
    if BACKTRACE_ENABLED:
        backtrace_ffi = cffi.FFI()
        backtrace_ffi.cdef('''
            int backtrace(void **, int);
            char **backtrace_symbols(void *const *, int);
        ''')
        backtrace_lib = backtrace_ffi.dlopen(None)

        def backtrace():
            buf = backtrace_ffi.new("void*[]", 24)
            length = backtrace_lib.backtrace(buf, len(buf))
            return (buf, length)

        def symbolize_backtrace(trace):
            (buf, length) = trace
            symbols = backtrace_lib.backtrace_symbols(buf, length)
            stack = [
                backtrace_ffi.string(symbols[i]).decode()
                for i in range(length)
            ]
            lib.Cryptography_free_wrapper(symbols, backtrace_ffi.NULL, 0)
            return stack
    else:
        def backtrace():
            return None

        def symbolize_backtrace(trace):
            return None

    @ffi.callback("void *(size_t, const char *, int)")
    def malloc(size, path, line):
        ptr = lib.Cryptography_malloc_wrapper(size, path, line)
        heap[ptr] = (size, path, line, backtrace())
        return ptr

    @ffi.callback("void *(void *, size_t, const char *, int)")
    def realloc(ptr, size, path, line):
        if ptr != ffi.NULL:
            del heap[ptr]
        new_ptr = lib.Cryptography_realloc_wrapper(ptr, size, path, line)
        heap[new_ptr] = (size, path, line, backtrace())
        return new_ptr

    @ffi.callback("void(void *, const char *, int)")
    def free(ptr, path, line):
        if ptr != ffi.NULL:
            del heap[ptr]
            lib.Cryptography_free_wrapper(ptr, path, line)

    result = lib.Cryptography_CRYPTO_set_mem_functions(malloc, realloc, free)
    assert result == 1

    # Trigger a bunch of initialization stuff.
    import cryptography.hazmat.backends.openssl

    start_heap = set(heap)

    func(*argv[1:])
    gc.collect()
    gc.collect()
    gc.collect()

    if lib.Cryptography_HAS_OPENSSL_CLEANUP:
        lib.OPENSSL_cleanup()

    # Swap back to the original functions so that if OpenSSL tries to free
    # something from its atexit handle it won't be going through a Python
    # function, which will be deallocated when this function returns
    result = lib.Cryptography_CRYPTO_set_mem_functions(
        ffi.addressof(lib, "Cryptography_malloc_wrapper"),
        ffi.addressof(lib, "Cryptography_realloc_wrapper"),
        ffi.addressof(lib, "Cryptography_free_wrapper"),
    )
    assert result == 1

    remaining = set(heap) - start_heap

    if remaining:
        sys.stdout.write(json.dumps(dict(
            (int(ffi.cast("size_t", ptr)), {
                "size": heap[ptr][0],
                "path": ffi.string(heap[ptr][1]).decode(),
                "line": heap[ptr][2],
                "backtrace": symbolize_backtrace(heap[ptr][3]),
            })
            for ptr in remaining
        )))
        sys.stdout.flush()
        sys.exit(255)

main(sys.argv)
"""
125
126
def assert_no_memory_leaks(s, argv=None):
    """Run ``s`` in a fresh interpreter and fail if it leaks OpenSSL memory.

    ``s`` is Python source that must define ``func``. It is prepended to
    ``MEMORY_LEAK_SCRIPT`` and the combined program is executed with
    ``sys.executable -c``; the entries of ``argv`` are passed on the child's
    command line.

    :param s: Source code defining the ``func`` callable to exercise.
    :param argv: Optional sequence of string arguments for the child process.
        Defaults to no arguments.
    :raises AssertionError: The child exited with status 255 (a leak was
        reported); the decoded JSON report is the exception argument.
    :raises ValueError: The child exited with any other non-zero status; the
        captured stdout and stderr bytes are the exception arguments.
    """
    # Avoid a shared mutable default and accept any sequence of arguments.
    extra_args = [] if argv is None else list(argv)

    env = os.environ.copy()
    env["PYTHONPATH"] = os.pathsep.join(sys.path)
    command = [
        sys.executable, "-c", "{0}\n\n{1}".format(s, MEMORY_LEAK_SCRIPT)
    ] + extra_args
    # Shell out to a fresh Python process because OpenSSL does not allow you to
    # install new memory hooks after the first malloc/free occurs.
    proc = subprocess.Popen(
        command,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        # communicate() drains both pipes while waiting. A bare wait() can
        # deadlock once the child fills an OS pipe buffer (e.g. a long
        # traceback or a large leak report) because nobody is reading it.
        stdout, stderr = proc.communicate()
    finally:
        # Closing an already-closed pipe is a no-op; this only matters when
        # communicate() itself was interrupted.
        proc.stdout.close()
        proc.stderr.close()

    if proc.returncode == 255:
        # 255 means there was a leak, load the info about what mallocs
        # weren't freed.
        raise AssertionError(json.loads(stdout.decode()))
    elif proc.returncode != 0:
        # Any exception type will do to be honest
        raise ValueError(stdout, stderr)
154
155
def skip_if_memtesting_not_supported():
    """Return a ``skipif`` marker for builds lacking memory-function hooks.

    The marker skips the decorated test or class when the OpenSSL binding
    reports a falsy ``Cryptography_HAS_MEM_FUNCTIONS``.
    """
    openssl_lib = Binding().lib
    hooks_available = openssl_lib.Cryptography_HAS_MEM_FUNCTIONS
    return pytest.mark.skipif(
        not hooks_available,
        reason="Requires OpenSSL memory functions (>=1.1.0)",
    )
161
162
@skip_if_memtesting_not_supported()
class TestAssertNoMemoryLeaks(object):
    """Self-tests for the ``assert_no_memory_leaks`` helper."""

    def test_no_leak_no_malloc(self):
        """A ``func`` that does nothing passes the check."""
        snippet = textwrap.dedent("""
        def func():
            pass
        """)
        assert_no_memory_leaks(snippet)

    def test_no_leak_free(self):
        """An allocation that is explicitly freed is not reported."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography.hazmat.bindings.openssl.binding import Binding
            b = Binding()
            name = b.lib.X509_NAME_new()
            b.lib.X509_NAME_free(name)
        """)
        assert_no_memory_leaks(snippet)

    def test_no_leak_gc(self):
        """An allocation released through ``ffi.gc`` is not reported."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography.hazmat.bindings.openssl.binding import Binding
            b = Binding()
            name = b.lib.X509_NAME_new()
            b.ffi.gc(name, b.lib.X509_NAME_free)
        """)
        assert_no_memory_leaks(snippet)

    def test_leak(self):
        """An allocation that is never freed raises ``AssertionError``."""
        with pytest.raises(AssertionError):
            snippet = textwrap.dedent("""
            def func():
                from cryptography.hazmat.bindings.openssl.binding import (
                    Binding
                )
                b = Binding()
                b.lib.X509_NAME_new()
            """)
            assert_no_memory_leaks(snippet)

    def test_errors(self):
        """A crash inside ``func`` surfaces as ``ValueError``."""
        with pytest.raises(ValueError):
            snippet = textwrap.dedent("""
            def func():
                raise ZeroDivisionError
            """)
            assert_no_memory_leaks(snippet)
206
207
@skip_if_memtesting_not_supported()
class TestOpenSSLMemoryLeaks(object):
    """Leak checks for individual OpenSSL backend code paths.

    Each test hands a snippet defining ``func`` to
    ``assert_no_memory_leaks``, which runs it in a separate interpreter.
    """

    @pytest.mark.parametrize("path", [
        "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt",
    ])
    def test_x509_certificate_extensions(self, path):
        """Loading a DER certificate and reading its extensions."""
        snippet = textwrap.dedent("""
        def func(path):
            from cryptography import x509
            from cryptography.hazmat.backends.openssl import backend

            import cryptography_vectors

            with cryptography_vectors.open_vector_file(path, "rb") as f:
                cert = x509.load_der_x509_certificate(
                    f.read(), backend
                )

            cert.extensions
        """)
        assert_no_memory_leaks(snippet, argv=[path])

    def test_x509_csr_extensions(self):
        """Signing a CSR with one extension and reading its extensions."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography import x509
            from cryptography.hazmat.backends.openssl import backend
            from cryptography.hazmat.primitives import hashes
            from cryptography.hazmat.primitives.asymmetric import rsa

            private_key = rsa.generate_private_key(
                key_size=2048, public_exponent=65537, backend=backend
            )
            cert = x509.CertificateSigningRequestBuilder().subject_name(
                x509.Name([])
            ).add_extension(
               x509.OCSPNoCheck(), critical=False
            ).sign(private_key, hashes.SHA256(), backend)

            cert.extensions
        """)
        assert_no_memory_leaks(snippet)

    def test_ec_private_numbers_private_key(self):
        """Building an EC private key from explicit private numbers."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography.hazmat.backends.openssl import backend
            from cryptography.hazmat.primitives.asymmetric import ec

            ec.EllipticCurvePrivateNumbers(
                private_value=int(
                    '280814107134858470598753916394807521398239633534281633982576099083'
                    '35787109896602102090002196616273211495718603965098'
                ),
                public_numbers=ec.EllipticCurvePublicNumbers(
                    curve=ec.SECP384R1(),
                    x=int(
                        '10036914308591746758780165503819213553101287571902957054148542'
                        '504671046744460374996612408381962208627004841444205030'
                    ),
                    y=int(
                        '17337335659928075994560513699823544906448896792102247714689323'
                        '575406618073069185107088229463828921069465902299522926'
                    )
                )
            ).private_key(backend)
        """)
        assert_no_memory_leaks(snippet)

    def test_ec_derive_private_key(self):
        """Deriving an EC private key from an integer."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography.hazmat.backends.openssl import backend
            from cryptography.hazmat.primitives.asymmetric import ec
            ec.derive_private_key(1, ec.SECP256R1(), backend)
        """)
        assert_no_memory_leaks(snippet)

    def test_x25519_pubkey_from_private_key(self):
        """Generating an X25519 key and fetching its public key."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography.hazmat.primitives.asymmetric import x25519
            private_key = x25519.X25519PrivateKey.generate()
            private_key.public_key()
        """)
        assert_no_memory_leaks(snippet)

    def test_create_ocsp_request(self):
        """Building an OCSP request that carries a nonce extension."""
        snippet = textwrap.dedent("""
        def func():
            from cryptography import x509
            from cryptography.hazmat.backends.openssl import backend
            from cryptography.hazmat.primitives import hashes
            from cryptography.x509 import ocsp
            import cryptography_vectors

            path = "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt"
            with cryptography_vectors.open_vector_file(path, "rb") as f:
                cert = x509.load_der_x509_certificate(
                    f.read(), backend
                )
            builder = ocsp.OCSPRequestBuilder()
            builder = builder.add_certificate(
                cert, cert, hashes.SHA1()
            ).add_extension(x509.OCSPNonce(b"0000"), False)
            req = builder.build()
        """)
        assert_no_memory_leaks(snippet)

    @pytest.mark.parametrize("path", [
        "pkcs12/cert-aes256cbc-no-key.p12",
        "pkcs12/cert-key-aes256cbc.p12",
    ])
    def test_load_pkcs12_key_and_certificates(self, path):
        """Loading a PKCS#12 bundle, with and without a private key."""
        snippet = textwrap.dedent("""
        def func(path):
            from cryptography import x509
            from cryptography.hazmat.backends.openssl import backend
            from cryptography.hazmat.primitives.serialization import pkcs12
            import cryptography_vectors

            with cryptography_vectors.open_vector_file(path, "rb") as f:
                pkcs12.load_key_and_certificates(
                    f.read(), b"cryptography", backend
                )
        """)
        assert_no_memory_leaks(snippet, argv=[path])

    def test_create_crl_with_idp(self):
        """Signing a CRL with an IssuingDistributionPoint and reading it back."""
        snippet = textwrap.dedent("""
        def func():
            import datetime
            from cryptography import x509
            from cryptography.hazmat.backends.openssl import backend
            from cryptography.hazmat.primitives import hashes
            from cryptography.hazmat.primitives.asymmetric import ec
            from cryptography.x509.oid import NameOID

            key = ec.generate_private_key(ec.SECP256R1(), backend)
            last_update = datetime.datetime(2002, 1, 1, 12, 1)
            next_update = datetime.datetime(2030, 1, 1, 12, 1)
            idp = x509.IssuingDistributionPoint(
                full_name=None,
                relative_name=x509.RelativeDistinguishedName([
                    x509.NameAttribute(
                        oid=x509.NameOID.ORGANIZATION_NAME, value=u"PyCA")
                ]),
                only_contains_user_certs=False,
                only_contains_ca_certs=True,
                only_some_reasons=None,
                indirect_crl=False,
                only_contains_attribute_certs=False,
            )
            builder = x509.CertificateRevocationListBuilder().issuer_name(
                x509.Name([
                    x509.NameAttribute(
                        NameOID.COMMON_NAME, u"cryptography.io CA"
                    )
                ])
            ).last_update(
                last_update
            ).next_update(
                next_update
            ).add_extension(
                idp, True
            )

            crl = builder.sign(key, hashes.SHA256(), backend)
            crl.extensions.get_extension_for_class(
                x509.IssuingDistributionPoint
            )
        """)
        assert_no_memory_leaks(snippet)
373