1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import json 8import os 9import subprocess 10import sys 11import textwrap 12 13import pytest 14 15from cryptography.hazmat.bindings.openssl.binding import Binding 16 17 18MEMORY_LEAK_SCRIPT = """ 19import sys 20 21 22def main(argv): 23 import gc 24 import json 25 26 import cffi 27 28 from cryptography.hazmat.bindings._openssl import ffi, lib 29 30 heap = {} 31 32 BACKTRACE_ENABLED = False 33 if BACKTRACE_ENABLED: 34 backtrace_ffi = cffi.FFI() 35 backtrace_ffi.cdef(''' 36 int backtrace(void **, int); 37 char **backtrace_symbols(void *const *, int); 38 ''') 39 backtrace_lib = backtrace_ffi.dlopen(None) 40 41 def backtrace(): 42 buf = backtrace_ffi.new("void*[]", 24) 43 length = backtrace_lib.backtrace(buf, len(buf)) 44 return (buf, length) 45 46 def symbolize_backtrace(trace): 47 (buf, length) = trace 48 symbols = backtrace_lib.backtrace_symbols(buf, length) 49 stack = [ 50 backtrace_ffi.string(symbols[i]).decode() 51 for i in range(length) 52 ] 53 lib.Cryptography_free_wrapper(symbols, backtrace_ffi.NULL, 0) 54 return stack 55 else: 56 def backtrace(): 57 return None 58 59 def symbolize_backtrace(trace): 60 return None 61 62 @ffi.callback("void *(size_t, const char *, int)") 63 def malloc(size, path, line): 64 ptr = lib.Cryptography_malloc_wrapper(size, path, line) 65 heap[ptr] = (size, path, line, backtrace()) 66 return ptr 67 68 @ffi.callback("void *(void *, size_t, const char *, int)") 69 def realloc(ptr, size, path, line): 70 if ptr != ffi.NULL: 71 del heap[ptr] 72 new_ptr = lib.Cryptography_realloc_wrapper(ptr, size, path, line) 73 heap[new_ptr] = (size, path, line, backtrace()) 74 return new_ptr 75 76 @ffi.callback("void(void *, const char *, int)") 77 def free(ptr, path, line): 78 if ptr != ffi.NULL: 79 del heap[ptr] 80 lib.Cryptography_free_wrapper(ptr, path, line) 81 82 result = lib.Cryptography_CRYPTO_set_mem_functions(malloc, realloc, free) 83 assert result == 1 84 85 # Trigger a bunch of initialization stuff. 86 import cryptography.hazmat.backends.openssl 87 88 start_heap = set(heap) 89 90 func(*argv[1:]) 91 gc.collect() 92 gc.collect() 93 gc.collect() 94 95 if lib.Cryptography_HAS_OPENSSL_CLEANUP: 96 lib.OPENSSL_cleanup() 97 98 # Swap back to the original functions so that if OpenSSL tries to free 99 # something from its atexit handle it won't be going through a Python 100 # function, which will be deallocated when this function returns 101 result = lib.Cryptography_CRYPTO_set_mem_functions( 102 ffi.addressof(lib, "Cryptography_malloc_wrapper"), 103 ffi.addressof(lib, "Cryptography_realloc_wrapper"), 104 ffi.addressof(lib, "Cryptography_free_wrapper"), 105 ) 106 assert result == 1 107 108 remaining = set(heap) - start_heap 109 110 if remaining: 111 sys.stdout.write(json.dumps(dict( 112 (int(ffi.cast("size_t", ptr)), { 113 "size": heap[ptr][0], 114 "path": ffi.string(heap[ptr][1]).decode(), 115 "line": heap[ptr][2], 116 "backtrace": symbolize_backtrace(heap[ptr][3]), 117 }) 118 for ptr in remaining 119 ))) 120 sys.stdout.flush() 121 sys.exit(255) 122 123main(sys.argv) 124""" 125 126 127def assert_no_memory_leaks(s, argv=[]): 128 env = os.environ.copy() 129 env["PYTHONPATH"] = os.pathsep.join(sys.path) 130 argv = [ 131 sys.executable, "-c", "{0}\n\n{1}".format(s, MEMORY_LEAK_SCRIPT) 132 ] + argv 133 # Shell out to a fresh Python process because OpenSSL does not allow you to 134 # install new memory hooks after the first malloc/free occurs. 135 proc = subprocess.Popen( 136 argv, 137 env=env, 138 stdout=subprocess.PIPE, 139 stderr=subprocess.PIPE, 140 ) 141 try: 142 proc.wait() 143 if proc.returncode == 255: 144 # 255 means there was a leak, load the info about what mallocs 145 # weren't freed. 146 out = json.loads(proc.stdout.read().decode()) 147 raise AssertionError(out) 148 elif proc.returncode != 0: 149 # Any exception type will do to be honest 150 raise ValueError(proc.stdout.read(), proc.stderr.read()) 151 finally: 152 proc.stdout.close() 153 proc.stderr.close() 154 155 156def skip_if_memtesting_not_supported(): 157 return pytest.mark.skipif( 158 not Binding().lib.Cryptography_HAS_MEM_FUNCTIONS, 159 reason="Requires OpenSSL memory functions (>=1.1.0)" 160 ) 161 162 163@skip_if_memtesting_not_supported() 164class TestAssertNoMemoryLeaks(object): 165 def test_no_leak_no_malloc(self): 166 assert_no_memory_leaks(textwrap.dedent(""" 167 def func(): 168 pass 169 """)) 170 171 def test_no_leak_free(self): 172 assert_no_memory_leaks(textwrap.dedent(""" 173 def func(): 174 from cryptography.hazmat.bindings.openssl.binding import Binding 175 b = Binding() 176 name = b.lib.X509_NAME_new() 177 b.lib.X509_NAME_free(name) 178 """)) 179 180 def test_no_leak_gc(self): 181 assert_no_memory_leaks(textwrap.dedent(""" 182 def func(): 183 from cryptography.hazmat.bindings.openssl.binding import Binding 184 b = Binding() 185 name = b.lib.X509_NAME_new() 186 b.ffi.gc(name, b.lib.X509_NAME_free) 187 """)) 188 189 def test_leak(self): 190 with pytest.raises(AssertionError): 191 assert_no_memory_leaks(textwrap.dedent(""" 192 def func(): 193 from cryptography.hazmat.bindings.openssl.binding import ( 194 Binding 195 ) 196 b = Binding() 197 b.lib.X509_NAME_new() 198 """)) 199 200 def test_errors(self): 201 with pytest.raises(ValueError): 202 assert_no_memory_leaks(textwrap.dedent(""" 203 def func(): 204 raise ZeroDivisionError 205 """)) 206 207 208@skip_if_memtesting_not_supported() 209class TestOpenSSLMemoryLeaks(object): 210 @pytest.mark.parametrize("path", [ 211 "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt", 212 ]) 213 def test_x509_certificate_extensions(self, path): 214 assert_no_memory_leaks(textwrap.dedent(""" 215 def func(path): 216 from cryptography import x509 217 from cryptography.hazmat.backends.openssl import backend 218 219 import cryptography_vectors 220 221 with cryptography_vectors.open_vector_file(path, "rb") as f: 222 cert = x509.load_der_x509_certificate( 223 f.read(), backend 224 ) 225 226 cert.extensions 227 """), [path]) 228 229 def test_x509_csr_extensions(self): 230 assert_no_memory_leaks(textwrap.dedent(""" 231 def func(): 232 from cryptography import x509 233 from cryptography.hazmat.backends.openssl import backend 234 from cryptography.hazmat.primitives import hashes 235 from cryptography.hazmat.primitives.asymmetric import rsa 236 237 private_key = rsa.generate_private_key( 238 key_size=2048, public_exponent=65537, backend=backend 239 ) 240 cert = x509.CertificateSigningRequestBuilder().subject_name( 241 x509.Name([]) 242 ).add_extension( 243 x509.OCSPNoCheck(), critical=False 244 ).sign(private_key, hashes.SHA256(), backend) 245 246 cert.extensions 247 """)) 248 249 def test_ec_private_numbers_private_key(self): 250 assert_no_memory_leaks(textwrap.dedent(""" 251 def func(): 252 from cryptography.hazmat.backends.openssl import backend 253 from cryptography.hazmat.primitives.asymmetric import ec 254 255 ec.EllipticCurvePrivateNumbers( 256 private_value=int( 257 '280814107134858470598753916394807521398239633534281633982576099083' 258 '35787109896602102090002196616273211495718603965098' 259 ), 260 public_numbers=ec.EllipticCurvePublicNumbers( 261 curve=ec.SECP384R1(), 262 x=int( 263 '10036914308591746758780165503819213553101287571902957054148542' 264 '504671046744460374996612408381962208627004841444205030' 265 ), 266 y=int( 267 '17337335659928075994560513699823544906448896792102247714689323' 268 '575406618073069185107088229463828921069465902299522926' 269 ) 270 ) 271 ).private_key(backend) 272 """)) 273 274 def test_ec_derive_private_key(self): 275 assert_no_memory_leaks(textwrap.dedent(""" 276 def func(): 277 from cryptography.hazmat.backends.openssl import backend 278 from cryptography.hazmat.primitives.asymmetric import ec 279 ec.derive_private_key(1, ec.SECP256R1(), backend) 280 """)) 281 282 def test_x25519_pubkey_from_private_key(self): 283 assert_no_memory_leaks(textwrap.dedent(""" 284 def func(): 285 from cryptography.hazmat.primitives.asymmetric import x25519 286 private_key = x25519.X25519PrivateKey.generate() 287 private_key.public_key() 288 """)) 289 290 def test_create_ocsp_request(self): 291 assert_no_memory_leaks(textwrap.dedent(""" 292 def func(): 293 from cryptography import x509 294 from cryptography.hazmat.backends.openssl import backend 295 from cryptography.hazmat.primitives import hashes 296 from cryptography.x509 import ocsp 297 import cryptography_vectors 298 299 path = "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt" 300 with cryptography_vectors.open_vector_file(path, "rb") as f: 301 cert = x509.load_der_x509_certificate( 302 f.read(), backend 303 ) 304 builder = ocsp.OCSPRequestBuilder() 305 builder = builder.add_certificate( 306 cert, cert, hashes.SHA1() 307 ).add_extension(x509.OCSPNonce(b"0000"), False) 308 req = builder.build() 309 """)) 310 311 @pytest.mark.parametrize("path", [ 312 "pkcs12/cert-aes256cbc-no-key.p12", 313 "pkcs12/cert-key-aes256cbc.p12", 314 ]) 315 def test_load_pkcs12_key_and_certificates(self, path): 316 assert_no_memory_leaks(textwrap.dedent(""" 317 def func(path): 318 from cryptography import x509 319 from cryptography.hazmat.backends.openssl import backend 320 from cryptography.hazmat.primitives.serialization import pkcs12 321 import cryptography_vectors 322 323 with cryptography_vectors.open_vector_file(path, "rb") as f: 324 pkcs12.load_key_and_certificates( 325 f.read(), b"cryptography", backend 326 ) 327 """), [path]) 328 329 def test_create_crl_with_idp(self): 330 assert_no_memory_leaks(textwrap.dedent(""" 331 def func(): 332 import datetime 333 from cryptography import x509 334 from cryptography.hazmat.backends.openssl import backend 335 from cryptography.hazmat.primitives import hashes 336 from cryptography.hazmat.primitives.asymmetric import ec 337 from cryptography.x509.oid import NameOID 338 339 key = ec.generate_private_key(ec.SECP256R1(), backend) 340 last_update = datetime.datetime(2002, 1, 1, 12, 1) 341 next_update = datetime.datetime(2030, 1, 1, 12, 1) 342 idp = x509.IssuingDistributionPoint( 343 full_name=None, 344 relative_name=x509.RelativeDistinguishedName([ 345 x509.NameAttribute( 346 oid=x509.NameOID.ORGANIZATION_NAME, value=u"PyCA") 347 ]), 348 only_contains_user_certs=False, 349 only_contains_ca_certs=True, 350 only_some_reasons=None, 351 indirect_crl=False, 352 only_contains_attribute_certs=False, 353 ) 354 builder = x509.CertificateRevocationListBuilder().issuer_name( 355 x509.Name([ 356 x509.NameAttribute( 357 NameOID.COMMON_NAME, u"cryptography.io CA" 358 ) 359 ]) 360 ).last_update( 361 last_update 362 ).next_update( 363 next_update 364 ).add_extension( 365 idp, True 366 ) 367 368 crl = builder.sign(key, hashes.SHA256(), backend) 369 crl.extensions.get_extension_for_class( 370 x509.IssuingDistributionPoint 371 ) 372 """)) 373