1'''A multi-producer, multi-consumer queue.'''
2
3import threading
4from collections import deque
5from heapq import heappush, heappop
6from time import monotonic as time
7try:
8    from _queue import SimpleQueue
9except ImportError:
10    SimpleQueue = None
11
# Names exported by a star-import of this module.
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
13
14
# Reuse the Empty exception of the _queue module when it can be imported;
# otherwise define an equivalent exception class here.
try:
    from _queue import Empty
except ImportError:
    # A failing "from ... import ..." raises ImportError, both when the
    # module is missing and when the name is missing from it, so this is
    # the exception that has to trigger the fallback.
    class Empty(Exception):
        'Exception raised by Queue.get(block=0)/get_nowait().'
        pass
21
class Full(Exception):
    '''Signals that put(block=0)/put_nowait() found no free slot in a Queue.'''
25
26
class Queue:
    '''Thread-safe FIFO queue with an optional bound on its length.

    A maxsize of zero or less makes the queue unbounded.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # One lock protects every mutation of the queue.  The three
        # conditions below are all built on it, so entering any of them
        # takes this same lock and leaving it gives the lock back.
        lock = threading.Lock()
        self.mutex = lock

        # Signalled by put(): wakes a consumer blocked in get().
        self.not_empty = threading.Condition(lock)

        # Signalled by get(): wakes a producer blocked in put().
        self.not_full = threading.Condition(lock)

        # Signalled by task_done() once the count of unfinished tasks
        # reaches zero: wakes every thread blocked in join().
        self.all_tasks_done = threading.Condition(lock)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Record that one previously fetched task has been fully processed.

        Meant for consumer threads: every get() that hands out a task
        should eventually be matched by one task_done() call.

        Once a matching call has arrived for every item ever put() into
        the queue, any thread blocked in join() is released.

        Raises ValueError when called more often than items were put
        into the queue.
        '''
        with self.all_tasks_done:
            pending = self.unfinished_tasks - 1
            if pending < 0:
                raise ValueError('task_done() called too many times')
            if pending == 0:
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = pending

    def join(self):
        '''Block until every item put into the queue has been processed.

        Each put() raises the count of unfinished tasks by one and each
        task_done() lowers it by one.  join() returns as soon as the
        count is zero.
        '''
        done = self.all_tasks_done
        with done:
            while self.unfinished_tasks:
                done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            size = self._qsize()
        return size

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method may be removed in the future; qsize() == 0 is a direct
        replacement.  Either way the answer can be stale by the time it is
        used, because another thread may add an item in between.

        To wait for all queued tasks to be completed, prefer join().
        '''
        with self.mutex:
            size = self._qsize()
        return not size

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method may be removed in the future; qsize() >= n is a direct
        replacement.  Either way the answer can be stale by the time it is
        used, because another thread may remove an item in between.
        '''
        with self.mutex:
            limit = self.maxsize
            # An unbounded queue (limit <= 0) is never full.
            return 0 < limit and limit <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        With 'block' true and 'timeout' None (the defaults), wait as long
        as needed for a free slot.  With 'block' true and a non-negative
        'timeout', wait at most that many seconds and raise Full if no
        slot became free in time.  With 'block' false, store the item only
        if a slot is free right now and raise Full otherwise; 'timeout'
        is ignored in that case.
        '''
        with self.not_full:
            if self.maxsize > 0:
                if block:
                    if timeout is None:
                        while self._qsize() >= self.maxsize:
                            self.not_full.wait()
                    elif timeout < 0:
                        raise ValueError("'timeout' must be a non-negative number")
                    else:
                        deadline = time() + timeout
                        while self._qsize() >= self.maxsize:
                            left = deadline - time()
                            if left <= 0.0:
                                raise Full
                            self.not_full.wait(left)
                elif self._qsize() >= self.maxsize:
                    raise Full
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        With 'block' true and 'timeout' None (the defaults), wait as long
        as needed for an item.  With 'block' true and a non-negative
        'timeout', wait at most that many seconds and raise Empty if no
        item arrived in time.  With 'block' false, return an item only if
        one is available right now and raise Empty otherwise; 'timeout'
        is ignored in that case.
        '''
        with self.not_empty:
            if block:
                if timeout is None:
                    while not self._qsize():
                        self.not_empty.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    deadline = time() + timeout
                    while not self._qsize():
                        left = deadline - time()
                        if left <= 0.0:
                            raise Empty
                        self.not_empty.wait(left)
            elif not self._qsize():
                raise Empty
            fetched = self._get()
            self.not_full.notify()
            return fetched

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        The item is stored only when a free slot is available right
        away; otherwise Full is raised.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        An item is returned only when one is available right away;
        otherwise Empty is raised.
        '''
        return self.get(block=False)

    # The four hooks below define the storage discipline.  Subclasses
    # override them to get a different ordering (e.g. a stack or a
    # priority queue).  The public methods of this class call them only
    # while holding the lock.

    def _init(self, maxsize):
        # Create the underlying container.
        self.queue = deque()

    def _qsize(self):
        # Number of items currently stored.
        return len(self.queue)

    def _put(self, item):
        # Store a new item at the tail.
        self.queue.append(item)

    def _get(self):
        # Remove and return the item at the head.
        return self.queue.popleft()
218
219
class PriorityQueue(Queue):
    '''Queue variant that hands out its smallest entry first.

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        # The list is maintained as a binary heap by _put() and _get().
        self.queue = list()

    def _qsize(self):
        heap = self.queue
        return len(heap)

    def _put(self, item):
        heap = self.queue
        heappush(heap, item)

    def _get(self):
        heap = self.queue
        return heappop(heap)
237
238
class LifoQueue(Queue):
    '''Queue variant that behaves like a stack: last item in, first item out.'''

    def _init(self, maxsize):
        # A plain list used as a stack; the top is the end of the list.
        self.queue = list()

    def _qsize(self):
        stack = self.queue
        return len(stack)

    def _put(self, item):
        stack = self.queue
        stack.append(item)

    def _get(self):
        stack = self.queue
        return stack.pop()
253
254
255class _PySimpleQueue:
256    '''Simple, unbounded FIFO queue.
257
258    This pure Python implementation is not reentrant.
259    '''
260    # Note: while this pure Python version provides fairness
261    # (by using a threading.Semaphore which is itself fair, being based
262    #  on threading.Condition), fairness is not part of the API contract.
263    # This allows the C version to use a different implementation.
264
265    def __init__(self):
266        self._queue = deque()
267        self._count = threading.Semaphore(0)
268
269    def put(self, item, block=True, timeout=None):
270        '''Put the item on the queue.
271
272        The optional 'block' and 'timeout' arguments are ignored, as this method
273        never blocks.  They are provided for compatibility with the Queue class.
274        '''
275        self._queue.append(item)
276        self._count.release()
277
278    def get(self, block=True, timeout=None):
279        '''Remove and return an item from the queue.
280
281        If optional args 'block' is true and 'timeout' is None (the default),
282        block if necessary until an item is available. If 'timeout' is
283        a non-negative number, it blocks at most 'timeout' seconds and raises
284        the Empty exception if no item was available within that time.
285        Otherwise ('block' is false), return an item if one is immediately
286        available, else raise the Empty exception ('timeout' is ignored
287        in that case).
288        '''
289        if timeout is not None and timeout < 0:
290            raise ValueError("'timeout' must be a non-negative number")
291        if not self._count.acquire(block, timeout):
292            raise Empty
293        return self._queue.popleft()
294
295    def put_nowait(self, item):
296        '''Put an item into the queue without blocking.
297
298        This is exactly equivalent to `put(item)` and is only provided
299        for compatibility with the Queue class.
300        '''
301        return self.put(item, block=False)
302
303    def get_nowait(self):
304        '''Remove and return an item from the queue without blocking.
305
306        Only get an item if one is immediately available. Otherwise
307        raise the Empty exception.
308        '''
309        return self.get(block=False)
310
311    def empty(self):
312        '''Return True if the queue is empty, False otherwise (not reliable!).'''
313        return len(self._queue) == 0
314
315    def qsize(self):
316        '''Return the approximate size of the queue (not reliable!).'''
317        return len(self._queue)
318
319
# SimpleQueue is None when importing it from _queue failed at the top of
# this module; in that case expose the pure Python implementation under
# the public name.
if SimpleQueue is None:
    SimpleQueue = _PySimpleQueue
322