1from test.support import (TESTFN, run_unittest, import_module, unlink,
2                          requires, _2G, _4G, gc_collect, cpython_only)
3import unittest
4import os
5import re
6import itertools
7import socket
8import sys
9import weakref
10
# The whole test module is skipped when the mmap module cannot be imported.
mmap = import_module('mmap')

# Module-level shorthand for the page size reported by the mmap module.
PAGESIZE = mmap.PAGESIZE
15
16class MmapTests(unittest.TestCase):
17
18    def setUp(self):
19        if os.path.exists(TESTFN):
20            os.unlink(TESTFN)
21
22    def tearDown(self):
23        try:
24            os.unlink(TESTFN)
25        except OSError:
26            pass
27
28    def test_basic(self):
29        # Test mmap module on Unix systems and Windows
30
31        # Create a file to be mmap'ed.
32        f = open(TESTFN, 'bw+')
33        try:
34            # Write 2 pages worth of data to the file
35            f.write(b'\0'* PAGESIZE)
36            f.write(b'foo')
37            f.write(b'\0'* (PAGESIZE-3) )
38            f.flush()
39            m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
40        finally:
41            f.close()
42
43        # Simple sanity checks
44
45        tp = str(type(m))  # SF bug 128713:  segfaulted on Linux
46        self.assertEqual(m.find(b'foo'), PAGESIZE)
47
48        self.assertEqual(len(m), 2*PAGESIZE)
49
50        self.assertEqual(m[0], 0)
51        self.assertEqual(m[0:3], b'\0\0\0')
52
53        # Shouldn't crash on boundary (Issue #5292)
54        self.assertRaises(IndexError, m.__getitem__, len(m))
55        self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')
56
57        # Modify the file's content
58        m[0] = b'3'[0]
59        m[PAGESIZE +3: PAGESIZE +3+3] = b'bar'
60
61        # Check that the modification worked
62        self.assertEqual(m[0], b'3'[0])
63        self.assertEqual(m[0:3], b'3\0\0')
64        self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0')
65
66        m.flush()
67
68        # Test doing a regular expression match in an mmap'ed file
69        match = re.search(b'[A-Za-z]+', m)
70        if match is None:
71            self.fail('regex match on mmap failed!')
72        else:
73            start, end = match.span(0)
74            length = end - start
75
76            self.assertEqual(start, PAGESIZE)
77            self.assertEqual(end, PAGESIZE + 6)
78
79        # test seeking around (try to overflow the seek implementation)
80        m.seek(0,0)
81        self.assertEqual(m.tell(), 0)
82        m.seek(42,1)
83        self.assertEqual(m.tell(), 42)
84        m.seek(0,2)
85        self.assertEqual(m.tell(), len(m))
86
87        # Try to seek to negative position...
88        self.assertRaises(ValueError, m.seek, -1)
89
90        # Try to seek beyond end of mmap...
91        self.assertRaises(ValueError, m.seek, 1, 2)
92
93        # Try to seek to negative position...
94        self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
95
96        # Try resizing map
97        try:
98            m.resize(512)
99        except SystemError:
100            # resize() not supported
101            # No messages are printed, since the output of this test suite
102            # would then be different across platforms.
103            pass
104        else:
105            # resize() is supported
106            self.assertEqual(len(m), 512)
107            # Check that we can no longer seek beyond the new size.
108            self.assertRaises(ValueError, m.seek, 513, 0)
109
110            # Check that the underlying file is truncated too
111            # (bug #728515)
112            f = open(TESTFN, 'rb')
113            try:
114                f.seek(0, 2)
115                self.assertEqual(f.tell(), 512)
116            finally:
117                f.close()
118            self.assertEqual(m.size(), 512)
119
120        m.close()
121
122    def test_access_parameter(self):
123        # Test for "access" keyword parameter
124        mapsize = 10
125        with open(TESTFN, "wb") as fp:
126            fp.write(b"a"*mapsize)
127        with open(TESTFN, "rb") as f:
128            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
129            self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
130
131            # Ensuring that readonly mmap can't be slice assigned
132            try:
133                m[:] = b'b'*mapsize
134            except TypeError:
135                pass
136            else:
137                self.fail("Able to write to readonly memory map")
138
139            # Ensuring that readonly mmap can't be item assigned
140            try:
141                m[0] = b'b'
142            except TypeError:
143                pass
144            else:
145                self.fail("Able to write to readonly memory map")
146
147            # Ensuring that readonly mmap can't be write() to
148            try:
149                m.seek(0,0)
150                m.write(b'abc')
151            except TypeError:
152                pass
153            else:
154                self.fail("Able to write to readonly memory map")
155
156            # Ensuring that readonly mmap can't be write_byte() to
157            try:
158                m.seek(0,0)
159                m.write_byte(b'd')
160            except TypeError:
161                pass
162            else:
163                self.fail("Able to write to readonly memory map")
164
165            # Ensuring that readonly mmap can't be resized
166            try:
167                m.resize(2*mapsize)
168            except SystemError:   # resize is not universally supported
169                pass
170            except TypeError:
171                pass
172            else:
173                self.fail("Able to resize readonly memory map")
174            with open(TESTFN, "rb") as fp:
175                self.assertEqual(fp.read(), b'a'*mapsize,
176                                 "Readonly memory map data file was modified")
177
178        # Opening mmap with size too big
179        with open(TESTFN, "r+b") as f:
180            try:
181                m = mmap.mmap(f.fileno(), mapsize+1)
182            except ValueError:
183                # we do not expect a ValueError on Windows
184                # CAUTION:  This also changes the size of the file on disk, and
185                # later tests assume that the length hasn't changed.  We need to
186                # repair that.
187                if sys.platform.startswith('win'):
188                    self.fail("Opening mmap with size+1 should work on Windows.")
189            else:
190                # we expect a ValueError on Unix, but not on Windows
191                if not sys.platform.startswith('win'):
192                    self.fail("Opening mmap with size+1 should raise ValueError.")
193                m.close()
194            if sys.platform.startswith('win'):
195                # Repair damage from the resizing test.
196                with open(TESTFN, 'r+b') as f:
197                    f.truncate(mapsize)
198
199        # Opening mmap with access=ACCESS_WRITE
200        with open(TESTFN, "r+b") as f:
201            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
202            # Modifying write-through memory map
203            m[:] = b'c'*mapsize
204            self.assertEqual(m[:], b'c'*mapsize,
205                   "Write-through memory map memory not updated properly.")
206            m.flush()
207            m.close()
208        with open(TESTFN, 'rb') as f:
209            stuff = f.read()
210        self.assertEqual(stuff, b'c'*mapsize,
211               "Write-through memory map data file not updated properly.")
212
213        # Opening mmap with access=ACCESS_COPY
214        with open(TESTFN, "r+b") as f:
215            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
216            # Modifying copy-on-write memory map
217            m[:] = b'd'*mapsize
218            self.assertEqual(m[:], b'd' * mapsize,
219                             "Copy-on-write memory map data not written correctly.")
220            m.flush()
221            with open(TESTFN, "rb") as fp:
222                self.assertEqual(fp.read(), b'c'*mapsize,
223                                 "Copy-on-write test data file should not be modified.")
224            # Ensuring copy-on-write maps cannot be resized
225            self.assertRaises(TypeError, m.resize, 2*mapsize)
226            m.close()
227
228        # Ensuring invalid access parameter raises exception
229        with open(TESTFN, "r+b") as f:
230            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
231
232        if os.name == "posix":
233            # Try incompatible flags, prot and access parameters.
234            with open(TESTFN, "r+b") as f:
235                self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
236                                  flags=mmap.MAP_PRIVATE,
237                                  prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
238
239            # Try writing with PROT_EXEC and without PROT_WRITE
240            prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
241            with open(TESTFN, "r+b") as f:
242                m = mmap.mmap(f.fileno(), mapsize, prot=prot)
243                self.assertRaises(TypeError, m.write, b"abcdef")
244                self.assertRaises(TypeError, m.write_byte, 0)
245                m.close()
246
247    def test_bad_file_desc(self):
248        # Try opening a bad file descriptor...
249        self.assertRaises(OSError, mmap.mmap, -2, 4096)
250
251    def test_tougher_find(self):
252        # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
253        # searching for data with embedded \0 bytes didn't work.
254        with open(TESTFN, 'wb+') as f:
255
256            data = b'aabaac\x00deef\x00\x00aa\x00'
257            n = len(data)
258            f.write(data)
259            f.flush()
260            m = mmap.mmap(f.fileno(), n)
261
262        for start in range(n+1):
263            for finish in range(start, n+1):
264                slice = data[start : finish]
265                self.assertEqual(m.find(slice), data.find(slice))
266                self.assertEqual(m.find(slice + b'x'), -1)
267        m.close()
268
269    def test_find_end(self):
270        # test the new 'end' parameter works as expected
271        f = open(TESTFN, 'wb+')
272        data = b'one two ones'
273        n = len(data)
274        f.write(data)
275        f.flush()
276        m = mmap.mmap(f.fileno(), n)
277        f.close()
278
279        self.assertEqual(m.find(b'one'), 0)
280        self.assertEqual(m.find(b'ones'), 8)
281        self.assertEqual(m.find(b'one', 0, -1), 0)
282        self.assertEqual(m.find(b'one', 1), 8)
283        self.assertEqual(m.find(b'one', 1, -1), 8)
284        self.assertEqual(m.find(b'one', 1, -2), -1)
285        self.assertEqual(m.find(bytearray(b'one')), 0)
286
287
288    def test_rfind(self):
289        # test the new 'end' parameter works as expected
290        f = open(TESTFN, 'wb+')
291        data = b'one two ones'
292        n = len(data)
293        f.write(data)
294        f.flush()
295        m = mmap.mmap(f.fileno(), n)
296        f.close()
297
298        self.assertEqual(m.rfind(b'one'), 8)
299        self.assertEqual(m.rfind(b'one '), 0)
300        self.assertEqual(m.rfind(b'one', 0, -1), 8)
301        self.assertEqual(m.rfind(b'one', 0, -2), 0)
302        self.assertEqual(m.rfind(b'one', 1, -1), 8)
303        self.assertEqual(m.rfind(b'one', 1, -2), -1)
304        self.assertEqual(m.rfind(bytearray(b'one')), 8)
305
306
307    def test_double_close(self):
308        # make sure a double close doesn't crash on Solaris (Bug# 665913)
309        f = open(TESTFN, 'wb+')
310
311        f.write(2**16 * b'a') # Arbitrary character
312        f.close()
313
314        f = open(TESTFN, 'rb')
315        mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
316        mf.close()
317        mf.close()
318        f.close()
319
320    @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
321    def test_entire_file(self):
322        # test mapping of entire file by passing 0 for map length
323        f = open(TESTFN, "wb+")
324
325        f.write(2**16 * b'm') # Arbitrary character
326        f.close()
327
328        f = open(TESTFN, "rb+")
329        mf = mmap.mmap(f.fileno(), 0)
330        self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
331        self.assertEqual(mf.read(2**16), 2**16 * b"m")
332        mf.close()
333        f.close()
334
335    @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
336    def test_length_0_offset(self):
337        # Issue #10916: test mapping of remainder of file by passing 0 for
338        # map length with an offset doesn't cause a segfault.
339        # NOTE: allocation granularity is currently 65536 under Win64,
340        # and therefore the minimum offset alignment.
341        with open(TESTFN, "wb") as f:
342            f.write((65536 * 2) * b'm') # Arbitrary character
343
344        with open(TESTFN, "rb") as f:
345            with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
346                self.assertRaises(IndexError, mf.__getitem__, 80000)
347
348    @unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
349    def test_length_0_large_offset(self):
350        # Issue #10959: test mapping of a file by passing 0 for
351        # map length with a large offset doesn't cause a segfault.
352        with open(TESTFN, "wb") as f:
353            f.write(115699 * b'm') # Arbitrary character
354
355        with open(TESTFN, "w+b") as f:
356            self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
357                              offset=2147418112)
358
359    def test_move(self):
360        # make move works everywhere (64-bit format problem earlier)
361        f = open(TESTFN, 'wb+')
362
363        f.write(b"ABCDEabcde") # Arbitrary character
364        f.flush()
365
366        mf = mmap.mmap(f.fileno(), 10)
367        mf.move(5, 0, 5)
368        self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
369        mf.close()
370        f.close()
371
372        # more excessive test
373        data = b"0123456789"
374        for dest in range(len(data)):
375            for src in range(len(data)):
376                for count in range(len(data) - max(dest, src)):
377                    expected = data[:dest] + data[src:src+count] + data[dest+count:]
378                    m = mmap.mmap(-1, len(data))
379                    m[:] = data
380                    m.move(dest, src, count)
381                    self.assertEqual(m[:], expected)
382                    m.close()
383
384        # segfault test (Issue 5387)
385        m = mmap.mmap(-1, 100)
386        offsets = [-100, -1, 0, 1, 100]
387        for source, dest, size in itertools.product(offsets, offsets, offsets):
388            try:
389                m.move(source, dest, size)
390            except ValueError:
391                pass
392
393        offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
394                   (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
395        for source, dest, size in offsets:
396            self.assertRaises(ValueError, m.move, source, dest, size)
397
398        m.close()
399
400        m = mmap.mmap(-1, 1) # single byte
401        self.assertRaises(ValueError, m.move, 0, 0, 2)
402        self.assertRaises(ValueError, m.move, 1, 0, 1)
403        self.assertRaises(ValueError, m.move, 0, 1, 1)
404        m.move(0, 0, 1)
405        m.move(0, 0, 0)
406
407
408    def test_anonymous(self):
409        # anonymous mmap.mmap(-1, PAGE)
410        m = mmap.mmap(-1, PAGESIZE)
411        for x in range(PAGESIZE):
412            self.assertEqual(m[x], 0,
413                             "anonymously mmap'ed contents should be zero")
414
415        for x in range(PAGESIZE):
416            b = x & 0xff
417            m[x] = b
418            self.assertEqual(m[x], b)
419
420    def test_read_all(self):
421        m = mmap.mmap(-1, 16)
422        self.addCleanup(m.close)
423
424        # With no parameters, or None or a negative argument, reads all
425        m.write(bytes(range(16)))
426        m.seek(0)
427        self.assertEqual(m.read(), bytes(range(16)))
428        m.seek(8)
429        self.assertEqual(m.read(), bytes(range(8, 16)))
430        m.seek(16)
431        self.assertEqual(m.read(), b'')
432        m.seek(3)
433        self.assertEqual(m.read(None), bytes(range(3, 16)))
434        m.seek(4)
435        self.assertEqual(m.read(-1), bytes(range(4, 16)))
436        m.seek(5)
437        self.assertEqual(m.read(-2), bytes(range(5, 16)))
438        m.seek(9)
439        self.assertEqual(m.read(-42), bytes(range(9, 16)))
440
441    def test_read_invalid_arg(self):
442        m = mmap.mmap(-1, 16)
443        self.addCleanup(m.close)
444
445        self.assertRaises(TypeError, m.read, 'foo')
446        self.assertRaises(TypeError, m.read, 5.5)
447        self.assertRaises(TypeError, m.read, [1, 2, 3])
448
449    def test_extended_getslice(self):
450        # Test extended slicing by comparing with list slicing.
451        s = bytes(reversed(range(256)))
452        m = mmap.mmap(-1, len(s))
453        m[:] = s
454        self.assertEqual(m[:], s)
455        indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
456        for start in indices:
457            for stop in indices:
458                # Skip step 0 (invalid)
459                for step in indices[1:]:
460                    self.assertEqual(m[start:stop:step],
461                                     s[start:stop:step])
462
463    def test_extended_set_del_slice(self):
464        # Test extended slicing by comparing with list slicing.
465        s = bytes(reversed(range(256)))
466        m = mmap.mmap(-1, len(s))
467        indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
468        for start in indices:
469            for stop in indices:
470                # Skip invalid step 0
471                for step in indices[1:]:
472                    m[:] = s
473                    self.assertEqual(m[:], s)
474                    L = list(s)
475                    # Make sure we have a slice of exactly the right length,
476                    # but with different data.
477                    data = L[start:stop:step]
478                    data = bytes(reversed(data))
479                    L[start:stop:step] = data
480                    m[start:stop:step] = data
481                    self.assertEqual(m[:], bytes(L))
482
483    def make_mmap_file (self, f, halfsize):
484        # Write 2 pages worth of data to the file
485        f.write (b'\0' * halfsize)
486        f.write (b'foo')
487        f.write (b'\0' * (halfsize - 3))
488        f.flush ()
489        return mmap.mmap (f.fileno(), 0)
490
491    def test_empty_file (self):
492        f = open (TESTFN, 'w+b')
493        f.close()
494        with open(TESTFN, "rb") as f :
495            self.assertRaisesRegex(ValueError,
496                                   "cannot mmap an empty file",
497                                   mmap.mmap, f.fileno(), 0,
498                                   access=mmap.ACCESS_READ)
499
500    def test_offset (self):
501        f = open (TESTFN, 'w+b')
502
503        try: # unlink TESTFN no matter what
504            halfsize = mmap.ALLOCATIONGRANULARITY
505            m = self.make_mmap_file (f, halfsize)
506            m.close ()
507            f.close ()
508
509            mapsize = halfsize * 2
510            # Try invalid offset
511            f = open(TESTFN, "r+b")
512            for offset in [-2, -1, None]:
513                try:
514                    m = mmap.mmap(f.fileno(), mapsize, offset=offset)
515                    self.assertEqual(0, 1)
516                except (ValueError, TypeError, OverflowError):
517                    pass
518                else:
519                    self.assertEqual(0, 0)
520            f.close()
521
522            # Try valid offset, hopefully 8192 works on all OSes
523            f = open(TESTFN, "r+b")
524            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
525            self.assertEqual(m[0:3], b'foo')
526            f.close()
527
528            # Try resizing map
529            try:
530                m.resize(512)
531            except SystemError:
532                pass
533            else:
534                # resize() is supported
535                self.assertEqual(len(m), 512)
536                # Check that we can no longer seek beyond the new size.
537                self.assertRaises(ValueError, m.seek, 513, 0)
538                # Check that the content is not changed
539                self.assertEqual(m[0:3], b'foo')
540
541                # Check that the underlying file is truncated too
542                f = open(TESTFN, 'rb')
543                f.seek(0, 2)
544                self.assertEqual(f.tell(), halfsize + 512)
545                f.close()
546                self.assertEqual(m.size(), halfsize + 512)
547
548            m.close()
549
550        finally:
551            f.close()
552            try:
553                os.unlink(TESTFN)
554            except OSError:
555                pass
556
557    def test_subclass(self):
558        class anon_mmap(mmap.mmap):
559            def __new__(klass, *args, **kwargs):
560                return mmap.mmap.__new__(klass, -1, *args, **kwargs)
561        anon_mmap(PAGESIZE)
562
563    @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
564    def test_prot_readonly(self):
565        mapsize = 10
566        with open(TESTFN, "wb") as fp:
567            fp.write(b"a"*mapsize)
568        f = open(TESTFN, "rb")
569        m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
570        self.assertRaises(TypeError, m.write, "foo")
571        f.close()
572
573    def test_error(self):
574        self.assertIs(mmap.error, OSError)
575
576    def test_io_methods(self):
577        data = b"0123456789"
578        with open(TESTFN, "wb") as fp:
579            fp.write(b"x"*len(data))
580        f = open(TESTFN, "r+b")
581        m = mmap.mmap(f.fileno(), len(data))
582        f.close()
583        # Test write_byte()
584        for i in range(len(data)):
585            self.assertEqual(m.tell(), i)
586            m.write_byte(data[i])
587            self.assertEqual(m.tell(), i+1)
588        self.assertRaises(ValueError, m.write_byte, b"x"[0])
589        self.assertEqual(m[:], data)
590        # Test read_byte()
591        m.seek(0)
592        for i in range(len(data)):
593            self.assertEqual(m.tell(), i)
594            self.assertEqual(m.read_byte(), data[i])
595            self.assertEqual(m.tell(), i+1)
596        self.assertRaises(ValueError, m.read_byte)
597        # Test read()
598        m.seek(3)
599        self.assertEqual(m.read(3), b"345")
600        self.assertEqual(m.tell(), 6)
601        # Test write()
602        m.seek(3)
603        m.write(b"bar")
604        self.assertEqual(m.tell(), 6)
605        self.assertEqual(m[:], b"012bar6789")
606        m.write(bytearray(b"baz"))
607        self.assertEqual(m.tell(), 9)
608        self.assertEqual(m[:], b"012barbaz9")
609        self.assertRaises(ValueError, m.write, b"ba")
610
611    def test_non_ascii_byte(self):
612        for b in (129, 200, 255): # > 128
613            m = mmap.mmap(-1, 1)
614            m.write_byte(b)
615            self.assertEqual(m[0], b)
616            m.seek(0)
617            self.assertEqual(m.read_byte(), b)
618            m.close()
619
620    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
621    def test_tagname(self):
622        data1 = b"0123456789"
623        data2 = b"abcdefghij"
624        assert len(data1) == len(data2)
625
626        # Test same tag
627        m1 = mmap.mmap(-1, len(data1), tagname="foo")
628        m1[:] = data1
629        m2 = mmap.mmap(-1, len(data2), tagname="foo")
630        m2[:] = data2
631        self.assertEqual(m1[:], data2)
632        self.assertEqual(m2[:], data2)
633        m2.close()
634        m1.close()
635
636        # Test different tag
637        m1 = mmap.mmap(-1, len(data1), tagname="foo")
638        m1[:] = data1
639        m2 = mmap.mmap(-1, len(data2), tagname="boo")
640        m2[:] = data2
641        self.assertEqual(m1[:], data1)
642        self.assertEqual(m2[:], data2)
643        m2.close()
644        m1.close()
645
646    @cpython_only
647    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
648    def test_sizeof(self):
649        m1 = mmap.mmap(-1, 100)
650        tagname = "foo"
651        m2 = mmap.mmap(-1, 100, tagname=tagname)
652        self.assertEqual(sys.getsizeof(m2),
653                         sys.getsizeof(m1) + len(tagname) + 1)
654
655    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
656    def test_crasher_on_windows(self):
657        # Should not crash (Issue 1733986)
658        m = mmap.mmap(-1, 1000, tagname="foo")
659        try:
660            mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
661        except:
662            pass
663        m.close()
664
665        # Should not crash (Issue 5385)
666        with open(TESTFN, "wb") as fp:
667            fp.write(b"x"*10)
668        f = open(TESTFN, "r+b")
669        m = mmap.mmap(f.fileno(), 0)
670        f.close()
671        try:
672            m.resize(0) # will raise OSError
673        except:
674            pass
675        try:
676            m[:]
677        except:
678            pass
679        m.close()
680
681    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
682    def test_invalid_descriptor(self):
683        # socket file descriptors are valid, but out of range
684        # for _get_osfhandle, causing a crash when validating the
685        # parameters to _get_osfhandle.
686        s = socket.socket()
687        try:
688            with self.assertRaises(OSError):
689                m = mmap.mmap(s.fileno(), 10)
690        finally:
691            s.close()
692
693    def test_context_manager(self):
694        with mmap.mmap(-1, 10) as m:
695            self.assertFalse(m.closed)
696        self.assertTrue(m.closed)
697
698    def test_context_manager_exception(self):
699        # Test that the OSError gets passed through
700        with self.assertRaises(Exception) as exc:
701            with mmap.mmap(-1, 10) as m:
702                raise OSError
703        self.assertIsInstance(exc.exception, OSError,
704                              "wrong exception raised in context manager")
705        self.assertTrue(m.closed, "context manager failed")
706
707    def test_weakref(self):
708        # Check mmap objects are weakrefable
709        mm = mmap.mmap(-1, 16)
710        wr = weakref.ref(mm)
711        self.assertIs(wr(), mm)
712        del mm
713        gc_collect()
714        self.assertIs(wr(), None)
715
716    def test_write_returning_the_number_of_bytes_written(self):
717        mm = mmap.mmap(-1, 16)
718        self.assertEqual(mm.write(b""), 0)
719        self.assertEqual(mm.write(b"x"), 1)
720        self.assertEqual(mm.write(b"yz"), 2)
721        self.assertEqual(mm.write(b"python"), 6)
722
723    @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
724    def test_resize_past_pos(self):
725        m = mmap.mmap(-1, 8192)
726        self.addCleanup(m.close)
727        m.read(5000)
728        try:
729            m.resize(4096)
730        except SystemError:
731            self.skipTest("resizing not supported")
732        self.assertEqual(m.read(14), b'')
733        self.assertRaises(ValueError, m.read_byte)
734        self.assertRaises(ValueError, m.write_byte, 42)
735        self.assertRaises(ValueError, m.write, b'abc')
736
737    def test_concat_repeat_exception(self):
738        m = mmap.mmap(-1, 16)
739        with self.assertRaises(TypeError):
740            m + m
741        with self.assertRaises(TypeError):
742            m * 2
743
744
745class LargeMmapTests(unittest.TestCase):
746
747    def setUp(self):
748        unlink(TESTFN)
749
750    def tearDown(self):
751        unlink(TESTFN)
752
753    def _make_test_file(self, num_zeroes, tail):
754        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
755            requires('largefile',
756                'test requires %s bytes and a long time to run' % str(0x180000000))
757        f = open(TESTFN, 'w+b')
758        try:
759            f.seek(num_zeroes)
760            f.write(tail)
761            f.flush()
762        except (OSError, OverflowError, ValueError):
763            try:
764                f.close()
765            except (OSError, OverflowError):
766                pass
767            raise unittest.SkipTest("filesystem does not have largefile support")
768        return f
769
770    def test_large_offset(self):
771        with self._make_test_file(0x14FFFFFFF, b" ") as f:
772            with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m:
773                self.assertEqual(m[0xFFFFFFF], 32)
774
775    def test_large_filesize(self):
776        with self._make_test_file(0x17FFFFFFF, b" ") as f:
777            if sys.maxsize < 0x180000000:
778                # On 32 bit platforms the file is larger than sys.maxsize so
779                # mapping the whole file should fail -- Issue #16743
780                with self.assertRaises(OverflowError):
781                    mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ)
782                with self.assertRaises(ValueError):
783                    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
784            with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
785                self.assertEqual(m.size(), 0x180000000)
786
787    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.
788
789    def _test_around_boundary(self, boundary):
790        tail = b'  DEARdear  '
791        start = boundary - len(tail) // 2
792        end = start + len(tail)
793        with self._make_test_file(start, tail) as f:
794            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
795                self.assertEqual(m[start:end], tail)
796
797    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
798    def test_around_2GB(self):
799        self._test_around_boundary(_2G)
800
801    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
802    def test_around_4GB(self):
803        self._test_around_boundary(_4G)
804
805
def test_main():
    # Run both test classes of this module through test.support.
    test_classes = (MmapTests, LargeMmapTests)
    run_unittest(*test_classes)
808
# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test_main()
811