1import os 2import sys 3import threading 4 5from . import process 6from . import reduction 7 8__all__ = [] # things are copied from here to __init__.py 9 10# 11# Exceptions 12# 13 14class ProcessError(Exception): 15 pass 16 17class BufferTooShort(ProcessError): 18 pass 19 20class TimeoutError(ProcessError): 21 pass 22 23class AuthenticationError(ProcessError): 24 pass 25 26# 27# Base type for contexts 28# 29 30class BaseContext(object): 31 32 ProcessError = ProcessError 33 BufferTooShort = BufferTooShort 34 TimeoutError = TimeoutError 35 AuthenticationError = AuthenticationError 36 37 current_process = staticmethod(process.current_process) 38 active_children = staticmethod(process.active_children) 39 40 def cpu_count(self): 41 '''Returns the number of CPUs in the system''' 42 num = os.cpu_count() 43 if num is None: 44 raise NotImplementedError('cannot determine number of cpus') 45 else: 46 return num 47 48 def Manager(self): 49 '''Returns a manager associated with a running server process 50 51 The managers methods such as `Lock()`, `Condition()` and `Queue()` 52 can be used to create shared objects. 53 ''' 54 from .managers import SyncManager 55 m = SyncManager(ctx=self.get_context()) 56 m.start() 57 return m 58 59 def Pipe(self, duplex=True): 60 '''Returns two connection object connected by a pipe''' 61 from .connection import Pipe 62 return Pipe(duplex) 63 64 def Lock(self): 65 '''Returns a non-recursive lock object''' 66 from .synchronize import Lock 67 return Lock(ctx=self.get_context()) 68 69 def RLock(self): 70 '''Returns a recursive lock object''' 71 from .synchronize import RLock 72 return RLock(ctx=self.get_context()) 73 74 def Condition(self, lock=None): 75 '''Returns a condition object''' 76 from .synchronize import Condition 77 return Condition(lock, ctx=self.get_context()) 78 79 def Semaphore(self, value=1): 80 '''Returns a semaphore object''' 81 from .synchronize import Semaphore 82 return Semaphore(value, ctx=self.get_context()) 83 84 def BoundedSemaphore(self, value=1): 85 '''Returns a bounded semaphore object''' 86 from .synchronize import BoundedSemaphore 87 return BoundedSemaphore(value, ctx=self.get_context()) 88 89 def Event(self): 90 '''Returns an event object''' 91 from .synchronize import Event 92 return Event(ctx=self.get_context()) 93 94 def Barrier(self, parties, action=None, timeout=None): 95 '''Returns a barrier object''' 96 from .synchronize import Barrier 97 return Barrier(parties, action, timeout, ctx=self.get_context()) 98 99 def Queue(self, maxsize=0): 100 '''Returns a queue object''' 101 from .queues import Queue 102 return Queue(maxsize, ctx=self.get_context()) 103 104 def JoinableQueue(self, maxsize=0): 105 '''Returns a queue object''' 106 from .queues import JoinableQueue 107 return JoinableQueue(maxsize, ctx=self.get_context()) 108 109 def SimpleQueue(self): 110 '''Returns a queue object''' 111 from .queues import SimpleQueue 112 return SimpleQueue(ctx=self.get_context()) 113 114 def Pool(self, processes=None, initializer=None, initargs=(), 115 maxtasksperchild=None): 116 '''Returns a process pool object''' 117 from .pool import Pool 118 return Pool(processes, initializer, initargs, maxtasksperchild, 119 context=self.get_context()) 120 121 def RawValue(self, typecode_or_type, *args): 122 '''Returns a shared object''' 123 from .sharedctypes import RawValue 124 return RawValue(typecode_or_type, *args) 125 126 def RawArray(self, typecode_or_type, size_or_initializer): 127 '''Returns a shared array''' 128 from .sharedctypes import RawArray 129 return RawArray(typecode_or_type, size_or_initializer) 130 131 def Value(self, typecode_or_type, *args, lock=True): 132 '''Returns a synchronized shared object''' 133 from .sharedctypes import Value 134 return Value(typecode_or_type, *args, lock=lock, 135 ctx=self.get_context()) 136 137 def Array(self, typecode_or_type, size_or_initializer, *, lock=True): 138 '''Returns a synchronized shared array''' 139 from .sharedctypes import Array 140 return Array(typecode_or_type, size_or_initializer, lock=lock, 141 ctx=self.get_context()) 142 143 def freeze_support(self): 144 '''Check whether this is a fake forked process in a frozen executable. 145 If so then run code specified by commandline and exit. 146 ''' 147 if sys.platform == 'win32' and getattr(sys, 'frozen', False): 148 from .spawn import freeze_support 149 freeze_support() 150 151 def get_logger(self): 152 '''Return package logger -- if it does not already exist then 153 it is created. 154 ''' 155 from .util import get_logger 156 return get_logger() 157 158 def log_to_stderr(self, level=None): 159 '''Turn on logging and add a handler which prints to stderr''' 160 from .util import log_to_stderr 161 return log_to_stderr(level) 162 163 def allow_connection_pickling(self): 164 '''Install support for sending connections and sockets 165 between processes 166 ''' 167 # This is undocumented. In previous versions of multiprocessing 168 # its only effect was to make socket objects inheritable on Windows. 169 from . import connection 170 171 def set_executable(self, executable): 172 '''Sets the path to a python.exe or pythonw.exe binary used to run 173 child processes instead of sys.executable when using the 'spawn' 174 start method. Useful for people embedding Python. 175 ''' 176 from .spawn import set_executable 177 set_executable(executable) 178 179 def set_forkserver_preload(self, module_names): 180 '''Set list of module names to try to load in forkserver process. 181 This is really just a hint. 182 ''' 183 from .forkserver import set_forkserver_preload 184 set_forkserver_preload(module_names) 185 186 def get_context(self, method=None): 187 if method is None: 188 return self 189 try: 190 ctx = _concrete_contexts[method] 191 except KeyError: 192 raise ValueError('cannot find context for %r' % method) from None 193 ctx._check_available() 194 return ctx 195 196 def get_start_method(self, allow_none=False): 197 return self._name 198 199 def set_start_method(self, method, force=False): 200 raise ValueError('cannot set start method of concrete context') 201 202 @property 203 def reducer(self): 204 '''Controls how objects will be reduced to a form that can be 205 shared with other processes.''' 206 return globals().get('reduction') 207 208 @reducer.setter 209 def reducer(self, reduction): 210 globals()['reduction'] = reduction 211 212 def _check_available(self): 213 pass 214 215# 216# Type of default context -- underlying context can be set at most once 217# 218 219class Process(process.BaseProcess): 220 _start_method = None 221 @staticmethod 222 def _Popen(process_obj): 223 return _default_context.get_context().Process._Popen(process_obj) 224 225class DefaultContext(BaseContext): 226 Process = Process 227 228 def __init__(self, context): 229 self._default_context = context 230 self._actual_context = None 231 232 def get_context(self, method=None): 233 if method is None: 234 if self._actual_context is None: 235 self._actual_context = self._default_context 236 return self._actual_context 237 else: 238 return super().get_context(method) 239 240 def set_start_method(self, method, force=False): 241 if self._actual_context is not None and not force: 242 raise RuntimeError('context has already been set') 243 if method is None and force: 244 self._actual_context = None 245 return 246 self._actual_context = self.get_context(method) 247 248 def get_start_method(self, allow_none=False): 249 if self._actual_context is None: 250 if allow_none: 251 return None 252 self._actual_context = self._default_context 253 return self._actual_context._name 254 255 def get_all_start_methods(self): 256 if sys.platform == 'win32': 257 return ['spawn'] 258 else: 259 if reduction.HAVE_SEND_HANDLE: 260 return ['fork', 'spawn', 'forkserver'] 261 else: 262 return ['fork', 'spawn'] 263 264DefaultContext.__all__ = [x for x in dir(DefaultContext) if x[0] != '_'] 265 266# 267# Context types for fixed start method 268# 269 270if sys.platform != 'win32': 271 272 class ForkProcess(process.BaseProcess): 273 _start_method = 'fork' 274 @staticmethod 275 def _Popen(process_obj): 276 from .popen_fork import Popen 277 return Popen(process_obj) 278 279 class SpawnProcess(process.BaseProcess): 280 _start_method = 'spawn' 281 @staticmethod 282 def _Popen(process_obj): 283 from .popen_spawn_posix import Popen 284 return Popen(process_obj) 285 286 class ForkServerProcess(process.BaseProcess): 287 _start_method = 'forkserver' 288 @staticmethod 289 def _Popen(process_obj): 290 from .popen_forkserver import Popen 291 return Popen(process_obj) 292 293 class ForkContext(BaseContext): 294 _name = 'fork' 295 Process = ForkProcess 296 297 class SpawnContext(BaseContext): 298 _name = 'spawn' 299 Process = SpawnProcess 300 301 class ForkServerContext(BaseContext): 302 _name = 'forkserver' 303 Process = ForkServerProcess 304 def _check_available(self): 305 if not reduction.HAVE_SEND_HANDLE: 306 raise ValueError('forkserver start method not available') 307 308 _concrete_contexts = { 309 'fork': ForkContext(), 310 'spawn': SpawnContext(), 311 'forkserver': ForkServerContext(), 312 } 313 _default_context = DefaultContext(_concrete_contexts['fork']) 314 315else: 316 317 class SpawnProcess(process.BaseProcess): 318 _start_method = 'spawn' 319 @staticmethod 320 def _Popen(process_obj): 321 from .popen_spawn_win32 import Popen 322 return Popen(process_obj) 323 324 class SpawnContext(BaseContext): 325 _name = 'spawn' 326 Process = SpawnProcess 327 328 _concrete_contexts = { 329 'spawn': SpawnContext(), 330 } 331 _default_context = DefaultContext(_concrete_contexts['spawn']) 332 333# 334# Force the start method 335# 336 337def _force_start_method(method): 338 _default_context._actual_context = _concrete_contexts[method] 339 340# 341# Check that the current thread is spawning a child process 342# 343 344_tls = threading.local() 345 346def get_spawning_popen(): 347 return getattr(_tls, 'spawning_popen', None) 348 349def set_spawning_popen(popen): 350 _tls.spawning_popen = popen 351 352def assert_spawning(obj): 353 if get_spawning_popen() is None: 354 raise RuntimeError( 355 '%s objects should only be shared between processes' 356 ' through inheritance' % type(obj).__name__ 357 ) 358