1.. currentmodule:: asyncio
2
3.. _asyncio-queues:
4
5======
6Queues
7======
8
9asyncio queues are designed to be similar to classes of the
10:mod:`queue` module.  Although asyncio queues are not thread-safe,
11they are designed to be used specifically in async/await code.
12
13Note that methods of asyncio queues don't have a *timeout* parameter;
14use :func:`asyncio.wait_for` function to do queue operations with a
15timeout.
16
17See also the `Examples`_ section below.
18
19Queue
20=====
21
22.. class:: Queue(maxsize=0, \*, loop=None)
23
24   A first in, first out (FIFO) queue.
25
26   If *maxsize* is less than or equal to zero, the queue size is
27   infinite.  If it is an integer greater than ``0``, then
28   ``await put()`` blocks when the queue reaches *maxsize*
29   until an item is removed by :meth:`get`.
30
31   Unlike the standard library threading :mod:`queue`, the size of
32   the queue is always known and can be returned by calling the
33   :meth:`qsize` method.
34
35   This class is :ref:`not thread safe <asyncio-multithreading>`.
36
37   .. attribute:: maxsize
38
39      Number of items allowed in the queue.
40
41   .. method:: empty()
42
43      Return ``True`` if the queue is empty, ``False`` otherwise.
44
45   .. method:: full()
46
47      Return ``True`` if there are :attr:`maxsize` items in the queue.
48
49      If the queue was initialized with ``maxsize=0`` (the default),
50      then :meth:`full()` never returns ``True``.
51
52   .. coroutinemethod:: get()
53
54      Remove and return an item from the queue. If queue is empty,
55      wait until an item is available.
56
57   .. method:: get_nowait()
58
59      Return an item if one is immediately available, else raise
60      :exc:`QueueEmpty`.
61
62   .. coroutinemethod:: join()
63
64      Block until all items in the queue have been received and processed.
65
66      The count of unfinished tasks goes up whenever an item is added
67      to the queue. The count goes down whenever a consumer coroutine calls
68      :meth:`task_done` to indicate that the item was retrieved and all
69      work on it is complete.  When the count of unfinished tasks drops
70      to zero, :meth:`join` unblocks.
71
72   .. coroutinemethod:: put(item)
73
74      Put an item into the queue. If the queue is full, wait until a
75      free slot is available before adding the item.
76
77   .. method:: put_nowait(item)
78
79      Put an item into the queue without blocking.
80
81      If no free slot is immediately available, raise :exc:`QueueFull`.
82
83   .. method:: qsize()
84
85      Return the number of items in the queue.
86
87   .. method:: task_done()
88
89      Indicate that a formerly enqueued task is complete.
90
91      Used by queue consumers. For each :meth:`~Queue.get` used to
92      fetch a task, a subsequent call to :meth:`task_done` tells the
93      queue that the processing on the task is complete.
94
95      If a :meth:`join` is currently blocking, it will resume when all
96      items have been processed (meaning that a :meth:`task_done`
97      call was received for every item that had been :meth:`~Queue.put`
98      into the queue).
99
100      Raises :exc:`ValueError` if called more times than there were
101      items placed in the queue.
102
103
104Priority Queue
105==============
106
107.. class:: PriorityQueue
108
109   A variant of :class:`Queue`; retrieves entries in priority order
110   (lowest first).
111
112   Entries are typically tuples of the form
113   ``(priority_number, data)``.
114
115
116LIFO Queue
117==========
118
119.. class:: LifoQueue
120
121   A variant of :class:`Queue` that retrieves most recently added
122   entries first (last in, first out).
123
124
125Exceptions
126==========
127
128.. exception:: QueueEmpty
129
130   This exception is raised when the :meth:`~Queue.get_nowait` method
131   is called on an empty queue.
132
133
134.. exception:: QueueFull
135
136   Exception raised when the :meth:`~Queue.put_nowait` method is called
137   on a queue that has reached its *maxsize*.
138
139
140Examples
141========
142
143.. _asyncio_example_queue_dist:
144
145Queues can be used to distribute workload between several
146concurrent tasks::
147
148   import asyncio
149   import random
150   import time
151
152
153   async def worker(name, queue):
154       while True:
155           # Get a "work item" out of the queue.
156           sleep_for = await queue.get()
157
158           # Sleep for the "sleep_for" seconds.
159           await asyncio.sleep(sleep_for)
160
161           # Notify the queue that the "work item" has been processed.
162           queue.task_done()
163
164           print(f'{name} has slept for {sleep_for:.2f} seconds')
165
166
167   async def main():
168       # Create a queue that we will use to store our "workload".
169       queue = asyncio.Queue()
170
171       # Generate random timings and put them into the queue.
172       total_sleep_time = 0
173       for _ in range(20):
174           sleep_for = random.uniform(0.05, 1.0)
175           total_sleep_time += sleep_for
176           queue.put_nowait(sleep_for)
177
178       # Create three worker tasks to process the queue concurrently.
179       tasks = []
180       for i in range(3):
181           task = asyncio.create_task(worker(f'worker-{i}', queue))
182           tasks.append(task)
183
184       # Wait until the queue is fully processed.
185       started_at = time.monotonic()
186       await queue.join()
187       total_slept_for = time.monotonic() - started_at
188
189       # Cancel our worker tasks.
190       for task in tasks:
191           task.cancel()
192       # Wait until all worker tasks are cancelled.
193       await asyncio.gather(*tasks, return_exceptions=True)
194
195       print('====')
196       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
197       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
198
199
200   asyncio.run(main())
201