1#
2# Analogue of `multiprocessing.connection` which uses queues instead of sockets
3#
4# multiprocessing/dummy/connection.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10__all__ = [ 'Client', 'Listener', 'Pipe' ]
11
12from queue import Queue
13
14
15families = [None]
16
17
18class Listener(object):
19
20    def __init__(self, address=None, family=None, backlog=1):
21        self._backlog_queue = Queue(backlog)
22
23    def accept(self):
24        return Connection(*self._backlog_queue.get())
25
26    def close(self):
27        self._backlog_queue = None
28
29    address = property(lambda self: self._backlog_queue)
30
31    def __enter__(self):
32        return self
33
34    def __exit__(self, exc_type, exc_value, exc_tb):
35        self.close()
36
37
38def Client(address):
39    _in, _out = Queue(), Queue()
40    address.put((_out, _in))
41    return Connection(_in, _out)
42
43
44def Pipe(duplex=True):
45    a, b = Queue(), Queue()
46    return Connection(a, b), Connection(b, a)
47
48
49class Connection(object):
50
51    def __init__(self, _in, _out):
52        self._out = _out
53        self._in = _in
54        self.send = self.send_bytes = _out.put
55        self.recv = self.recv_bytes = _in.get
56
57    def poll(self, timeout=0.0):
58        if self._in.qsize() > 0:
59            return True
60        if timeout <= 0.0:
61            return False
62        with self._in.not_empty:
63            self._in.not_empty.wait(timeout)
64        return self._in.qsize() > 0
65
66    def close(self):
67        pass
68
69    def __enter__(self):
70        return self
71
72    def __exit__(self, exc_type, exc_value, exc_tb):
73        self.close()
74