1"""A Future class similar to the one in PEP 3148."""
2
3__all__ = (
4    'Future', 'wrap_future', 'isfuture',
5)
6
7import concurrent.futures
8import contextvars
9import logging
10import sys
11
12from . import base_futures
13from . import events
14from . import exceptions
15from . import format_helpers
16
17
# Re-export the helper from base_futures under this module's namespace
# (it is listed in __all__ above).
isfuture = base_futures.isfuture


# State markers stored in Future._state; the values come from base_futures.
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED


# Custom logging level sitting one step below logging.DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
27
28
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - This class is not thread-safe.

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    # Traceback the exception carried when set_exception() stored it.
    # result() re-raises with exactly this traceback so that repeated
    # calls do not keep appending frames to the exception.
    _exception_tb = None
    _loop = None
    _source_traceback = None
    _cancel_message = None
    # A saved CancelledError for later chaining as an exception context.
    _cancelled_exc = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between
    #   `await Future()` or `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    # True while an exception has been stored but not yet retrieved;
    # __del__() reports the exception to the loop when this is still set.
    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future uses
        the default event loop.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # List of (callback, context) pairs to run once the future is done.
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode remember where the future was created so the
            # "never retrieved" report can point at it.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    _repr_info = base_futures._future_repr_info

    def __repr__(self):
        """Return '<ClassName info...>' built from _repr_info()."""
        return '<{} {}>'.format(self.__class__.__name__,
                                ' '.join(self._repr_info()))

    def __del__(self):
        """Report a stored exception that nobody retrieved."""
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __class_getitem__(cls, type):
        """Allow Future[T] subscription; the argument is ignored."""
        return cls

    @property
    def _log_traceback(self):
        """Whether an unretrieved exception will be reported by __del__()."""
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        # The flag may only be cleared from outside; it is switched on
        # exclusively by set_exception().
        if val:
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    def get_loop(self):
        """Return the event loop the Future is bound to.

        Raises RuntimeError if no loop has been set on the instance.
        """
        loop = self._loop
        if loop is None:
            raise RuntimeError("Future object is not initialized.")
        return loop

    def _make_cancelled_error(self):
        """Create the CancelledError to raise if the Future is cancelled.

        This should only be called once when handling a cancellation since
        it erases the saved context exception value.
        """
        if self._cancel_message is None:
            exc = exceptions.CancelledError()
        else:
            exc = exceptions.CancelledError(self._cancel_message)
        exc.__context__ = self._cancelled_exc
        # Remove the reference since we don't need this anymore.
        self._cancelled_exc = None
        return exc

    def cancel(self, msg=None):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        self.__log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._cancel_message = msg
        self.__schedule_callbacks()
        return True

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a snapshot; the live list is emptied before scheduling.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        if self._exception is not None:
            # Re-raise with the traceback captured by set_exception().
            # A bare `raise self._exception` would extend the exception's
            # traceback a little further on every single call.
            raise self._exception.with_traceback(self._exception_tb)
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Exception is not set.')
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                # Capture the caller's context now so the callback later
                # runs with the values visible at registration time.
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [(f, ctx)
                              for (f, ctx) in self._callbacks
                              if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        An exception class is accepted as well and is instantiated without
        arguments.  If the future is already done when this method is
        called, raises InvalidStateError.  Raises TypeError for
        StopIteration or any subclass of it.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        # isinstance() rather than an exact type check: a StopIteration
        # subclass raised from result() inside the __await__ generator is
        # just as harmful as StopIteration itself.
        if isinstance(exception, StopIteration):
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        # Remember the traceback as it is right now; see result().
        self._exception_tb = getattr(exception, '__traceback__', None)
        self._state = _FINISHED
        self.__schedule_callbacks()
        self.__log_traceback = True

    def __await__(self):
        """Yield the future until it is done, then return its result."""
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.
290
291
# Needed for testing purposes: keeps a handle on the class defined above,
# since the name Future may be rebound at the bottom of this module.
_PyFuture = Future
294
295
def _get_loop(fut):
    """Return the loop of *fut*, preferring its get_loop() method.

    Objects without a get_loop attribute are served from the older
    private '_loop' attribute instead.
    """
    try:
        loop_getter = fut.get_loop
    except AttributeError:
        has_getter = False
    else:
        has_getter = True
    return loop_getter() if has_getter else fut._loop
306
307
def _set_result_unless_cancelled(fut, result):
    """Store *result* on *fut*, doing nothing when it is cancelled."""
    if not fut.cancelled():
        fut.set_result(result)
313
314
def _convert_future_exc(exc):
    """Translate a concurrent.futures exception to its asyncio twin.

    Only exact type matches are translated (subclasses are left alone);
    the replacement is built from the original's args.  Any other
    exception is returned unchanged.
    """
    translations = (
        (concurrent.futures.CancelledError, exceptions.CancelledError),
        (concurrent.futures.TimeoutError, exceptions.TimeoutError),
        (concurrent.futures.InvalidStateError, exceptions.InvalidStateError),
    )
    source_type = type(exc)
    for theirs, ours in translations:
        if source_type is theirs:
            return ours(*exc.args)
    return exc
325
326
def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future.

    *source* must already be done.  A cancelled source cancels the
    target; nothing further is copied when the target reports (through
    set_running_or_notify_cancel()) that it was cancelled.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if concurrent.set_running_or_notify_cancel():
        error = source.exception()
        if error is None:
            concurrent.set_result(source.result())
        else:
            concurrent.set_exception(_convert_future_exc(error))
340
341
def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.  *source* must
    be done; an already cancelled *dest* is left untouched.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()

    if source.cancelled():
        dest.cancel()
        return

    error = source.exception()
    if error is None:
        dest.set_result(source.result())
    else:
        dest.set_exception(_convert_future_exc(error))
360
361
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.
    """
    source_is_aio = isfuture(source)
    if not (source_is_aio
            or isinstance(source, concurrent.futures.Future)):
        raise TypeError('A future is required for source argument')
    dest_is_aio = isfuture(destination)
    if not (dest_is_aio
            or isinstance(destination, concurrent.futures.Future)):
        raise TypeError('A future is required for destination argument')

    # Only asyncio-style futures are bound to a loop.
    source_loop = None
    if source_is_aio:
        source_loop = _get_loop(source)
    dest_loop = None
    if dest_is_aio:
        dest_loop = _get_loop(destination)

    def _set_state(target, origin):
        # Choose the copy helper matching the kind of the target future.
        if not isfuture(target):
            _set_concurrent_future_state(target, origin)
        else:
            _copy_future_state(origin, target)

    def _call_check_cancel(dest_fut):
        # Done-callback of destination: forward a cancellation to source.
        if not dest_fut.cancelled():
            return
        same_thread = source_loop is None or source_loop is dest_loop
        if same_thread:
            source.cancel()
        else:
            source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(done_source):
        # Done-callback of source: hand its outcome to destination.
        loop_gone = dest_loop is not None and dest_loop.is_closed()
        if destination.cancelled() and loop_gone:
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, done_source)
            return
        dest_loop.call_soon_threadsafe(_set_state, destination, done_source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
402
403
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    An object that isfuture() already accepts is returned as is.
    Otherwise a new future is created on *loop* (or on the loop returned
    by events.get_event_loop() when *loop* is None), chained to *future*
    and returned.
    """
    if not isfuture(future):
        assert isinstance(future, concurrent.futures.Future), \
            f'concurrent.futures.Future is expected, got {future!r}'
        target_loop = events.get_event_loop() if loop is None else loop
        wrapped = target_loop.create_future()
        _chain_future(future, wrapped)
        return wrapped
    return future
415
416
# When the optional _asyncio module can be imported, rebind Future to the
# class it provides (presumably the C-accelerated implementation, going by
# the _CFuture name); otherwise the Future class defined above stays in use.
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future
424