"""Tests for proactor_events.py"""

import io
import socket
import unittest
import sys
from unittest import mock

import asyncio
from asyncio import events
from asyncio.proactor_events import BaseProactorEventLoop
from asyncio.proactor_events import _ProactorSocketTransport
from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport
from test import support
from test.test_asyncio import utils as test_utils


def close_transport(transport):
    """Close the socket held by *transport* without closing the transport.

    Does nothing when the transport no longer holds a socket, so it is safe
    to register as a cleanup even if the test already released it.
    """
    # Don't call transport.close() because the event loop and the IOCP proactor
    # are mocked
    if transport._sock is None:
        return
    transport._sock.close()
    transport._sock = None


class ProactorSocketTransportTests(test_utils.TestCase):
    """Tests for _ProactorSocketTransport using a mocked proactor and socket.

    The protocol is a test double built from asyncio.Protocol.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        self.sock = mock.Mock(socket.socket)

    def socket_transport(self, waiter=None):
        """Return a socket transport bound to the fixtures from setUp().

        The transport's socket is released automatically at test cleanup.
        """
        transport = _ProactorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        fut = asyncio.Future(loop=self.loop)
        tr = self.socket_transport(waiter=fut)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(fut.result())
        # Assert on the mock rather than calling it: a bare
        # ``connection_made(tr)`` call would verify nothing.
        self.protocol.connection_made.assert_called_with(tr)
        self.proactor.recv.assert_called_with(self.sock, 32768)

    def test_loop_reading(self):
        tr = self.socket_transport()
        tr._loop_reading()
        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
        self.assertFalse(self.protocol.data_received.called)
        self.assertFalse(self.protocol.eof_received.called)

    def test_loop_reading_data(self):
        res = asyncio.Future(loop=self.loop)
        res.set_result(b'data')

        tr = self.socket_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
        self.protocol.data_received.assert_called_with(b'data')

    def test_loop_reading_no_data(self):
        res = asyncio.Future(loop=self.loop)
        res.set_result(b'')

        tr = self.socket_transport()
        # The future passed in must be the transport's own pending read.
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertFalse(self.loop._proactor.recv.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr.close.called)

    def test_loop_reading_aborted(self):
        err = self.loop._proactor.recv.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_loop_reading_aborted_closing(self):
        self.loop._proactor.recv.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._closing = True
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)

    def test_loop_reading_aborted_is_fatal(self):
        self.loop._proactor.recv.side_effect = ConnectionAbortedError()
        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_conn_reset_lost(self):
        err = self.loop._proactor.recv.side_effect = ConnectionResetError()

        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._force_close = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)
        tr._force_close.assert_called_with(err)

    def test_loop_reading_exception(self):
        err = self.loop._proactor.recv.side_effect = OSError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal read error on pipe transport')

    def test_write(self):
        tr = self.socket_transport()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertIsNone(tr._buffer)
        tr._loop_writing.assert_called_with(data=b'data')

    def test_write_no_data(self):
        tr = self.socket_transport()
        tr.write(b'')
        self.assertFalse(tr._buffer)

    def test_write_more(self):
        tr = self.socket_transport()
        # A pending write future forces the data to be buffered.
        tr._write_fut = mock.Mock()
        tr._loop_writing = mock.Mock()
        tr.write(b'data')
        self.assertEqual(tr._buffer, b'data')
        self.assertFalse(tr._loop_writing.called)

    def test_loop_writing(self):
        tr = self.socket_transport()
        tr._buffer = bytearray(b'data')
        tr._loop_writing()
        self.loop._proactor.send.assert_called_with(self.sock, b'data')
        send_fut = self.loop._proactor.send.return_value
        send_fut.add_done_callback.assert_called_with(tr._loop_writing)

    @mock.patch('asyncio.proactor_events.logger')
    def test_loop_writing_err(self, m_log):
        err = self.loop._proactor.send.side_effect = OSError()
        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._buffer = [b'da', b'ta']
        tr._loop_writing()
        tr._fatal_error.assert_called_with(
            err,
            'Fatal write error on pipe transport')
        tr._conn_lost = 1

        # Repeated writes on a lost connection; presumably enough to cross
        # the transport's warning threshold -- confirm against
        # proactor_events.
        for _ in range(5):
            tr.write(b'data')
        self.assertIsNone(tr._buffer)
        m_log.warning.assert_called_with('socket.send() raised exception.')

    def test_loop_writing_stop(self):
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(b'data')

        tr = self.socket_transport()
        tr._write_fut = fut
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)

    def test_loop_writing_closing(self):
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(1)

        tr = self.socket_transport()
        tr._write_fut = fut
        tr.close()
        tr._loop_writing(fut)
        self.assertIsNone(tr._write_fut)
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)

    def test_abort(self):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr.abort()
        tr._force_close.assert_called_with(None)

    def test_close(self):
        tr = self.socket_transport()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertTrue(tr.is_closing())
        self.assertEqual(tr._conn_lost, 1)

        # A second close() must not notify the protocol again.
        self.protocol.connection_lost.reset_mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_write_fut(self):
        tr = self.socket_transport()
        tr._write_fut = mock.Mock()
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_close_buffer(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr.close()
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    @mock.patch('asyncio.base_events.logger')
    def test_fatal_error(self, m_logging):
        tr = self.socket_transport()
        tr._force_close = mock.Mock()
        tr._fatal_error(None)
        self.assertTrue(tr._force_close.called)
        self.assertTrue(m_logging.error.called)

    def test_force_close(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        read_fut = tr._read_fut = mock.Mock()
        write_fut = tr._write_fut = mock.Mock()
        tr._force_close(None)

        read_fut.cancel.assert_called_with()
        write_fut.cancel.assert_called_with()
        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertIsNone(tr._buffer)
        self.assertEqual(tr._conn_lost, 1)

    def test_loop_writing_force_close(self):
        exc_handler = mock.Mock()
        self.loop.set_exception_handler(exc_handler)
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(1)
        self.proactor.send.return_value = fut

        tr = self.socket_transport()
        tr.write(b'data')
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        exc_handler.assert_not_called()

    def test_force_close_idempotent(self):
        tr = self.socket_transport()
        tr._closing = True
        tr._force_close(None)
        test_utils.run_briefly(self.loop)
        self.assertFalse(self.protocol.connection_lost.called)

    def test_fatal_error_2(self):
        tr = self.socket_transport()
        tr._buffer = [b'data']
        tr._force_close(None)

        test_utils.run_briefly(self.loop)
        self.protocol.connection_lost.assert_called_with(None)
        self.assertIsNone(tr._buffer)

    def test_call_connection_lost(self):
        tr = self.socket_transport()
        tr._call_connection_lost(None)
        self.assertTrue(self.protocol.connection_lost.called)
        self.assertTrue(self.sock.close.called)

    def test_write_eof(self):
        tr = self.socket_transport()
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        # A repeated write_eof() must not shut the socket down again.
        tr.write_eof()
        self.assertEqual(self.sock.shutdown.call_count, 1)
        tr.close()

    def test_write_eof_buffer(self):
        tr = self.socket_transport()
        f = asyncio.Future(loop=self.loop)
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr._eof_written)
        # The shutdown is deferred until the pending send completes.
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.sock.shutdown.assert_called_with(socket.SHUT_WR)
        tr.close()

    def test_write_eof_write_pipe(self):
        tr = _ProactorWritePipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertTrue(tr.can_write_eof())
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_buffer_write_pipe(self):
        tr = _ProactorWritePipeTransport(self.loop, self.sock, self.protocol)
        f = asyncio.Future(loop=self.loop)
        tr._loop._proactor.send.return_value = f
        tr.write(b'data')
        tr.write_eof()
        self.assertTrue(tr.is_closing())
        self.assertFalse(self.sock.shutdown.called)
        tr._loop._proactor.send.assert_called_with(self.sock, b'data')
        f.set_result(4)
        self.loop._run_once()
        self.loop._run_once()
        self.assertTrue(self.sock.close.called)
        tr.close()

    def test_write_eof_duplex_pipe(self):
        tr = _ProactorDuplexPipeTransport(
            self.loop, self.sock, self.protocol)
        self.assertFalse(tr.can_write_eof())
        with self.assertRaises(NotImplementedError):
            tr.write_eof()
        close_transport(tr)

    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        futures = []
        for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
            f = asyncio.Future(loop=self.loop)
            f.set_result(msg)
            futures.append(f)

        # Each recv() call hands back the next already-completed future.
        self.loop._proactor.recv.side_effect = futures
        self.loop._run_once()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())
        self.loop._run_once()
        self.protocol.data_received.assert_called_with(b'data1')
        self.loop._run_once()
        self.protocol.data_received.assert_called_with(b'data2')

        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        # While paused, no further data may reach the protocol.
        for _ in range(10):
            self.loop._run_once()
        self.protocol.data_received.assert_called_with(b'data2')

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())
        self.loop._run_once()
        self.protocol.data_received.assert_called_with(b'data3')
        self.loop._run_once()
        self.protocol.data_received.assert_called_with(b'data4')

        # Pausing and resuming back to back must not raise through the
        # loop's exception handler.
        tr.pause_reading()
        tr.resume_reading()
        self.loop.call_exception_handler = mock.Mock()
        self.loop._run_once()
        self.loop.call_exception_handler.assert_not_called()
        self.protocol.data_received.assert_called_with(b'data5')
        tr.close()

        self.assertFalse(tr.is_reading())

    def pause_writing_transport(self, high):
        """Return a transport whose write-buffer high-water mark is *high*.

        Also checks that the transport starts with an empty write buffer and
        that the protocol has not been paused or resumed yet.
        """
        tr = self.socket_transport()
        tr.set_write_buffer_limits(high=high)

        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)
        self.assertFalse(self.protocol.resume_writing.called)
        return tr

    def test_pause_resume_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk, must pause writing
        fut = asyncio.Future(loop=self.loop)
        self.loop._proactor.send.return_value = fut
        tr.write(b'large data')
        self.loop._run_once()
        self.assertTrue(self.protocol.pause_writing.called)

        # flush the buffer
        fut.set_result(None)
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertTrue(self.protocol.resume_writing.called)

    def test_pause_writing_2write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (3 <= 4)
        fut1 = asyncio.Future(loop=self.loop)
        self.loop._proactor.send.return_value = fut1
        tr.write(b'123')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_pause_writing_3write(self):
        tr = self.pause_writing_transport(high=4)

        # first short write, the buffer is not full (1 <= 4)
        fut = asyncio.Future(loop=self.loop)
        self.loop._proactor.send.return_value = fut
        tr.write(b'1')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 1)
        self.assertFalse(self.protocol.pause_writing.called)

        # second short write, the buffer is not full (3 <= 4)
        tr.write(b'23')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 3)
        self.assertFalse(self.protocol.pause_writing.called)

        # fill the buffer, must pause writing (6 > 4)
        tr.write(b'abc')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 6)
        self.assertTrue(self.protocol.pause_writing.called)

    def test_dont_pause_writing(self):
        tr = self.pause_writing_transport(high=4)

        # write a large chunk which completes immediately,
        # it should not pause writing
        fut = asyncio.Future(loop=self.loop)
        fut.set_result(None)
        self.loop._proactor.send.return_value = fut
        tr.write(b'very large data')
        self.loop._run_once()
        self.assertEqual(tr.get_write_buffer_size(), 0)
        self.assertFalse(self.protocol.pause_writing.called)


@unittest.skip('FIXME: bpo-33694: these tests are too close '
               'to the implementation and should be refactored or removed')
class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase):
    """Tests for _ProactorSocketTransport driving a BufferedProtocol.

    The protocol double hands out ``self.buf`` from get_buffer(), and the
    proactor and socket are mocks.  The whole class is currently skipped.
    """

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        self.addCleanup(self.loop.close)
        self.proactor = mock.Mock()
        self.loop._proactor = self.proactor

        self.protocol = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        self.buf = bytearray(1)
        self.protocol.get_buffer.side_effect = lambda hint: self.buf

        self.sock = mock.Mock(socket.socket)

    def socket_transport(self, waiter=None):
        """Return a socket transport bound to the fixtures from setUp().

        The transport's socket is released automatically at test cleanup.
        """
        transport = _ProactorSocketTransport(self.loop, self.sock,
                                             self.protocol, waiter=waiter)
        self.addCleanup(close_transport, transport)
        return transport

    def test_ctor(self):
        fut = asyncio.Future(loop=self.loop)
        tr = self.socket_transport(waiter=fut)
        test_utils.run_briefly(self.loop)
        self.assertIsNone(fut.result())
        # Assert on the mock rather than calling it: a bare
        # ``connection_made(tr)`` call would verify nothing.
        self.protocol.connection_made.assert_called_with(tr)
        self.proactor.recv_into.assert_called_with(self.sock, self.buf)

    def test_loop_reading(self):
        tr = self.socket_transport()
        tr._loop_reading()
        self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)
        self.assertFalse(self.protocol.eof_received.called)

    def test_get_buffer_error(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.get_buffer.side_effect = LookupError()

        transport._loop_reading()

        self.assertTrue(transport._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)

    def test_get_buffer_zerosized(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.get_buffer.side_effect = lambda hint: bytearray(0)

        transport._loop_reading()

        self.assertTrue(transport._fatal_error.called)
        self.assertTrue(self.protocol.get_buffer.called)
        self.assertFalse(self.protocol.buffer_updated.called)

    def test_proto_type_switch(self):
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
        # NOTE(review): two transports are created here; the first one is
        # only released by its cleanup -- confirm whether it is intentional.
        tr = self.socket_transport()

        res = asyncio.Future(loop=self.loop)
        res.set_result(b'data')

        tr = self.socket_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recv.assert_called_with(self.sock, 32768)
        self.protocol.data_received.assert_called_with(b'data')

        # switch protocol to a BufferedProtocol

        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        buf = bytearray(4)
        buf_proto.get_buffer.side_effect = lambda hint: buf

        tr.set_protocol(buf_proto)
        test_utils.run_briefly(self.loop)
        res = asyncio.Future(loop=self.loop)
        res.set_result(4)

        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recv_into.assert_called_with(self.sock, buf)
        buf_proto.buffer_updated.assert_called_with(4)

    @unittest.skip('FIXME: bpo-33694: this test is too close to the '
                   'implementation and should be refactored or removed')
    def test_proto_buf_switch(self):
        tr = self.socket_transport()
        test_utils.run_briefly(self.loop)
        self.protocol.get_buffer.assert_called_with(-1)

        # switch protocol to *another* BufferedProtocol

        buf_proto = test_utils.make_test_protocol(asyncio.BufferedProtocol)
        buf = bytearray(4)
        buf_proto.get_buffer.side_effect = lambda hint: buf
        tr._read_fut.done.side_effect = lambda: False
        tr.set_protocol(buf_proto)
        self.assertFalse(buf_proto.get_buffer.called)
        test_utils.run_briefly(self.loop)
        buf_proto.get_buffer.assert_called_with(-1)

    def test_buffer_updated_error(self):
        transport = self.socket_transport()
        transport._fatal_error = mock.Mock()

        self.loop.call_exception_handler = mock.Mock()
        self.protocol.buffer_updated.side_effect = LookupError()

        res = asyncio.Future(loop=self.loop)
        res.set_result(10)
        transport._read_fut = res
        transport._loop_reading(res)

        self.assertTrue(transport._fatal_error.called)
        self.assertFalse(self.protocol.get_buffer.called)
        self.assertTrue(self.protocol.buffer_updated.called)

    def test_loop_eof_received_error(self):
        res = asyncio.Future(loop=self.loop)
        res.set_result(0)

        self.protocol.eof_received.side_effect = LookupError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertFalse(self.loop._proactor.recv_into.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_data(self):
        res = asyncio.Future(loop=self.loop)
        res.set_result(4)

        tr = self.socket_transport()
        tr._read_fut = res
        tr._loop_reading(res)
        self.loop._proactor.recv_into.assert_called_with(self.sock, self.buf)
        self.protocol.buffer_updated.assert_called_with(4)

    def test_loop_reading_no_data(self):
        res = asyncio.Future(loop=self.loop)
        res.set_result(0)

        tr = self.socket_transport()
        # The future passed in must be the transport's own pending read.
        self.assertRaises(AssertionError, tr._loop_reading, res)

        tr.close = mock.Mock()
        tr._read_fut = res
        tr._loop_reading(res)
        self.assertFalse(self.loop._proactor.recv_into.called)
        self.assertTrue(self.protocol.eof_received.called)
        self.assertTrue(tr.close.called)

    def test_loop_reading_aborted(self):
        err = self.loop._proactor.recv_into.side_effect = \
            ConnectionAbortedError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err, 'Fatal read error on pipe transport')

    def test_loop_reading_aborted_closing(self):
        # Configure recv_into(), like the sibling tests in this class:
        # setting recv() here would leave the error path unexercised.
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()

        tr = self.socket_transport()
        tr._closing = True
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)

    def test_loop_reading_aborted_is_fatal(self):
        self.loop._proactor.recv_into.side_effect = ConnectionAbortedError()
        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        self.assertTrue(tr._fatal_error.called)

    def test_loop_reading_conn_reset_lost(self):
        err = self.loop._proactor.recv_into.side_effect = \
            ConnectionResetError()

        tr = self.socket_transport()
        tr._closing = False
        tr._fatal_error = mock.Mock()
        tr._force_close = mock.Mock()
        tr._loop_reading()
        self.assertFalse(tr._fatal_error.called)
        tr._force_close.assert_called_with(err)

    def test_loop_reading_exception(self):
        err = self.loop._proactor.recv_into.side_effect = OSError()

        tr = self.socket_transport()
        tr._fatal_error = mock.Mock()
        tr._loop_reading()
        tr._fatal_error.assert_called_with(
            err, 'Fatal read error on pipe transport')

    def test_pause_resume_reading(self):
        tr = self.socket_transport()
        futures = []
        for msg in [10, 20, 30, 40, 0]:
            f = asyncio.Future(loop=self.loop)
            f.set_result(msg)
            futures.append(f)

        # Each recv_into() call hands back the next completed future.
        self.loop._proactor.recv_into.side_effect = futures
        self.loop._run_once()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())
        self.loop._run_once()
        self.protocol.buffer_updated.assert_called_with(10)
        self.loop._run_once()
        self.protocol.buffer_updated.assert_called_with(20)

        tr.pause_reading()
        tr.pause_reading()
        self.assertTrue(tr._paused)
        self.assertFalse(tr.is_reading())
        # While paused, no further data may reach the protocol.
        for _ in range(10):
            self.loop._run_once()
        self.protocol.buffer_updated.assert_called_with(20)

        tr.resume_reading()
        tr.resume_reading()
        self.assertFalse(tr._paused)
        self.assertTrue(tr.is_reading())
        self.loop._run_once()
        self.protocol.buffer_updated.assert_called_with(30)
        self.loop._run_once()
        self.protocol.buffer_updated.assert_called_with(40)
        tr.close()

        self.assertFalse(tr.is_reading())


class BaseProactorEventLoopTests(test_utils.TestCase):
    """Tests for BaseProactorEventLoop built on a mocked proactor.

    socket.socketpair() is patched during construction so the loop's
    self-pipe ends are the mocks ``self.ssock`` and ``self.csock``.
    """

    def setUp(self):
        super().setUp()

        self.sock = test_utils.mock_nonblocking_socket()
        self.proactor = mock.Mock()

        self.ssock, self.csock = mock.Mock(), mock.Mock()

        with mock.patch('asyncio.proactor_events.socket.socketpair',
                        return_value=(self.ssock, self.csock)):
            self.loop = BaseProactorEventLoop(self.proactor)
        self.set_event_loop(self.loop)

    @mock.patch.object(BaseProactorEventLoop, 'call_soon')
    @mock.patch('asyncio.proactor_events.socket.socketpair')
    def test_ctor(self, socketpair, call_soon):
        ssock, csock = socketpair.return_value = (
            mock.Mock(), mock.Mock())
        loop = BaseProactorEventLoop(self.proactor)
        self.assertIs(loop._ssock, ssock)
        self.assertIs(loop._csock, csock)
        self.assertEqual(loop._internal_fds, 1)
        call_soon.assert_called_with(loop._loop_self_reading)
        loop.close()

    def test_close_self_pipe(self):
        self.loop._close_self_pipe()
        self.assertEqual(self.loop._internal_fds, 0)
        self.assertTrue(self.ssock.close.called)
        self.assertTrue(self.csock.close.called)
        self.assertIsNone(self.loop._ssock)
        self.assertIsNone(self.loop._csock)

        # Don't call close(): _close_self_pipe() cannot be called twice
        self.loop._closed = True

    def test_close(self):
        self.loop._close_self_pipe = mock.Mock()
        self.loop.close()
        self.assertTrue(self.loop._close_self_pipe.called)
        self.assertTrue(self.proactor.close.called)
        self.assertIsNone(self.loop._proactor)

        # Closing an already-closed loop must not touch the self-pipe again.
        self.loop._close_self_pipe.reset_mock()
        self.loop.close()
        self.assertFalse(self.loop._close_self_pipe.called)

    def test_make_socket_transport(self):
        tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
        self.assertIsInstance(tr, _ProactorSocketTransport)
        close_transport(tr)

    def test_loop_self_reading(self):
        self.loop._loop_self_reading()
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_fut(self):
        fut = mock.Mock()
        self.loop._loop_self_reading(fut)
        self.assertTrue(fut.result.called)
        self.proactor.recv.assert_called_with(self.ssock, 4096)
        self.proactor.recv.return_value.add_done_callback.assert_called_with(
            self.loop._loop_self_reading)

    def test_loop_self_reading_exception(self):
        self.loop.call_exception_handler = mock.Mock()
        self.proactor.recv.side_effect = OSError()
        self.loop._loop_self_reading()
        self.assertTrue(self.loop.call_exception_handler.called)

    def test_write_to_self(self):
        self.loop._write_to_self()
        self.csock.send.assert_called_with(b'\0')

    def test_process_events(self):
        self.loop._process_events([])

    @mock.patch('asyncio.base_events.logger')
    def test_create_server(self, m_log):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        self.assertTrue(call_soon.called)

        # callback
        loop = call_soon.call_args[0][0]
        loop()
        self.proactor.accept.assert_called_with(self.sock)

        # conn
        fut = mock.Mock()
        fut.result.return_value = (mock.Mock(), mock.Mock())

        make_tr = self.loop._make_socket_transport = mock.Mock()
        loop(fut)
        self.assertTrue(fut.result.called)
        self.assertTrue(make_tr.called)

        # exception
        fut.result.side_effect = OSError()
        loop(fut)
        self.assertTrue(self.sock.close.called)
        self.assertTrue(m_log.error.called)

    def test_create_server_cancel(self):
        pf = mock.Mock()
        call_soon = self.loop.call_soon = mock.Mock()

        self.loop._start_serving(pf, self.sock)
        loop = call_soon.call_args[0][0]

        # cancelled
        fut = asyncio.Future(loop=self.loop)
        fut.cancel()
        loop(fut)
        self.assertTrue(self.sock.close.called)

    def test_stop_serving(self):
        sock1 = mock.Mock()
        future1 = mock.Mock()
        sock2 = mock.Mock()
        future2 = mock.Mock()
        self.loop._accept_futures = {
            sock1.fileno(): future1,
            sock2.fileno(): future2
        }

        # Only the socket being stopped may be closed and cancelled.
        self.loop._stop_serving(sock1)
        self.assertTrue(sock1.close.called)
        self.assertTrue(future1.cancel.called)
        self.proactor._stop_serving.assert_called_with(sock1)
        self.assertFalse(sock2.close.called)
        self.assertFalse(future2.cancel.called)


@unittest.skipIf(sys.platform != 'win32',
                 'Proactor is supported on Windows only')
class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
    """Tests for the proactor loop's native sendfile on unsuitable files.

    These run against a real ProactorEventLoop and real sockets, so they are
    skipped everywhere except Windows.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KiB

    class MyProto(asyncio.Protocol):
        """Protocol recording received data and connection lifecycle."""

        def __init__(self, loop):
            self.started = False
            self.closed = False
            self.data = bytearray()
            # Resolved by connection_lost(); awaited by wait_closed().
            self.fut = loop.create_future()
            self.transport = None

        def connection_made(self, transport):
            self.started = True
            self.transport = transport

        def data_received(self, data):
            self.data.extend(data)

        def connection_lost(self, exc):
            self.closed = True
            self.fut.set_result(None)

        async def wait_closed(self):
            await self.fut

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, 'wb') as fp:
            fp.write(cls.DATA)
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)
        super().tearDownClass()

    def setUp(self):
        self.loop = asyncio.ProactorEventLoop()
        self.set_event_loop(self.loop)
        self.addCleanup(self.loop.close)
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        super().setUp()

    def make_socket(self, cleanup=True):
        """Return a non-blocking TCP socket with 1024-byte buffers requested.

        When *cleanup* is true the socket is closed at test cleanup.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
        if cleanup:
            self.addCleanup(sock.close)
        return sock

    def run_loop(self, coro):
        """Run *coro* to completion on the test loop and return its result."""
        return self.loop.run_until_complete(coro)

    def prepare(self):
        """Start a local server and connect a client socket to it.

        Returns a ``(sock, proto)`` pair: the connected client socket and
        the protocol instance serving the other end.  Server and transport
        teardown is registered as a test cleanup.
        """
        sock = self.make_socket()
        proto = self.MyProto(self.loop)
        port = support.find_unused_port()
        srv_sock = self.make_socket(cleanup=False)
        srv_sock.bind(('127.0.0.1', port))
        server = self.run_loop(self.loop.create_server(
            lambda: proto, sock=srv_sock))
        self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))

        def cleanup():
            if proto.transport is not None:
                # can be None if the task was cancelled before
                # connection_made callback
                proto.transport.close()
                self.run_loop(proto.wait_closed())

            server.close()
            self.run_loop(server.wait_closed())

        self.addCleanup(cleanup)

        return sock, proto

    def test_sock_sendfile_not_a_file(self):
        sock, _ = self.prepare()
        f = object()
        with self.assertRaisesRegex(events.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_iobuffer(self):
        sock, _ = self.prepare()
        f = io.BytesIO()
        with self.assertRaisesRegex(events.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)

    def test_sock_sendfile_not_regular_file(self):
        sock, _ = self.prepare()
        f = mock.Mock()
        f.fileno.return_value = -1
        with self.assertRaisesRegex(events.SendfileNotAvailableError,
                                    "not a regular file"):
            self.run_loop(self.loop._sock_sendfile_native(sock, f,
                                                          0, None))
        self.assertEqual(self.file.tell(), 0)


if __name__ == '__main__':
    unittest.main()