1"""Tests for proactor_events.py"""
2
3import io
4import socket
5import unittest
6import sys
7from unittest import mock
8
9import asyncio
10from asyncio import events
11from asyncio.proactor_events import BaseProactorEventLoop
12from asyncio.proactor_events import _ProactorSocketTransport
13from asyncio.proactor_events import _ProactorWritePipeTransport
14from asyncio.proactor_events import _ProactorDuplexPipeTransport
15from test import support
16from test.test_asyncio import utils as test_utils
17
18
19def close_transport(transport):
20    # Don't call transport.close() because the event loop and the IOCP proactor
21    # are mocked
22    if transport._sock is None:
23        return
24    transport._sock.close()
25    transport._sock = None
26
27
28class ProactorSocketTransportTests(test_utils.TestCase):
29
30    def setUp(self):
31        super().setUp()
32        self.loop = self.new_test_loop()
33        self.addCleanup(self.loop.close)
34        self.proactor = mock.Mock()
35        self.loop._proactor = self.proactor
36        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
37        self.sock = mock.Mock(socket.socket)
38
39    def socket_transport(self, waiter=None):
40        transport = _ProactorSocketTransport(self.loop, self.sock,
41                                             self.protocol, waiter=waiter)
42        self.addCleanup(close_transport, transport)
43        return transport
44
45    def test_ctor(self):
46        fut = asyncio.Future(loop=self.loop)
47        tr = self.socket_transport(waiter=fut)
48        test_utils.run_briefly(self.loop)
49        self.assertIsNone(fut.result())
50        self.protocol.connection_made(tr)
51        self.proactor.recv.assert_called_with(self.sock, 32768)
52
53    def test_loop_reading(self):
54        tr = self.socket_transport()
55        tr._loop_reading()
56        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
57        self.assertFalse(self.protocol.data_received.called)
58        self.assertFalse(self.protocol.eof_received.called)
59
60    def test_loop_reading_data(self):
61        res = asyncio.Future(loop=self.loop)
62        res.set_result(b'data')
63
64        tr = self.socket_transport()
65        tr._read_fut = res
66        tr._loop_reading(res)
67        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
68        self.protocol.data_received.assert_called_with(b'data')
69
70    def test_loop_reading_no_data(self):
71        res = asyncio.Future(loop=self.loop)
72        res.set_result(b'')
73
74        tr = self.socket_transport()
75        self.assertRaises(AssertionError, tr._loop_reading, res)
76
77        tr.close = mock.Mock()
78        tr._read_fut = res
79        tr._loop_reading(res)
80        self.assertFalse(self.loop._proactor.recv.called)
81        self.assertTrue(self.protocol.eof_received.called)
82        self.assertTrue(tr.close.called)
83
84    def test_loop_reading_aborted(self):
85        err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()
86
87        tr = self.socket_transport()
88        tr._fatal_error = mock.Mock()
89        tr._loop_reading()
90        tr._fatal_error.assert_called_with(
91                            err,
92                            'Fatal read error on pipe transport')
93
94    def test_loop_reading_aborted_closing(self):
95        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
96
97        tr = self.socket_transport()
98        tr._closing = True
99        tr._fatal_error = mock.Mock()
100        tr._loop_reading()
101        self.assertFalse(tr._fatal_error.called)
102
103    def test_loop_reading_aborted_is_fatal(self):
104        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
105        tr = self.socket_transport()
106        tr._closing = False
107        tr._fatal_error = mock.Mock()
108        tr._loop_reading()
109        self.assertTrue(tr._fatal_error.called)
110
111    def test_loop_reading_conn_reset_lost(self):
112        err = self.loop._proactor.recv.side_effect = ConnectionResetError()
113
114        tr = self.socket_transport()
115        tr._closing = False
116        tr._fatal_error = mock.Mock()
117        tr._force_close = mock.Mock()
118        tr._loop_reading()
119        self.assertFalse(tr._fatal_error.called)
120        tr._force_close.assert_called_with(err)
121
122    def test_loop_reading_exception(self):
123        err = self.loop._proactor.recv.side_effect = (OSError())
124
125        tr = self.socket_transport()
126        tr._fatal_error = mock.Mock()
127        tr._loop_reading()
128        tr._fatal_error.assert_called_with(
129                            err,
130                            'Fatal read error on pipe transport')
131
132    def test_write(self):
133        tr = self.socket_transport()
134        tr._loop_writing = mock.Mock()
135        tr.write(b'data')
136        self.assertEqual(tr._buffer, None)
137        tr._loop_writing.assert_called_with(data=b'data')
138
139    def test_write_no_data(self):
140        tr = self.socket_transport()
141        tr.write(b'')
142        self.assertFalse(tr._buffer)
143
144    def test_write_more(self):
145        tr = self.socket_transport()
146        tr._write_fut = mock.Mock()
147        tr._loop_writing = mock.Mock()
148        tr.write(b'data')
149        self.assertEqual(tr._buffer, b'data')
150        self.assertFalse(tr._loop_writing.called)
151
152    def test_loop_writing(self):
153        tr = self.socket_transport()
154        tr._buffer = bytearray(b'data')
155        tr._loop_writing()
156        self.loop._proactor.send.assert_called_with(self.sock, b'data')
157        self.loop._proactor.send.return_value.add_done_callback.\
158            assert_called_with(tr._loop_writing)
159
160    @mock.patch('asyncio.proactor_events.logger')
161    def test_loop_writing_err(self, m_log):
162        err = self.loop._proactor.send.side_effect = OSError()
163        tr = self.socket_transport()
164        tr._fatal_error = mock.Mock()
165        tr._buffer = [b'da', b'ta']
166        tr._loop_writing()
167        tr._fatal_error.assert_called_with(
168                            err,
169                            'Fatal write error on pipe transport')
170        tr._conn_lost = 1
171
172        tr.write(b'data')
173        tr.write(b'data')
174        tr.write(b'data')
175        tr.write(b'data')
176        tr.write(b'data')
177        self.assertEqual(tr._buffer, None)
178        m_log.warning.assert_called_with('socket.send() raised exception.')
179
180    def test_loop_writing_stop(self):
181        fut = asyncio.Future(loop=self.loop)
182        fut.set_result(b'data')
183
184        tr = self.socket_transport()
185        tr._write_fut = fut
186        tr._loop_writing(fut)
187        self.assertIsNone(tr._write_fut)
188
189    def test_loop_writing_closing(self):
190        fut = asyncio.Future(loop=self.loop)
191        fut.set_result(1)
192
193        tr = self.socket_transport()
194        tr._write_fut = fut
195        tr.close()
196        tr._loop_writing(fut)
197        self.assertIsNone(tr._write_fut)
198        test_utils.run_briefly(self.loop)
199        self.protocol.connection_lost.assert_called_with(None)
200
201    def test_abort(self):
202        tr = self.socket_transport()
203        tr._force_close = mock.Mock()
204        tr.abort()
205        tr._force_close.assert_called_with(None)
206
207    def test_close(self):
208        tr = self.socket_transport()
209        tr.close()
210        test_utils.run_briefly(self.loop)
211        self.protocol.connection_lost.assert_called_with(None)
212        self.assertTrue(tr.is_closing())
213        self.assertEqual(tr._conn_lost, 1)
214
215        self.protocol.connection_lost.reset_mock()
216        tr.close()
217        test_utils.run_briefly(self.loop)
218        self.assertFalse(self.protocol.connection_lost.called)
219
220    def test_close_write_fut(self):
221        tr = self.socket_transport()
222        tr._write_fut = mock.Mock()
223        tr.close()
224        test_utils.run_briefly(self.loop)
225        self.assertFalse(self.protocol.connection_lost.called)
226
227    def test_close_buffer(self):
228        tr = self.socket_transport()
229        tr._buffer = [b'data']
230        tr.close()
231        test_utils.run_briefly(self.loop)
232        self.assertFalse(self.protocol.connection_lost.called)
233
234    @mock.patch('asyncio.base_events.logger')
235    def test_fatal_error(self, m_logging):
236        tr = self.socket_transport()
237        tr._force_close = mock.Mock()
238        tr._fatal_error(None)
239        self.assertTrue(tr._force_close.called)
240        self.assertTrue(m_logging.error.called)
241
242    def test_force_close(self):
243        tr = self.socket_transport()
244        tr._buffer = [b'data']
245        read_fut = tr._read_fut = mock.Mock()
246        write_fut = tr._write_fut = mock.Mock()
247        tr._force_close(None)
248
249        read_fut.cancel.assert_called_with()
250        write_fut.cancel.assert_called_with()
251        test_utils.run_briefly(self.loop)
252        self.protocol.connection_lost.assert_called_with(None)
253        self.assertEqual(None, tr._buffer)
254        self.assertEqual(tr._conn_lost, 1)
255
256    def test_loop_writing_force_close(self):
257        exc_handler = mock.Mock()
258        self.loop.set_exception_handler(exc_handler)
259        fut = asyncio.Future(loop=self.loop)
260        fut.set_result(1)
261        self.proactor.send.return_value = fut
262
263        tr = self.socket_transport()
264        tr.write(b'data')
265        tr._force_close(None)
266        test_utils.run_briefly(self.loop)
267        exc_handler.assert_not_called()
268
269    def test_force_close_idempotent(self):
270        tr = self.socket_transport()
271        tr._closing = True
272        tr._force_close(None)
273        test_utils.run_briefly(self.loop)
274        self.assertFalse(self.protocol.connection_lost.called)
275
276    def test_fatal_error_2(self):
277        tr = self.socket_transport()
278        tr._buffer = [b'data']
279        tr._force_close(None)
280
281        test_utils.run_briefly(self.loop)
282        self.protocol.connection_lost.assert_called_with(None)
283        self.assertEqual(None, tr._buffer)
284
285    def test_call_connection_lost(self):
286        tr = self.socket_transport()
287        tr._call_connection_lost(None)
288        self.assertTrue(self.protocol.connection_lost.called)
289        self.assertTrue(self.sock.close.called)
290
291    def test_write_eof(self):
292        tr = self.socket_transport()
293        self.assertTrue(tr.can_write_eof())
294        tr.write_eof()
295        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
296        tr.write_eof()
297        self.assertEqual(self.sock.shutdown.call_count, 1)
298        tr.close()
299
300    def test_write_eof_buffer(self):
301        tr = self.socket_transport()
302        f = asyncio.Future(loop=self.loop)
303        tr._loop._proactor.send.return_value = f
304        tr.write(b'data')
305        tr.write_eof()
306        self.assertTrue(tr._eof_written)
307        self.assertFalse(self.sock.shutdown.called)
308        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
309        f.set_result(4)
310        self.loop._run_once()
311        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
312        tr.close()
313
314    def test_write_eof_write_pipe(self):
315        tr = _ProactorWritePipeTransport(
316            self.loop, self.sock, self.protocol)
317        self.assertTrue(tr.can_write_eof())
318        tr.write_eof()
319        self.assertTrue(tr.is_closing())
320        self.loop._run_once()
321        self.assertTrue(self.sock.close.called)
322        tr.close()
323
324    def test_write_eof_buffer_write_pipe(self):
325        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
326        f = asyncio.Future(loop=self.loop)
327        tr._loop._proactor.send.return_value = f
328        tr.write(b'data')
329        tr.write_eof()
330        self.assertTrue(tr.is_closing())
331        self.assertFalse(self.sock.shutdown.called)
332        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
333        f.set_result(4)
334        self.loop._run_once()
335        self.loop._run_once()
336        self.assertTrue(self.sock.close.called)
337        tr.close()
338
339    def test_write_eof_duplex_pipe(self):
340        tr = _ProactorDuplexPipeTransport(
341            self.loop, self.sock, self.protocol)
342        self.assertFalse(tr.can_write_eof())
343        with self.assertRaises(NotImplementedError):
344            tr.write_eof()
345        close_transport(tr)
346
347    def test_pause_resume_reading(self):
348        tr = self.socket_transport()
349        futures = []
350        for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
351            f = asyncio.Future(loop=self.loop)
352            f.set_result(msg)
353            futures.append(f)
354
355        self.loop._proactor.recv.side_effect = futures
356        self.loop._run_once()
357        self.assertFalse(tr._paused)
358        self.assertTrue(tr.is_reading())
359        self.loop._run_once()
360        self.protocol.data_received.assert_called_with(b'data1')
361        self.loop._run_once()
362        self.protocol.data_received.assert_called_with(b'data2')
363
364        tr.pause_reading()
365        tr.pause_reading()
366        self.assertTrue(tr._paused)
367        self.assertFalse(tr.is_reading())
368        for i in range(10):
369            self.loop._run_once()
370        self.protocol.data_received.assert_called_with(b'data2')
371
372        tr.resume_reading()
373        tr.resume_reading()
374        self.assertFalse(tr._paused)
375        self.assertTrue(tr.is_reading())
376        self.loop._run_once()
377        self.protocol.data_received.assert_called_with(b'data3')
378        self.loop._run_once()
379        self.protocol.data_received.assert_called_with(b'data4')
380
381        tr.pause_reading()
382        tr.resume_reading()
383        self.loop.call_exception_handler = mock.Mock()
384        self.loop._run_once()
385        self.loop.call_exception_handler.assert_not_called()
386        self.protocol.data_received.assert_called_with(b'data5')
387        tr.close()
388
389        self.assertFalse(tr.is_reading())
390
391
392    def pause_writing_transport(self, high):
393        tr = self.socket_transport()
394        tr.set_write_buffer_limits(high=high)
395
396        self.assertEqual(tr.get_write_buffer_size(), 0)
397        self.assertFalse(self.protocol.pause_writing.called)
398        self.assertFalse(self.protocol.resume_writing.called)
399        return tr
400
401    def test_pause_resume_writing(self):
402        tr = self.pause_writing_transport(high=4)
403
404        # write a large chunk, must pause writing
405        fut = asyncio.Future(loop=self.loop)
406        self.loop._proactor.send.return_value = fut
407        tr.write(b'large data')
408        self.loop._run_once()
409        self.assertTrue(self.protocol.pause_writing.called)
410
411        # flush the buffer
412        fut.set_result(None)
413        self.loop._run_once()
414        self.assertEqual(tr.get_write_buffer_size(), 0)
415        self.assertTrue(self.protocol.resume_writing.called)
416
417    def test_pause_writing_2write(self):
418        tr = self.pause_writing_transport(high=4)
419
420        # first short write, the buffer is not full (3 <= 4)
421        fut1 = asyncio.Future(loop=self.loop)
422        self.loop._proactor.send.return_value = fut1
423        tr.write(b'123')
424        self.loop._run_once()
425        self.assertEqual(tr.get_write_buffer_size(), 3)
426        self.assertFalse(self.protocol.pause_writing.called)
427
428        # fill the buffer, must pause writing (6 > 4)
429        tr.write(b'abc')
430        self.loop._run_once()
431        self.assertEqual(tr.get_write_buffer_size(), 6)
432        self.assertTrue(self.protocol.pause_writing.called)
433
434    def test_pause_writing_3write(self):
435        tr = self.pause_writing_transport(high=4)
436
437        # first short write, the buffer is not full (1 <= 4)
438        fut = asyncio.Future(loop=self.loop)
439        self.loop._proactor.send.return_value = fut
440        tr.write(b'1')
441        self.loop._run_once()
442        self.assertEqual(tr.get_write_buffer_size(), 1)
443        self.assertFalse(self.protocol.pause_writing.called)
444
445        # second short write, the buffer is not full (3 <= 4)
446        tr.write(b'23')
447        self.loop._run_once()
448        self.assertEqual(tr.get_write_buffer_size(), 3)
449        self.assertFalse(self.protocol.pause_writing.called)
450
451        # fill the buffer, must pause writing (6 > 4)
452        tr.write(b'abc')
453        self.loop._run_once()
454        self.assertEqual(tr.get_write_buffer_size(), 6)
455        self.assertTrue(self.protocol.pause_writing.called)
456
457    def test_dont_pause_writing(self):
458        tr = self.pause_writing_transport(high=4)
459
460        # write a large chunk which completes immediately,
461        # it should not pause writing
462        fut = asyncio.Future(loop=self.loop)
463        fut.set_result(None)
464        self.loop._proactor.send.return_value = fut
465        tr.write(b'very large data')
466        self.loop._run_once()
467        self.assertEqual(tr.get_write_buffer_size(), 0)
468        self.assertFalse(self.protocol.pause_writing.called)
469
470
471@unittest.skip('FIXME: bpo-33694: these tests are too close '
472               'to the implementation and should be refactored or removed')
473class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
474
475    def setUp(self):
476        super().setUp()
477        self.loop = self.new_test_loop()
478        self.addCleanup(self.loop.close)
479        self.proactor = mock.Mock()
480        self.loop._proactor = self.proactor
481
482        self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
483        self.buf = bytearray(1)
484        self.protocol.get_buffer.side_effect = lambda hint: self.buf
485
486        self.sock = mock.Mock(socket.socket)
487
488    def socket_transport(self, waiter=None):
489        transport = _ProactorSocketTransport(self.loop, self.sock,
490                                             self.protocol, waiter=waiter)
491        self.addCleanup(close_transport, transport)
492        return transport
493
494    def test_ctor(self):
495        fut = asyncio.Future(loop=self.loop)
496        tr = self.socket_transport(waiter=fut)
497        test_utils.run_briefly(self.loop)
498        self.assertIsNone(fut.result())
499        self.protocol.connection_made(tr)
500        self.proactor.recv_into.assert_called_with(self.sock, self.buf)
501
502    def test_loop_reading(self):
503        tr = self.socket_transport()
504        tr._loop_reading()
505        self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
506        self.assertTrue(self.protocol.get_buffer.called)
507        self.assertFalse(self.protocol.buffer_updated.called)
508        self.assertFalse(self.protocol.eof_received.called)
509
510    def test_get_buffer_error(self):
511        transport = self.socket_transport()
512        transport._fatal_error = mock.Mock()
513
514        self.loop.call_exception_handler = mock.Mock()
515        self.protocol.get_buffer.side_effect = LookupError()
516
517        transport._loop_reading()
518
519        self.assertTrue(transport._fatal_error.called)
520        self.assertTrue(self.protocol.get_buffer.called)
521        self.assertFalse(self.protocol.buffer_updated.called)
522
523    def test_get_buffer_zerosized(self):
524        transport = self.socket_transport()
525        transport._fatal_error = mock.Mock()
526
527        self.loop.call_exception_handler = mock.Mock()
528        self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)
529
530        transport._loop_reading()
531
532        self.assertTrue(transport._fatal_error.called)
533        self.assertTrue(self.protocol.get_buffer.called)
534        self.assertFalse(self.protocol.buffer_updated.called)
535
536    def test_proto_type_switch(self):
537        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
538        tr = self.socket_transport()
539
540        res = asyncio.Future(loop=self.loop)
541        res.set_result(b'data')
542
543        tr = self.socket_transport()
544        tr._read_fut = res
545        tr._loop_reading(res)
546        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
547        self.protocol.data_received.assert_called_with(b'data')
548
549        # switch protocol to a BufferedProtocol
550
551        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
552        buf = bytearray(4)
553        buf_proto.get_buffer.side_effect = lambda hint: buf
554
555        tr.set_protocol(buf_proto)
556        test_utils.run_briefly(self.loop)
557        res = asyncio.Future(loop=self.loop)
558        res.set_result(4)
559
560        tr._read_fut = res
561        tr._loop_reading(res)
562        self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
563        buf_proto.buffer_updated.assert_called_with(4)
564
565    @unittest.skip('FIXME: bpo-33694: this test is too close to the '
566                   'implementation and should be refactored or removed')
567    def test_proto_buf_switch(self):
568        tr = self.socket_transport()
569        test_utils.run_briefly(self.loop)
570        self.protocol.get_buffer.assert_called_with(-1)
571
572        # switch protocol to *another* BufferedProtocol
573
574        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
575        buf = bytearray(4)
576        buf_proto.get_buffer.side_effect = lambda hint: buf
577        tr._read_fut.done.side_effect = lambda: False
578        tr.set_protocol(buf_proto)
579        self.assertFalse(buf_proto.get_buffer.called)
580        test_utils.run_briefly(self.loop)
581        buf_proto.get_buffer.assert_called_with(-1)
582
583    def test_buffer_updated_error(self):
584        transport = self.socket_transport()
585        transport._fatal_error = mock.Mock()
586
587        self.loop.call_exception_handler = mock.Mock()
588        self.protocol.buffer_updated.side_effect = LookupError()
589
590        res = asyncio.Future(loop=self.loop)
591        res.set_result(10)
592        transport._read_fut = res
593        transport._loop_reading(res)
594
595        self.assertTrue(transport._fatal_error.called)
596        self.assertFalse(self.protocol.get_buffer.called)
597        self.assertTrue(self.protocol.buffer_updated.called)
598
599    def test_loop_eof_received_error(self):
600        res = asyncio.Future(loop=self.loop)
601        res.set_result(0)
602
603        self.protocol.eof_received.side_effect = LookupError()
604
605        tr = self.socket_transport()
606        tr._fatal_error = mock.Mock()
607
608        tr.close = mock.Mock()
609        tr._read_fut = res
610        tr._loop_reading(res)
611        self.assertFalse(self.loop._proactor.recv_into.called)
612        self.assertTrue(self.protocol.eof_received.called)
613        self.assertTrue(tr._fatal_error.called)
614
615    def test_loop_reading_data(self):
616        res = asyncio.Future(loop=self.loop)
617        res.set_result(4)
618
619        tr = self.socket_transport()
620        tr._read_fut = res
621        tr._loop_reading(res)
622        self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
623        self.protocol.buffer_updated.assert_called_with(4)
624
625    def test_loop_reading_no_data(self):
626        res = asyncio.Future(loop=self.loop)
627        res.set_result(0)
628
629        tr = self.socket_transport()
630        self.assertRaises(AssertionError, tr._loop_reading, res)
631
632        tr.close = mock.Mock()
633        tr._read_fut = res
634        tr._loop_reading(res)
635        self.assertFalse(self.loop._proactor.recv_into.called)
636        self.assertTrue(self.protocol.eof_received.called)
637        self.assertTrue(tr.close.called)
638
639    def test_loop_reading_aborted(self):
640        err = self.loop._proactor.recv_into.side_effect = \
641            ConnectionAbortedError()
642
643        tr = self.socket_transport()
644        tr._fatal_error = mock.Mock()
645        tr._loop_reading()
646        tr._fatal_error.assert_called_with(
647            err, 'Fatal read error on pipe transport')
648
649    def test_loop_reading_aborted_closing(self):
650        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
651
652        tr = self.socket_transport()
653        tr._closing = True
654        tr._fatal_error = mock.Mock()
655        tr._loop_reading()
656        self.assertFalse(tr._fatal_error.called)
657
658    def test_loop_reading_aborted_is_fatal(self):
659        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
660        tr = self.socket_transport()
661        tr._closing = False
662        tr._fatal_error = mock.Mock()
663        tr._loop_reading()
664        self.assertTrue(tr._fatal_error.called)
665
666    def test_loop_reading_conn_reset_lost(self):
667        err = self.loop._proactor.recv_into.side_effect = ConnectionResetError()
668
669        tr = self.socket_transport()
670        tr._closing = False
671        tr._fatal_error = mock.Mock()
672        tr._force_close = mock.Mock()
673        tr._loop_reading()
674        self.assertFalse(tr._fatal_error.called)
675        tr._force_close.assert_called_with(err)
676
677    def test_loop_reading_exception(self):
678        err = self.loop._proactor.recv_into.side_effect = OSError()
679
680        tr = self.socket_transport()
681        tr._fatal_error = mock.Mock()
682        tr._loop_reading()
683        tr._fatal_error.assert_called_with(
684            err, 'Fatal read error on pipe transport')
685
686    def test_pause_resume_reading(self):
687        tr = self.socket_transport()
688        futures = []
689        for msg in [10, 20, 30, 40, 0]:
690            f = asyncio.Future(loop=self.loop)
691            f.set_result(msg)
692            futures.append(f)
693
694        self.loop._proactor.recv_into.side_effect = futures
695        self.loop._run_once()
696        self.assertFalse(tr._paused)
697        self.assertTrue(tr.is_reading())
698        self.loop._run_once()
699        self.protocol.buffer_updated.assert_called_with(10)
700        self.loop._run_once()
701        self.protocol.buffer_updated.assert_called_with(20)
702
703        tr.pause_reading()
704        tr.pause_reading()
705        self.assertTrue(tr._paused)
706        self.assertFalse(tr.is_reading())
707        for i in range(10):
708            self.loop._run_once()
709        self.protocol.buffer_updated.assert_called_with(20)
710
711        tr.resume_reading()
712        tr.resume_reading()
713        self.assertFalse(tr._paused)
714        self.assertTrue(tr.is_reading())
715        self.loop._run_once()
716        self.protocol.buffer_updated.assert_called_with(30)
717        self.loop._run_once()
718        self.protocol.buffer_updated.assert_called_with(40)
719        tr.close()
720
721        self.assertFalse(tr.is_reading())
722
723
724class BaseProactorEventLoopTests(test_utils.TestCase):
725
726    def setUp(self):
727        super().setUp()
728
729        self.sock = test_utils.mock_nonblocking_socket()
730        self.proactor = mock.Mock()
731
732        self.ssock, self.csock = mock.Mock(), mock.Mock()
733
734        with mock.patch('asyncio.proactor_events.socket.socketpair',
735                        return_value=(self.ssock, self.csock)):
736            self.loop = BaseProactorEventLoop(self.proactor)
737        self.set_event_loop(self.loop)
738
739    @mock.patch.object(BaseProactorEventLoop, 'call_soon')
740    @mock.patch('asyncio.proactor_events.socket.socketpair')
741    def test_ctor(self, socketpair, call_soon):
742        ssock, csock = socketpair.return_value = (
743            mock.Mock(), mock.Mock())
744        loop = BaseProactorEventLoop(self.proactor)
745        self.assertIs(loop._ssock, ssock)
746        self.assertIs(loop._csock, csock)
747        self.assertEqual(loop._internal_fds, 1)
748        call_soon.assert_called_with(loop._loop_self_reading)
749        loop.close()
750
751    def test_close_self_pipe(self):
752        self.loop._close_self_pipe()
753        self.assertEqual(self.loop._internal_fds, 0)
754        self.assertTrue(self.ssock.close.called)
755        self.assertTrue(self.csock.close.called)
756        self.assertIsNone(self.loop._ssock)
757        self.assertIsNone(self.loop._csock)
758
759        # Don't call close(): _close_self_pipe() cannot be called twice
760        self.loop._closed = True
761
762    def test_close(self):
763        self.loop._close_self_pipe = mock.Mock()
764        self.loop.close()
765        self.assertTrue(self.loop._close_self_pipe.called)
766        self.assertTrue(self.proactor.close.called)
767        self.assertIsNone(self.loop._proactor)
768
769        self.loop._close_self_pipe.reset_mock()
770        self.loop.close()
771        self.assertFalse(self.loop._close_self_pipe.called)
772
773    def test_make_socket_transport(self):
774        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
775        self.assertIsInstance(tr, _ProactorSocketTransport)
776        close_transport(tr)
777
778    def test_loop_self_reading(self):
779        self.loop._loop_self_reading()
780        self.proactor.recv.assert_called_with(self.ssock, 4096)
781        self.proactor.recv.return_value.add_done_callback.assert_called_with(
782            self.loop._loop_self_reading)
783
784    def test_loop_self_reading_fut(self):
785        fut = mock.Mock()
786        self.loop._loop_self_reading(fut)
787        self.assertTrue(fut.result.called)
788        self.proactor.recv.assert_called_with(self.ssock, 4096)
789        self.proactor.recv.return_value.add_done_callback.assert_called_with(
790            self.loop._loop_self_reading)
791
792    def test_loop_self_reading_exception(self):
793        self.loop.call_exception_handler = mock.Mock()
794        self.proactor.recv.side_effect = OSError()
795        self.loop._loop_self_reading()
796        self.assertTrue(self.loop.call_exception_handler.called)
797
798    def test_write_to_self(self):
799        self.loop._write_to_self()
800        self.csock.send.assert_called_with(b'\0')
801
802    def test_process_events(self):
803        self.loop._process_events([])
804
805    @mock.patch('asyncio.base_events.logger')
806    def test_create_server(self, m_log):
807        pf = mock.Mock()
808        call_soon = self.loop.call_soon = mock.Mock()
809
810        self.loop._start_serving(pf, self.sock)
811        self.assertTrue(call_soon.called)
812
813        # callback
814        loop = call_soon.call_args[0][0]
815        loop()
816        self.proactor.accept.assert_called_with(self.sock)
817
818        # conn
819        fut = mock.Mock()
820        fut.result.return_value = (mock.Mock(), mock.Mock())
821
822        make_tr = self.loop._make_socket_transport = mock.Mock()
823        loop(fut)
824        self.assertTrue(fut.result.called)
825        self.assertTrue(make_tr.called)
826
827        # exception
828        fut.result.side_effect = OSError()
829        loop(fut)
830        self.assertTrue(self.sock.close.called)
831        self.assertTrue(m_log.error.called)
832
833    def test_create_server_cancel(self):
834        pf = mock.Mock()
835        call_soon = self.loop.call_soon = mock.Mock()
836
837        self.loop._start_serving(pf, self.sock)
838        loop = call_soon.call_args[0][0]
839
840        # cancelled
841        fut = asyncio.Future(loop=self.loop)
842        fut.cancel()
843        loop(fut)
844        self.assertTrue(self.sock.close.called)
845
846    def test_stop_serving(self):
847        sock1 = mock.Mock()
848        future1 = mock.Mock()
849        sock2 = mock.Mock()
850        future2 = mock.Mock()
851        self.loop._accept_futures = {
852            sock1.fileno(): future1,
853            sock2.fileno(): future2
854        }
855
856        self.loop._stop_serving(sock1)
857        self.assertTrue(sock1.close.called)
858        self.assertTrue(future1.cancel.called)
859        self.proactor._stop_serving.assert_called_with(sock1)
860        self.assertFalse(sock2.close.called)
861        self.assertFalse(future2.cancel.called)
862
863
864@unittest.skipIf(sys.platform != 'win32',
865                 'Proactor is supported on Windows only')
866class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
867    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
868
869    class MyProto(asyncio.Protocol):
870
871        def __init__(self, loop):
872            self.started = False
873            self.closed = False
874            self.data = bytearray()
875            self.fut = loop.create_future()
876            self.transport = None
877
878        def connection_made(self, transport):
879            self.started = True
880            self.transport = transport
881
882        def data_received(self, data):
883            self.data.extend(data)
884
885        def connection_lost(self, exc):
886            self.closed = True
887            self.fut.set_result(None)
888
889        async def wait_closed(self):
890            await self.fut
891
892    @classmethod
893    def setUpClass(cls):
894        with open(support.TESTFN, 'wb') as fp:
895            fp.write(cls.DATA)
896        super().setUpClass()
897
898    @classmethod
899    def tearDownClass(cls):
900        support.unlink(support.TESTFN)
901        super().tearDownClass()
902
903    def setUp(self):
904        self.loop = asyncio.ProactorEventLoop()
905        self.set_event_loop(self.loop)
906        self.addCleanup(self.loop.close)
907        self.file = open(support.TESTFN, 'rb')
908        self.addCleanup(self.file.close)
909        super().setUp()
910
911    def make_socket(self, cleanup=True):
912        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
913        sock.setblocking(False)
914        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
915        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
916        if cleanup:
917            self.addCleanup(sock.close)
918        return sock
919
920    def run_loop(self, coro):
921        return self.loop.run_until_complete(coro)
922
923    def prepare(self):
924        sock = self.make_socket()
925        proto = self.MyProto(self.loop)
926        port = support.find_unused_port()
927        srv_sock = self.make_socket(cleanup=False)
928        srv_sock.bind(('127.0.0.1', port))
929        server = self.run_loop(self.loop.create_server(
930            lambda: proto, sock=srv_sock))
931        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))
932
933        def cleanup():
934            if proto.transport is not None:
935                # can be None if the task was cancelled before
936                # connection_made callback
937                proto.transport.close()
938                self.run_loop(proto.wait_closed())
939
940            server.close()
941            self.run_loop(server.wait_closed())
942
943        self.addCleanup(cleanup)
944
945        return sock, proto
946
947    def test_sock_sendfile_not_a_file(self):
948        sock, proto = self.prepare()
949        f = object()
950        with self.assertRaisesRegex(events.SendfileNotAvailableError,
951                                    "not a regular file"):
952            self.run_loop(self.loop._sock_sendfile_native(sock, f,
953                                                          0, None))
954        self.assertEqual(self.file.tell(), 0)
955
956    def test_sock_sendfile_iobuffer(self):
957        sock, proto = self.prepare()
958        f = io.BytesIO()
959        with self.assertRaisesRegex(events.SendfileNotAvailableError,
960                                    "not a regular file"):
961            self.run_loop(self.loop._sock_sendfile_native(sock, f,
962                                                          0, None))
963        self.assertEqual(self.file.tell(), 0)
964
965    def test_sock_sendfile_not_regular_file(self):
966        sock, proto = self.prepare()
967        f = mock.Mock()
968        f.fileno.return_value = -1
969        with self.assertRaisesRegex(events.SendfileNotAvailableError,
970                                    "not a regular file"):
971            self.run_loop(self.loop._sock_sendfile_native(sock, f,
972                                                          0, None))
973        self.assertEqual(self.file.tell(), 0)
974
975
976if __name__ == '__main__':
977    unittest.main()
978