#
# Analogue of `multiprocessing.connection` which uses queues instead of sockets
#
# multiprocessing/dummy/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe' ]

import time
from queue import Queue


families = [None]


class Listener(object):
    """In-process listener whose "address" is a queue of pending connections.

    `Client` puts a pair of queues on the backlog queue and `accept` turns
    the next pending pair into a `Connection`.
    """

    def __init__(self, address=None, family=None, backlog=1):
        # `address` and `family` are accepted but not used; only `backlog`
        # has an effect, as the maximum size of the pending-connection queue.
        self._backlog_queue = Queue(backlog)

    def accept(self):
        """Block until a client has connected and return its `Connection`."""
        return Connection(*self._backlog_queue.get())

    def close(self):
        """Drop the backlog queue; `address` is None afterwards."""
        self._backlog_queue = None

    # The object a client must pass to `Client` to reach this listener.
    address = property(lambda self: self._backlog_queue)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address):
    """Connect to the listener owning the backlog queue `address`.

    Creates two fresh queues, hands them (swapped) to the listener and
    returns the local `Connection`. Blocks while the backlog queue is full.
    """
    _in, _out = Queue(), Queue()
    # The listener's end reads what this end writes, hence the swapped order.
    address.put((_out, _in))
    return Connection(_in, _out)


def Pipe(duplex=True):
    """Return a pair of connected `Connection` objects.

    `duplex` is accepted but ignored: both ends can always send and receive.
    """
    a, b = Queue(), Queue()
    return Connection(a, b), Connection(b, a)


class Connection(object):
    """Connection end built from an inbound and an outbound queue.

    `send`/`send_bytes` are the outbound queue's `put`, and
    `recv`/`recv_bytes` are the inbound queue's `get`.
    """

    def __init__(self, _in, _out):
        self._out = _out
        self._in = _in
        self.send = self.send_bytes = _out.put
        self.recv = self.recv_bytes = _in.get

    def poll(self, timeout=0.0):
        """Return whether there is anything available to be received.

        `timeout` is the maximum number of seconds to wait. Zero or a
        negative value checks once without waiting; None waits until data
        is available.
        """
        if self._in.qsize() > 0:
            return True
        if timeout is not None and timeout <= 0.0:
            return False
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._in.not_empty:
            # Test emptiness while holding the condition: an item put after
            # the unlocked check above has already sent its notification,
            # and waiting for another one would sleep for the whole timeout.
            # `_qsize()` is used because `qsize()` would try to take the
            # mutex this condition already holds.
            while not self._in._qsize():
                if deadline is None:
                    self._in.not_empty.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining <= 0.0:
                    return False
                self._in.not_empty.wait(remaining)
            return True

    def close(self):
        """Do nothing; present so the object can be closed like a connection."""
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()