1# 2# Module for starting a process object using os.fork() or CreateProcess() 3# 4# multiprocessing/forking.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# All rights reserved. 8# 9# Redistribution and use in source and binary forms, with or without 10# modification, are permitted provided that the following conditions 11# are met: 12# 13# 1. Redistributions of source code must retain the above copyright 14# notice, this list of conditions and the following disclaimer. 15# 2. Redistributions in binary form must reproduce the above copyright 16# notice, this list of conditions and the following disclaimer in the 17# documentation and/or other materials provided with the distribution. 18# 3. Neither the name of author nor the names of any contributors may be 19# used to endorse or promote products derived from this software 20# without specific prior written permission. 21# 22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32# SUCH DAMAGE. 33# 34 35import os 36import sys 37import signal 38import errno 39 40from multiprocessing import util, process 41 42__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] 43 44# 45# Check that the current thread is spawning a child process 46# 47 48def assert_spawning(self): 49 if not Popen.thread_is_spawning(): 50 raise RuntimeError( 51 '%s objects should only be shared between processes' 52 ' through inheritance' % type(self).__name__ 53 ) 54 55# 56# Try making some callable types picklable 57# 58 59from pickle import Pickler 60class ForkingPickler(Pickler): 61 dispatch = Pickler.dispatch.copy() 62 63 @classmethod 64 def register(cls, type, reduce): 65 def dispatcher(self, obj): 66 rv = reduce(obj) 67 self.save_reduce(obj=obj, *rv) 68 cls.dispatch[type] = dispatcher 69 70def _reduce_method(m): 71 if m.im_self is None: 72 return getattr, (m.im_class, m.im_func.func_name) 73 else: 74 return getattr, (m.im_self, m.im_func.func_name) 75ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 76 77def _reduce_method_descriptor(m): 78 return getattr, (m.__objclass__, m.__name__) 79ForkingPickler.register(type(list.append), _reduce_method_descriptor) 80ForkingPickler.register(type(int.__add__), _reduce_method_descriptor) 81 82#def _reduce_builtin_function_or_method(m): 83# return getattr, (m.__self__, m.__name__) 84#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method) 85#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method) 86 87try: 88 from functools import partial 89except ImportError: 90 pass 91else: 92 def _reduce_partial(p): 93 return _rebuild_partial, (p.func, p.args, p.keywords or {}) 94 def _rebuild_partial(func, args, keywords): 95 return partial(func, *args, **keywords) 96 ForkingPickler.register(partial, _reduce_partial) 97 98# 99# Unix 100# 101 102if sys.platform != 'win32': 103 import time 104 105 exit = os._exit 106 duplicate = os.dup 107 close = os.close 108 109 # 110 # We define a Popen class similar to the one from subprocess, but 111 # whose constructor takes a process object as its argument. 112 # 113 114 class Popen(object): 115 116 def __init__(self, process_obj): 117 sys.stdout.flush() 118 sys.stderr.flush() 119 self.returncode = None 120 121 self.pid = os.fork() 122 if self.pid == 0: 123 if 'random' in sys.modules: 124 import random 125 random.seed() 126 code = process_obj._bootstrap() 127 sys.stdout.flush() 128 sys.stderr.flush() 129 os._exit(code) 130 131 def poll(self, flag=os.WNOHANG): 132 if self.returncode is None: 133 while True: 134 try: 135 pid, sts = os.waitpid(self.pid, flag) 136 except os.error as e: 137 if e.errno == errno.EINTR: 138 continue 139 # Child process not yet created. See #1731717 140 # e.errno == errno.ECHILD == 10 141 return None 142 else: 143 break 144 if pid == self.pid: 145 if os.WIFSIGNALED(sts): 146 self.returncode = -os.WTERMSIG(sts) 147 else: 148 assert os.WIFEXITED(sts) 149 self.returncode = os.WEXITSTATUS(sts) 150 return self.returncode 151 152 def wait(self, timeout=None): 153 if timeout is None: 154 return self.poll(0) 155 deadline = time.time() + timeout 156 delay = 0.0005 157 while 1: 158 res = self.poll() 159 if res is not None: 160 break 161 remaining = deadline - time.time() 162 if remaining <= 0: 163 break 164 delay = min(delay * 2, remaining, 0.05) 165 time.sleep(delay) 166 return res 167 168 def terminate(self): 169 if self.returncode is None: 170 try: 171 os.kill(self.pid, signal.SIGTERM) 172 except OSError, e: 173 if self.wait(timeout=0.1) is None: 174 raise 175 176 @staticmethod 177 def thread_is_spawning(): 178 return False 179 180# 181# Windows 182# 183 184else: 185 import thread 186 import msvcrt 187 import _subprocess 188 import time 189 190 from _multiprocessing import win32, Connection, PipeConnection 191 from .util import Finalize 192 193 #try: 194 # from cPickle import dump, load, HIGHEST_PROTOCOL 195 #except ImportError: 196 from pickle import load, HIGHEST_PROTOCOL 197 198 def dump(obj, file, protocol=None): 199 ForkingPickler(file, protocol).dump(obj) 200 201 # 202 # 203 # 204 205 TERMINATE = 0x10000 206 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) 207 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") 208 209 exit = win32.ExitProcess 210 close = win32.CloseHandle 211 212 # 213 # _python_exe is the assumed path to the python executable. 214 # People embedding Python want to modify it. 215 # 216 217 if WINSERVICE: 218 _python_exe = os.path.join(sys.exec_prefix, 'python.exe') 219 else: 220 _python_exe = sys.executable 221 222 def set_executable(exe): 223 global _python_exe 224 _python_exe = exe 225 226 # 227 # 228 # 229 230 def duplicate(handle, target_process=None, inheritable=False): 231 if target_process is None: 232 target_process = _subprocess.GetCurrentProcess() 233 return _subprocess.DuplicateHandle( 234 _subprocess.GetCurrentProcess(), handle, target_process, 235 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS 236 ).Detach() 237 238 # 239 # We define a Popen class similar to the one from subprocess, but 240 # whose constructor takes a process object as its argument. 241 # 242 243 class Popen(object): 244 ''' 245 Start a subprocess to run the code of a process object 246 ''' 247 _tls = thread._local() 248 249 def __init__(self, process_obj): 250 # create pipe for communication with child 251 rfd, wfd = os.pipe() 252 253 # get handle for read end of the pipe and make it inheritable 254 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) 255 os.close(rfd) 256 257 # start process 258 cmd = get_command_line() + [rhandle] 259 cmd = ' '.join('"%s"' % x for x in cmd) 260 hp, ht, pid, tid = _subprocess.CreateProcess( 261 _python_exe, cmd, None, None, 1, 0, None, None, None 262 ) 263 ht.Close() 264 close(rhandle) 265 266 # set attributes of self 267 self.pid = pid 268 self.returncode = None 269 self._handle = hp 270 271 # send information to child 272 prep_data = get_preparation_data(process_obj._name) 273 to_child = os.fdopen(wfd, 'wb') 274 Popen._tls.process_handle = int(hp) 275 try: 276 dump(prep_data, to_child, HIGHEST_PROTOCOL) 277 dump(process_obj, to_child, HIGHEST_PROTOCOL) 278 finally: 279 del Popen._tls.process_handle 280 to_child.close() 281 282 @staticmethod 283 def thread_is_spawning(): 284 return getattr(Popen._tls, 'process_handle', None) is not None 285 286 @staticmethod 287 def duplicate_for_child(handle): 288 return duplicate(handle, Popen._tls.process_handle) 289 290 def wait(self, timeout=None): 291 if self.returncode is None: 292 if timeout is None: 293 msecs = _subprocess.INFINITE 294 else: 295 msecs = max(0, int(timeout * 1000 + 0.5)) 296 297 res = _subprocess.WaitForSingleObject(int(self._handle), msecs) 298 if res == _subprocess.WAIT_OBJECT_0: 299 code = _subprocess.GetExitCodeProcess(self._handle) 300 if code == TERMINATE: 301 code = -signal.SIGTERM 302 self.returncode = code 303 304 return self.returncode 305 306 def poll(self): 307 return self.wait(timeout=0) 308 309 def terminate(self): 310 if self.returncode is None: 311 try: 312 _subprocess.TerminateProcess(int(self._handle), TERMINATE) 313 except WindowsError: 314 if self.wait(timeout=0.1) is None: 315 raise 316 317 # 318 # 319 # 320 321 def is_forking(argv): 322 ''' 323 Return whether commandline indicates we are forking 324 ''' 325 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork': 326 assert len(argv) == 3 327 return True 328 else: 329 return False 330 331 332 def freeze_support(): 333 ''' 334 Run code for process object if this in not the main process 335 ''' 336 if is_forking(sys.argv): 337 main() 338 sys.exit() 339 340 341 def get_command_line(): 342 ''' 343 Returns prefix of command line used for spawning a child process 344 ''' 345 if getattr(process.current_process(), '_inheriting', False): 346 raise RuntimeError(''' 347 Attempt to start a new process before the current process 348 has finished its bootstrapping phase. 349 350 This probably means that you are on Windows and you have 351 forgotten to use the proper idiom in the main module: 352 353 if __name__ == '__main__': 354 freeze_support() 355 ... 356 357 The "freeze_support()" line can be omitted if the program 358 is not going to be frozen to produce a Windows executable.''') 359 360 if getattr(sys, 'frozen', False): 361 return [sys.executable, '--multiprocessing-fork'] 362 else: 363 prog = 'from multiprocessing.forking import main; main()' 364 opts = util._args_from_interpreter_flags() 365 return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] 366 367 368 def main(): 369 ''' 370 Run code specified by data received over pipe 371 ''' 372 assert is_forking(sys.argv) 373 374 handle = int(sys.argv[-1]) 375 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) 376 from_parent = os.fdopen(fd, 'rb') 377 378 process.current_process()._inheriting = True 379 preparation_data = load(from_parent) 380 prepare(preparation_data) 381 self = load(from_parent) 382 process.current_process()._inheriting = False 383 384 from_parent.close() 385 386 exitcode = self._bootstrap() 387 exit(exitcode) 388 389 390 def get_preparation_data(name): 391 ''' 392 Return info about parent needed by child to unpickle process object 393 ''' 394 from .util import _logger, _log_to_stderr 395 396 d = dict( 397 name=name, 398 sys_path=sys.path, 399 sys_argv=sys.argv, 400 log_to_stderr=_log_to_stderr, 401 orig_dir=process.ORIGINAL_DIR, 402 authkey=process.current_process().authkey, 403 ) 404 405 if _logger is not None: 406 d['log_level'] = _logger.getEffectiveLevel() 407 408 if not WINEXE and not WINSERVICE and \ 409 not d['sys_argv'][0].lower().endswith('pythonservice.exe'): 410 main_path = getattr(sys.modules['__main__'], '__file__', None) 411 if not main_path and sys.argv[0] not in ('', '-c'): 412 main_path = sys.argv[0] 413 if main_path is not None: 414 if not os.path.isabs(main_path) and \ 415 process.ORIGINAL_DIR is not None: 416 main_path = os.path.join(process.ORIGINAL_DIR, main_path) 417 d['main_path'] = os.path.normpath(main_path) 418 419 return d 420 421 # 422 # Make (Pipe)Connection picklable 423 # 424 425 def reduce_connection(conn): 426 if not Popen.thread_is_spawning(): 427 raise RuntimeError( 428 'By default %s objects can only be shared between processes\n' 429 'using inheritance' % type(conn).__name__ 430 ) 431 return type(conn), (Popen.duplicate_for_child(conn.fileno()), 432 conn.readable, conn.writable) 433 434 ForkingPickler.register(Connection, reduce_connection) 435 ForkingPickler.register(PipeConnection, reduce_connection) 436 437# 438# Prepare current process 439# 440 441old_main_modules = [] 442 443def prepare(data): 444 ''' 445 Try to get current process ready to unpickle process object 446 ''' 447 old_main_modules.append(sys.modules['__main__']) 448 449 if 'name' in data: 450 process.current_process().name = data['name'] 451 452 if 'authkey' in data: 453 process.current_process()._authkey = data['authkey'] 454 455 if 'log_to_stderr' in data and data['log_to_stderr']: 456 util.log_to_stderr() 457 458 if 'log_level' in data: 459 util.get_logger().setLevel(data['log_level']) 460 461 if 'sys_path' in data: 462 sys.path = data['sys_path'] 463 464 if 'sys_argv' in data: 465 sys.argv = data['sys_argv'] 466 467 if 'dir' in data: 468 os.chdir(data['dir']) 469 470 if 'orig_dir' in data: 471 process.ORIGINAL_DIR = data['orig_dir'] 472 473 if 'main_path' in data: 474 # XXX (ncoghlan): The following code makes several bogus 475 # assumptions regarding the relationship between __file__ 476 # and a module's real name. See PEP 302 and issue #10845 477 # The problem is resolved properly in Python 3.4+, as 478 # described in issue #19946 479 480 main_path = data['main_path'] 481 main_name = os.path.splitext(os.path.basename(main_path))[0] 482 if main_name == '__init__': 483 main_name = os.path.basename(os.path.dirname(main_path)) 484 485 if main_name == '__main__': 486 # For directory and zipfile execution, we assume an implicit 487 # "if __name__ == '__main__':" around the module, and don't 488 # rerun the main module code in spawned processes 489 main_module = sys.modules['__main__'] 490 main_module.__file__ = main_path 491 elif main_name != 'ipython': 492 # Main modules not actually called __main__.py may 493 # contain additional code that should still be executed 494 import imp 495 496 if main_path is None: 497 dirs = None 498 elif os.path.basename(main_path).startswith('__init__.py'): 499 dirs = [os.path.dirname(os.path.dirname(main_path))] 500 else: 501 dirs = [os.path.dirname(main_path)] 502 503 assert main_name not in sys.modules, main_name 504 file, path_name, etc = imp.find_module(main_name, dirs) 505 try: 506 # We would like to do "imp.load_module('__main__', ...)" 507 # here. However, that would cause 'if __name__ == 508 # "__main__"' clauses to be executed. 509 main_module = imp.load_module( 510 '__parents_main__', file, path_name, etc 511 ) 512 finally: 513 if file: 514 file.close() 515 516 sys.modules['__main__'] = main_module 517 main_module.__name__ = '__main__' 518 519 # Try to make the potentially picklable objects in 520 # sys.modules['__main__'] realize they are in the main 521 # module -- somewhat ugly. 522 for obj in main_module.__dict__.values(): 523 try: 524 if obj.__module__ == '__parents_main__': 525 obj.__module__ = '__main__' 526 except Exception: 527 pass 528