1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import logging
8import threading
9import time
10
# Conditions accepted by wait() through its ``return_when`` argument.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Private condition used only by as_completed().
_AS_COMPLETED = '_AS_COMPLETED'

# Future life-cycle states (internal to the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# Cancelled by the user, waiters not yet told about it...
CANCELLED = 'CANCELLED'
# ...cancelled, and a worker has since called _Waiter.add_cancelled().
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED,
]

# Human-readable state names used by Future.__repr__(); both cancelled
# states are rendered identically.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished",
}

# Package-internal logger.
LOGGER = logging.getLogger("concurrent.futures")
43
class Error(Exception):
    """Root of the exception hierarchy used for future-related errors."""
47
class CancelledError(Error):
    """Raised when a result is requested from a cancelled Future."""
51
class TimeoutError(Error):
    """Raised when an operation did not finish before its deadline."""
55
class _Waiter:
    """Provides the event that wait() and as_completed() block on.

    Futures report their completion through the three ``add_*`` hooks; this
    base class only records them and never sets the event itself.
    """

    def __init__(self):
        # Completed futures, in the order they were reported.
        self.finished_futures = []
        # Subclasses set this once their wake-up condition is met.
        self.event = threading.Event()

    def _record(self, future):
        """Append *future* to the list of finished futures."""
        self.finished_futures.append(future)

    def add_result(self, future):
        """Record a future that finished with a result."""
        self._record(future)

    def add_exception(self, future):
        """Record a future that finished by raising."""
        self._record(future)

    def add_cancelled(self, future):
        """Record a future that was cancelled."""
        self._record(future)
70
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every notification is recorded and the event is set while holding
    ``lock``, the same lock as_completed() takes when it swaps out
    ``finished_futures`` and clears the event.
    """

    def __init__(self):
        super().__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a successfully finished future and wake the consumer."""
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a future that raised and wake the consumer."""
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and wake the consumer."""
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
92
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any kind of completion sets the event straight away.
    """

    def add_result(self, future):
        """Record *future* and signal the event."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record *future* (which raised) and signal the event."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record *future* (which was cancelled) and signal the event."""
        super().add_cancelled(future)
        self.event.set()
107
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts outstanding futures down to zero before setting the event.  When
    ``stop_on_exception`` is true, the first reported exception sets the
    event immediately instead.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super().__init__()
        self.lock = threading.Lock()
        self.stop_on_exception = stop_on_exception
        self.num_pending_calls = num_pending_calls

    def _decrement_pending_calls(self):
        """Count one future as done; set the event when none are left."""
        with self.lock:
            remaining = self.num_pending_calls - 1
            self.num_pending_calls = remaining
            if remaining == 0:
                self.event.set()

    def add_result(self, future):
        """Record a finished future and count it as done."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record a future that raised; wake up at once if so configured."""
        super().add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and count it as done."""
        super().add_cancelled(future)
        self._decrement_pending_calls()
137
class _AcquireFutures:
    """A context manager that does an ordered acquire of Future conditions.

    The futures are sorted by ``id()`` so that every user of this class
    acquires the conditions in the same order.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
151
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to every future.

    Args:
        fs: The futures to attach the waiter to.  Their ``_state`` and
            ``_waiters`` attributes are accessed directly here, without
            taking any lock.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        installed = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        installed = _FirstCompletedWaiter()
    else:
        # Only futures that have not already notified their waiters will
        # report in later, so only those are counted.
        settled = (CANCELLED_AND_NOTIFIED, FINISHED)
        outstanding = sum(1 for fut in fs if fut._state not in settled)

        if return_when not in (FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError("Invalid return condition: %r" % return_when)
        installed = _AllCompletedWaiter(
            outstanding, stop_on_exception=(return_when == FIRST_EXCEPTION))

    for fut in fs:
        fut._waiters.append(installed)

    return installed
172
173
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.

    Args:
        fs: A list of futures; it is consumed from the end and is empty
            once the generator is exhausted.
        waiter: The waiter to detach from each future before it is yielded.
        ref_collect: An iterable of sets; each must contain every future
            in *fs*, since ``set.remove`` is used.
    """
    while fs:
        # Peek at the last element; it is only popped at the yield below.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        # The waiter list is modified while holding the future's condition.
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame holds no reference while it is
        # suspended at the yield.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
194
195
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors)
            to iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished
        or cancelled). If any given Futures are duplicated, they will be
        returned once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    deadline = None if timeout is None else timeout + time.monotonic()

    fs = set(fs)
    total_futures = len(fs)
    settled_states = (CANCELLED_AND_NOTIFIED, FINISHED)
    with _AcquireFutures(fs):
        finished = {fut for fut in fs if fut._state in settled_states}
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    # Rebind the same name so the intermediate set is not kept alive.
    finished = list(finished)
    try:
        yield from _yield_finished_futures(
            finished, waiter, ref_collect=(fs,))

        while pending:
            wait_timeout = None
            if deadline is not None:
                wait_timeout = deadline - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                        '%d (of %d) futures unfinished'
                        % (len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch of newly finished futures and reset the waiter
            # in one step, under the lock the waiter's add_* methods use.
            with waiter.lock:
                finished, waiter.finished_futures = (
                    waiter.finished_futures, [])
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(
                finished, waiter, ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for leftover in fs:
            with leftover._condition:
                leftover._waiters.remove(waiter)
257
# Return type of wait(): two sets, ``done`` first and ``not_done`` second.
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', ['done', 'not_done'])
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The iterable of Futures (possibly created by different Executors)
            to wait upon. Duplicated futures are waited on once; a one-shot
            iterator is accepted.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If a waiter has to be installed and *return_when* is not
            one of the options above.
    """
    # Materialise and deduplicate once.  The futures are walked several times
    # below, which would exhaust an iterator, and comparing len(done) against
    # an input containing duplicates would never match, so a wait on
    # already-finished futures would block instead of returning.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        # Nothing left to wait for.
        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    # Block outside the futures' locks so they can make progress.
    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
308
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards the state below and wakes threads blocked in result() or
        # exception().
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects installed by wait() / as_completed().
        self._waiters = []
        # Callables registered through add_done_callback().
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call each registered callback, logging any Exception it raises."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Test identity, not truthiness: an exception object may be
                # falsy (e.g. it defines __len__ or __bool__).
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                    _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the lock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break the reference cycle exception -> traceback -> this
                # frame -> self -> exception.
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.

        An Exception raised by *fn* is logged and ignored, whether *fn* is
        called immediately or later on.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call right away, outside the lock, with the same
        # error handling as _invoke_callbacks().
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        # NOTE(review): the current state is not checked, so calling this on
        # an already finished or cancelled future overwrites its state.
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        # NOTE(review): as in set_result(), the current state is not checked.
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
535
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_or_cancel(fut, *wait_args):
            # The future has already been popped from ``fs``, so the cleanup
            # loop in result_iterator() cannot reach it; cancel it here in
            # case result() raised or timed out.  cancel() is called on
            # success too, where a real Future simply returns False.
            try:
                try:
                    return fut.result(*wait_args)
                finally:
                    fut.cancel()
            finally:
                # Do not keep the future referenced from this frame.
                del fut

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield result_or_cancel(fs.pop())
                    else:
                        yield result_or_cancel(
                            fs.pop(), end_time - time.monotonic())
            finally:
                # Cancel whatever has not been handed out yet.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning False lets an exception from the with-body propagate.
        self.shutdown(wait=True)
        return False
613
614
class BrokenExecutor(RuntimeError):
    """Raised when an executor has become unusable after a severe failure."""
619