.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

asyncio queues are designed to be similar to classes of the
:mod:`queue` module. Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to perform queue operations
with a timeout.

See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0, \*, loop=None)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite. If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full()` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue.
      The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete. When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.


Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.


LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).


Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.
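
As a minimal sketch of the behavior described above, the following
shows the non-blocking methods raising :exc:`QueueFull` and
:exc:`QueueEmpty`, followed by the retrieval order of the three queue
classes. The ``demo`` coroutine and its variable names are illustrative
assumptions, not part of the asyncio API::

   import asyncio


   async def demo():
       # A bounded FIFO queue with room for a single item.
       queue = asyncio.Queue(maxsize=1)
       queue.put_nowait('first')

       events = []
       try:
           # The queue is already full, so this raises QueueFull.
           queue.put_nowait('second')
       except asyncio.QueueFull:
           events.append('full')

       # Retrieve the only stored item without waiting.
       events.append(queue.get_nowait())

       try:
           # The queue is now empty, so this raises QueueEmpty.
           queue.get_nowait()
       except asyncio.QueueEmpty:
           events.append('empty')

       # Put the same (priority_number, data) entries into each class
       # and record the order in which the data comes back out.
       orders = {}
       for cls in (asyncio.Queue, asyncio.LifoQueue, asyncio.PriorityQueue):
           q = cls()
           for entry in [(2, 'b'), (1, 'a'), (3, 'c')]:
               q.put_nowait(entry)
           orders[cls.__name__] = [q.get_nowait()[1] for _ in range(3)]

       return events, orders


   events, orders = asyncio.run(demo())
   print(events)
   print(orders)

The queues are created inside the coroutine so that they are constructed
while an event loop is running.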


Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())
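
Because queue methods take no *timeout* parameter, a timed ``get()``
can be sketched by wrapping the call in :func:`asyncio.wait_for`.
The helper name ``get_with_timeout``, its ``default`` argument, and the
``timeout_demo`` coroutine are illustrative assumptions, not part of
the asyncio API::

   import asyncio


   async def get_with_timeout(queue, timeout, default=None):
       # Hypothetical helper: wait at most "timeout" seconds for an item
       # and return "default" if none arrives in time.
       try:
           return await asyncio.wait_for(queue.get(), timeout)
       except asyncio.TimeoutError:
           return default


   async def timeout_demo():
       queue = asyncio.Queue()
       queue.put_nowait('job')

       # An item is already queued, so this returns immediately.
       first = await get_with_timeout(queue, 0.1)

       # The queue is now empty, so this gives up after 0.1 seconds.
       second = await get_with_timeout(queue, 0.1, default='nothing queued')
       return first, second


   results = asyncio.run(timeout_demo())
   print(results)

The same pattern should apply to ``put()`` on a bounded queue: wrap
``queue.put(item)`` in :func:`asyncio.wait_for` and handle
:exc:`asyncio.TimeoutError`.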