__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')

import collections
import heapq

from . import events
from . import locks


class QueueEmpty(Exception):
    """Signals that Queue.get_nowait() found no item to return."""


class QueueFull(Exception):
    """Signals that Queue.put_nowait() found no free slot for the item."""


class Queue:
    """FIFO queue for handing items between producer and consumer coroutines.

    A maxsize of zero or less means the queue is unbounded.  With a positive
    maxsize, "await put()" waits once the queue holds maxsize items and
    resumes after get() has removed one.

    In contrast to the thread-oriented standard library Queue, qsize() can be
    relied upon here: a single-threaded asyncio program is not preempted
    between a qsize() call and the queue operation that follows it.
    """

    def __init__(self, maxsize=0, *, loop=None):
        # Fall back to the current event loop only when none was supplied.
        self._loop = events.get_event_loop() if loop is None else loop
        self._maxsize = maxsize

        # One future per coroutine currently waiting inside get().
        self._getters = collections.deque()
        # One future per coroutine currently waiting inside put().
        self._putters = collections.deque()
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()
        self._init(maxsize)

    # Storage hooks: subclasses override these three to change ordering.

    def _init(self, maxsize):
        self._queue = collections.deque()

    def _get(self):
        return self._queue.popleft()

    def _put(self, item):
        self._queue.append(item)

    # End of the storage hooks.

    def _wakeup_next(self, waiters):
        # Discard waiters that are already done (e.g. cancelled) and
        # resolve the first one that is still pending, if there is one.
        while waiters:
            candidate = waiters.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def __repr__(self):
        cls_name = type(self).__name__
        return f'<{cls_name} at {id(self):#x} {self._format()}>'

    def __str__(self):
        cls_name = type(self).__name__
        return f'<{cls_name} {self._format()}>'

    def _format(self):
        # Only non-empty / non-zero state is included in the summary.
        fragments = [f'maxsize={self._maxsize!r}']
        # getattr() guards against _queue not being set on the instance.
        if getattr(self, '_queue', None):
            fragments.append(f' _queue={list(self._queue)!r}')
        if self._getters:
            fragments.append(f' _getters[{len(self._getters)}]')
        if self._putters:
            fragments.append(f' _putters[{len(self._putters)}]')
        if self._unfinished_tasks:
            fragments.append(f' tasks={self._unfinished_tasks}')
        return ''.join(fragments)

    def qsize(self):
        """Return how many items are currently stored."""
        return len(self._queue)

    @property
    def maxsize(self):
        """The capacity the queue was created with."""
        return self._maxsize

    def empty(self):
        """Return True when no items are stored, False otherwise."""
        return not self._queue

    def full(self):
        """Return True when the queue holds at least maxsize items.

        Note: a queue created with maxsize <= 0 (0 is the default) is
        unbounded, so full() always returns False for it.
        """
        if self._maxsize <= 0:
            return False
        return self.qsize() >= self._maxsize

    async def put(self, item):
        """Add item to the queue, waiting for room if necessary.

        While the queue is full the caller is suspended; once a slot is
        free the item is stored via put_nowait().
        """
        while self.full():
            slot_waiter = self._loop.create_future()
            self._putters.append(slot_waiter)
            try:
                await slot_waiter
            except BaseException:
                # Make sure the future is finished even if it was still
                # pending when the exception arrived.
                slot_waiter.cancel()
                try:
                    # Drop our entry so dead waiters do not pile up.
                    self._putters.remove(slot_waiter)
                except ValueError:
                    # Already popped by _wakeup_next() during an earlier
                    # get_nowait() call.
                    pass
                if not (self.full() or slot_waiter.cancelled()):
                    # get_nowait() had picked us, but we are bailing out;
                    # hand the free slot to the next waiting putter.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Add item to the queue immediately.

        Raises QueueFull when there is no free slot.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        # Every stored item counts as unfinished until task_done().
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Take an item out of the queue and return it.

        While the queue is empty the caller is suspended until an item
        becomes available.
        """
        while self.empty():
            item_waiter = self._loop.create_future()
            self._getters.append(item_waiter)
            try:
                await item_waiter
            except BaseException:
                # Make sure the future is finished even if it was still
                # pending when the exception arrived.
                item_waiter.cancel()
                try:
                    # Drop our entry so dead waiters do not pile up.
                    self._getters.remove(item_waiter)
                except ValueError:
                    # Already popped by _wakeup_next() during an earlier
                    # put_nowait() call.
                    pass
                if not (self.empty() or item_waiter.cancelled()):
                    # put_nowait() had picked us, but we are bailing out;
                    # let the next waiting getter have the item.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Take an item out of the queue immediately and return it.

        Raises QueueEmpty when nothing is stored.
        """
        if self.empty():
            raise QueueEmpty
        taken = self._get()
        # A slot was just freed, so a waiting putter may proceed.
        self._wakeup_next(self._putters)
        return taken

    def task_done(self):
        """Mark one previously enqueued task as fully processed.

        Intended for consumers: after each get() that fetched a task, call
        task_done() once the work on that task is finished.

        A join() that is currently waiting resumes as soon as every item
        that was put into the queue has been matched by a task_done() call.

        Raises ValueError when called more often than items were placed in
        the queue.
        """
        remaining = self._unfinished_tasks
        if remaining <= 0:
            raise ValueError('task_done() called too many times')
        remaining -= 1
        self._unfinished_tasks = remaining
        if remaining == 0:
            self._finished.set()

    async def join(self):
        """Wait until every item put into the queue has been processed.

        Adding an item increments the unfinished-task counter; each
        task_done() call from a consumer decrements it.  join() returns once
        the counter is back at zero.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()


class PriorityQueue(Queue):
    """Queue variant that hands out the smallest entry first.

    Entries are commonly tuples shaped like (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list maintained as a binary heap by heapq.
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self._queue)


class LifoQueue(Queue):
    """Queue variant that hands out the most recently added entry first."""

    def _init(self, maxsize):
        # A plain list used as a stack.
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        return self._queue.pop()