1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2
3import collections
4import heapq
5
6from . import events
7from . import locks
8
9
10class QueueEmpty(Exception):
11    """Raised when Queue.get_nowait() is called on an empty Queue."""
12    pass
13
14
15class QueueFull(Exception):
16    """Raised when the Queue.put_nowait() method is called on a full Queue."""
17    pass
18
19
20class Queue:
21    """A queue, useful for coordinating producer and consumer coroutines.
22
23    If maxsize is less than or equal to zero, the queue size is infinite. If it
24    is an integer greater than 0, then "await put()" will block when the
25    queue reaches maxsize, until an item is removed by get().
26
27    Unlike the standard library Queue, you can reliably know this Queue's size
28    with qsize(), since your single-threaded asyncio application won't be
29    interrupted between calling qsize() and doing an operation on the Queue.
30    """
31
32    def __init__(self, maxsize=0, *, loop=None):
33        if loop is None:
34            self._loop = events.get_event_loop()
35        else:
36            self._loop = loop
37        self._maxsize = maxsize
38
39        # Futures.
40        self._getters = collections.deque()
41        # Futures.
42        self._putters = collections.deque()
43        self._unfinished_tasks = 0
44        self._finished = locks.Event(loop=self._loop)
45        self._finished.set()
46        self._init(maxsize)
47
48    # These three are overridable in subclasses.
49
50    def _init(self, maxsize):
51        self._queue = collections.deque()
52
53    def _get(self):
54        return self._queue.popleft()
55
56    def _put(self, item):
57        self._queue.append(item)
58
59    # End of the overridable methods.
60
61    def _wakeup_next(self, waiters):
62        # Wake up the next waiter (if any) that isn't cancelled.
63        while waiters:
64            waiter = waiters.popleft()
65            if not waiter.done():
66                waiter.set_result(None)
67                break
68
69    def __repr__(self):
70        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
71
72    def __str__(self):
73        return f'<{type(self).__name__} {self._format()}>'
74
75    def _format(self):
76        result = f'maxsize={self._maxsize!r}'
77        if getattr(self, '_queue', None):
78            result += f' _queue={list(self._queue)!r}'
79        if self._getters:
80            result += f' _getters[{len(self._getters)}]'
81        if self._putters:
82            result += f' _putters[{len(self._putters)}]'
83        if self._unfinished_tasks:
84            result += f' tasks={self._unfinished_tasks}'
85        return result
86
87    def qsize(self):
88        """Number of items in the queue."""
89        return len(self._queue)
90
91    @property
92    def maxsize(self):
93        """Number of items allowed in the queue."""
94        return self._maxsize
95
96    def empty(self):
97        """Return True if the queue is empty, False otherwise."""
98        return not self._queue
99
100    def full(self):
101        """Return True if there are maxsize items in the queue.
102
103        Note: if the Queue was initialized with maxsize=0 (the default),
104        then full() is never True.
105        """
106        if self._maxsize <= 0:
107            return False
108        else:
109            return self.qsize() >= self._maxsize
110
111    async def put(self, item):
112        """Put an item into the queue.
113
114        Put an item into the queue. If the queue is full, wait until a free
115        slot is available before adding item.
116        """
117        while self.full():
118            putter = self._loop.create_future()
119            self._putters.append(putter)
120            try:
121                await putter
122            except:
123                putter.cancel()  # Just in case putter is not done yet.
124                try:
125                    # Clean self._putters from canceled putters.
126                    self._putters.remove(putter)
127                except ValueError:
128                    # The putter could be removed from self._putters by a
129                    # previous get_nowait call.
130                    pass
131                if not self.full() and not putter.cancelled():
132                    # We were woken up by get_nowait(), but can't take
133                    # the call.  Wake up the next in line.
134                    self._wakeup_next(self._putters)
135                raise
136        return self.put_nowait(item)
137
138    def put_nowait(self, item):
139        """Put an item into the queue without blocking.
140
141        If no free slot is immediately available, raise QueueFull.
142        """
143        if self.full():
144            raise QueueFull
145        self._put(item)
146        self._unfinished_tasks += 1
147        self._finished.clear()
148        self._wakeup_next(self._getters)
149
150    async def get(self):
151        """Remove and return an item from the queue.
152
153        If queue is empty, wait until an item is available.
154        """
155        while self.empty():
156            getter = self._loop.create_future()
157            self._getters.append(getter)
158            try:
159                await getter
160            except:
161                getter.cancel()  # Just in case getter is not done yet.
162                try:
163                    # Clean self._getters from canceled getters.
164                    self._getters.remove(getter)
165                except ValueError:
166                    # The getter could be removed from self._getters by a
167                    # previous put_nowait call.
168                    pass
169                if not self.empty() and not getter.cancelled():
170                    # We were woken up by put_nowait(), but can't take
171                    # the call.  Wake up the next in line.
172                    self._wakeup_next(self._getters)
173                raise
174        return self.get_nowait()
175
176    def get_nowait(self):
177        """Remove and return an item from the queue.
178
179        Return an item if one is immediately available, else raise QueueEmpty.
180        """
181        if self.empty():
182            raise QueueEmpty
183        item = self._get()
184        self._wakeup_next(self._putters)
185        return item
186
187    def task_done(self):
188        """Indicate that a formerly enqueued task is complete.
189
190        Used by queue consumers. For each get() used to fetch a task,
191        a subsequent call to task_done() tells the queue that the processing
192        on the task is complete.
193
194        If a join() is currently blocking, it will resume when all items have
195        been processed (meaning that a task_done() call was received for every
196        item that had been put() into the queue).
197
198        Raises ValueError if called more times than there were items placed in
199        the queue.
200        """
201        if self._unfinished_tasks <= 0:
202            raise ValueError('task_done() called too many times')
203        self._unfinished_tasks -= 1
204        if self._unfinished_tasks == 0:
205            self._finished.set()
206
207    async def join(self):
208        """Block until all items in the queue have been gotten and processed.
209
210        The count of unfinished tasks goes up whenever an item is added to the
211        queue. The count goes down whenever a consumer calls task_done() to
212        indicate that the item was retrieved and all work on it is complete.
213        When the count of unfinished tasks drops to zero, join() unblocks.
214        """
215        if self._unfinished_tasks > 0:
216            await self._finished.wait()
217
218
219class PriorityQueue(Queue):
220    """A subclass of Queue; retrieves entries in priority order (lowest first).
221
222    Entries are typically tuples of the form: (priority number, data).
223    """
224
225    def _init(self, maxsize):
226        self._queue = []
227
228    def _put(self, item, heappush=heapq.heappush):
229        heappush(self._queue, item)
230
231    def _get(self, heappop=heapq.heappop):
232        return heappop(self._queue)
233
234
235class LifoQueue(Queue):
236    """A subclass of Queue that retrieves most recently added entries first."""
237
238    def _init(self, maxsize):
239        self._queue = []
240
241    def _put(self, item):
242        self._queue.append(item)
243
244    def _get(self):
245        return self._queue.pop()
246