1import ast
2import asyncio
3import code
4import concurrent.futures
5import inspect
6import sys
7import threading
8import types
9import warnings
10
11from . import futures
12
13
class AsyncIOInteractiveConsole(code.InteractiveConsole):
    """Interactive console whose input may use top-level ``await``.

    Compiled input is executed on ``self.loop`` (scheduled with
    ``call_soon_threadsafe``) while the calling thread blocks until the
    result is available.
    """

    def __init__(self, locals, loop):
        """Create a console over the *locals* namespace, bound to *loop*.

        *locals* is the namespace dict the entered code runs in; *loop* is
        the event loop on which that code is executed.
        """
        super().__init__(locals)
        # Let the console's compiler accept ``await`` outside a function.
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT

        self.loop = loop

    def runcode(self, code):
        """Run the code object *code* on ``self.loop`` and wait for it.

        Returns whatever executing the code produced (the awaited result
        when the code compiled to a coroutine).  ``SystemExit`` propagates;
        any other exception is reported to the user -- as a
        ``KeyboardInterrupt`` notice when the run was flagged as
        interrupted, otherwise as a traceback -- and ``None`` is returned.

        This blocks the calling thread on a ``concurrent.futures.Future``
        that is completed from a loop callback, so calling it on the thread
        that runs ``self.loop`` would wait forever.
        """
        future = concurrent.futures.Future()

        def callback():
            # Executed by the event loop.  The task and the interrupt flag
            # are module globals because the module's script section
            # inspects and updates them when handling Ctrl-C.
            global repl_future
            global repl_future_interrupted

            repl_future = None
            repl_future_interrupted = False

            # Wrap the code object in a function so it executes with the
            # console namespace as its globals.
            func = types.FunctionType(code, self.locals)
            try:
                coro = func()
            except SystemExit:
                raise
            except KeyboardInterrupt as ex:
                repl_future_interrupted = True
                future.set_exception(ex)
                return
            except BaseException as ex:
                future.set_exception(ex)
                return

            if not inspect.iscoroutine(coro):
                # Plain synchronous code: it has already run to completion.
                future.set_result(coro)
                return

            try:
                # Code with top-level ``await``: run it as a task and
                # forward its outcome to the waiting thread's future.
                repl_future = self.loop.create_task(coro)
                futures._chain_future(repl_future, future)
            except BaseException as exc:
                future.set_exception(exc)

        # Use the loop this console was constructed with rather than a
        # module-level ``loop`` name, which is not guaranteed to exist or
        # to be the same loop.
        self.loop.call_soon_threadsafe(callback)

        try:
            return future.result()
        except SystemExit:
            raise
        except BaseException:
            if repl_future_interrupted:
                self.write("\nKeyboardInterrupt\n")
            else:
                self.showtraceback()
66
67
class REPLThread(threading.Thread):
    """Thread that drives the interactive console.

    Relies on the module-level ``console`` and ``loop`` objects.
    """

    def run(self):
        """Run the console session, then ask the event loop to stop."""
        try:
            prompt = getattr(sys, "ps1", ">>> ")
            banner_lines = [
                f'asyncio REPL {sys.version} on {sys.platform}',
                'Use "await" directly instead of "asyncio.run()".',
                'Type "help", "copyright", "credits" or "license" '
                'for more information.',
                f'{prompt}import asyncio',
            ]

            console.interact(
                banner='\n'.join(banner_lines),
                exitmsg='exiting asyncio REPL...',
            )
        finally:
            # Silence "coroutine ... was never awaited" warnings from here on.
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning,
            )

            # The loop runs on another thread, so stop it via the
            # thread-safe scheduling call.
            loop.call_soon_threadsafe(loop.stop)
90
91
if __name__ == '__main__':
    # The event loop runs on this (main) thread; the console runs in a
    # REPLThread.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Namespace for the interactive session: ``asyncio`` plus a handful of
    # this module's own dunder attributes.
    module_namespace = locals()
    inherited_names = {
        '__name__', '__package__',
        '__loader__', '__spec__',
        '__builtins__', '__file__',
    }
    repl_locals = {'asyncio': asyncio}
    repl_locals.update(
        {name: module_namespace[name] for name in inherited_names})

    console = AsyncIOInteractiveConsole(repl_locals, loop)

    # Shared with AsyncIOInteractiveConsole.runcode through module globals.
    repl_future = None
    repl_future_interrupted = False

    # Imported for its side effects only; absence is not an error.
    try:
        import readline  # NoQA
    except ImportError:
        pass

    repl_thread = REPLThread(daemon=True)
    repl_thread.start()

    while True:
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            # Ctrl-C: cancel the task currently running console input (if
            # any), flag the interruption, and go back to running the loop.
            if repl_future and not repl_future.done():
                repl_future.cancel()
                repl_future_interrupted = True
        else:
            # run_forever() returned normally, i.e. the loop was stopped.
            break
126