1#
2# Module which supports allocation of memory from an mmap
3#
4# multiprocessing/heap.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10import bisect
11import mmap
12import os
13import sys
14import tempfile
15import threading
16
17from .context import reduction, assert_spawning
18from . import util
19
20__all__ = ['BufferWrapper']
21
22#
23# Inheritable class which wraps an mmap, and from which blocks can be allocated
24#
25
26if sys.platform == 'win32':
27
28    import _winapi
29
30    class Arena(object):
31
32        _rand = tempfile._RandomNameSequence()
33
34        def __init__(self, size):
35            self.size = size
36            for i in range(100):
37                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
38                buf = mmap.mmap(-1, size, tagname=name)
39                if _winapi.GetLastError() == 0:
40                    break
41                # We have reopened a preexisting mmap.
42                buf.close()
43            else:
44                raise FileExistsError('Cannot find name for new mmap')
45            self.name = name
46            self.buffer = buf
47            self._state = (self.size, self.name)
48
49        def __getstate__(self):
50            assert_spawning(self)
51            return self._state
52
53        def __setstate__(self, state):
54            self.size, self.name = self._state = state
55            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
56            # XXX Temporarily preventing buildbot failures while determining
57            # XXX the correct long-term fix. See issue 23060
58            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS
59
60else:
61
62    class Arena(object):
63        if sys.platform == 'linux':
64            _dir_candidates = ['/dev/shm']
65        else:
66            _dir_candidates = []
67
68        def __init__(self, size, fd=-1):
69            self.size = size
70            self.fd = fd
71            if fd == -1:
72                self.fd, name = tempfile.mkstemp(
73                     prefix='pym-%d-'%os.getpid(),
74                     dir=self._choose_dir(size))
75                os.unlink(name)
76                util.Finalize(self, os.close, (self.fd,))
77                os.ftruncate(self.fd, size)
78            self.buffer = mmap.mmap(self.fd, self.size)
79
80        def _choose_dir(self, size):
81            # Choose a non-storage backed directory if possible,
82            # to improve performance
83            for d in self._dir_candidates:
84                st = os.statvfs(d)
85                if st.f_bavail * st.f_frsize >= size:  # enough free space?
86                    return d
87            return util.get_temp_dir()
88
89    def reduce_arena(a):
90        if a.fd == -1:
91            raise ValueError('Arena is unpicklable because '
92                             'forking was enabled when it was created')
93        return rebuild_arena, (a.size, reduction.DupFd(a.fd))
94
95    def rebuild_arena(size, dupfd):
96        return Arena(size, dupfd.detach())
97
98    reduction.register(Arena, reduce_arena)
99
100#
101# Class allowing allocation of chunks of memory from arenas
102#
103
class Heap(object):
    """Allocate chunks of memory out of a growing list of arenas.

    A block is a tuple ``(arena, start, stop)`` where *start* and *stop* are
    offsets into the arena.  Free blocks are indexed by length (to find the
    smallest one that is large enough) and by both of their end offsets (so
    that a released block can be merged with free neighbours).
    """

    # Allocation sizes are rounded up to a multiple of this value, which
    # must be a power of 2 (see _roundup()).
    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        """Create an empty heap.

        *size* is the minimum length of the first arena; the minimum is
        doubled every time a new arena has to be created.
        """
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        # Remembered so that malloc() can restore the caller's configuration
        # when it re-initializes the heap after a fork.
        self._initial_size = size
        self._size = size
        # sorted list of the distinct lengths of the free blocks
        self._lengths = []
        # length -> list of free blocks of that length
        self._len_to_seq = {}
        # (arena, start) -> free block starting at that offset
        self._start_to_block = {}
        # (arena, stop) -> free block ending at that offset
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        """Return *n* rounded up to a multiple of *alignment*."""
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        """Return a free block of at least *size* bytes.

        The block might be much larger than requested; it is removed from
        the free-block indexes.  A new arena is created when no free block
        is large enough.
        """
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            # Nothing fits: carve a new arena and hand it out whole.  It was
            # never registered as free, so there is nothing to deregister.
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            util.info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)

        # Smallest free length that is >= size.
        length = self._lengths[i]
        seq = self._len_to_seq[length]
        block = seq.pop()
        if not seq:
            del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        """Register *block* as free, merging it with free neighbours."""
        (arena, start, stop) = block

        # A free block ending exactly where this one starts?
        prev_block = self._stop_to_block.get((arena, start))
        if prev_block is not None:
            start, _ = self._absorb(prev_block)

        # A free block starting exactly where this one ends?
        next_block = self._start_to_block.get((arena, stop))
        if next_block is not None:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        seq = self._len_to_seq.get(length)
        if seq is None:
            # First free block of this length: also record the length.
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)
        else:
            seq.append(block)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        """Deregister free *block* so it can be merged with a neighbour.

        Returns the block's ``(start, stop)`` offsets.
        """
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        """Free every block queued by free() - called with the lock held."""
        # pop() until IndexError rather than testing for emptiness first:
        # free() may append to the list concurrently.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        """Free a block returned by malloc().

        Raises ValueError when called from a process other than the one
        that last initialized this heap.
        """
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometimes later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under cPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(), self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        """Return a block of *size* bytes, rounded up to the alignment.

        A request for 0 bytes is treated as a request for 1 byte.

        Raises ValueError if *size* is negative and OverflowError if it is
        not smaller than ``sys.maxsize``.
        """
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            # Reinitialize after fork, keeping the initial arena size this
            # heap was created with instead of falling back to the default.
            self.__init__(self._initial_size)
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                # Give the unused tail of the block back to the free lists.
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
249
250#
251# Class representing a chunk of an mmap -- can be inherited by child process
252#
253
class BufferWrapper(object):
    """Handle on a block of memory allocated from the class-wide heap."""

    # Single heap shared by every BufferWrapper.
    _heap = Heap()

    def __init__(self, size):
        """Allocate *size* bytes and arrange for them to be freed.

        Raises ValueError if *size* is negative and OverflowError if it is
        not smaller than ``sys.maxsize``.
        """
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        heap = BufferWrapper._heap
        block = heap.malloc(size)
        self._state = (block, size)
        # Hand the block back to the heap once this wrapper is finalized.
        util.Finalize(self, heap.free, args=(block,))

    def create_memoryview(self):
        """Return a memoryview over the first *size* bytes of the block."""
        block, size = self._state
        arena, start, _stop = block
        view = memoryview(arena.buffer)
        return view[start:start + size]
270