1"""Tests for transports.py.""" 2 3import unittest 4from unittest import mock 5 6import asyncio 7from asyncio import transports 8 9 10class TransportTests(unittest.TestCase): 11 12 def test_ctor_extra_is_none(self): 13 transport = asyncio.Transport() 14 self.assertEqual(transport._extra, {}) 15 16 def test_get_extra_info(self): 17 transport = asyncio.Transport({'extra': 'info'}) 18 self.assertEqual('info', transport.get_extra_info('extra')) 19 self.assertIsNone(transport.get_extra_info('unknown')) 20 21 default = object() 22 self.assertIs(default, transport.get_extra_info('unknown', default)) 23 24 def test_writelines(self): 25 transport = asyncio.Transport() 26 transport.write = mock.Mock() 27 28 transport.writelines([b'line1', 29 bytearray(b'line2'), 30 memoryview(b'line3')]) 31 self.assertEqual(1, transport.write.call_count) 32 transport.write.assert_called_with(b'line1line2line3') 33 34 def test_not_implemented(self): 35 transport = asyncio.Transport() 36 37 self.assertRaises(NotImplementedError, 38 transport.set_write_buffer_limits) 39 self.assertRaises(NotImplementedError, transport.get_write_buffer_size) 40 self.assertRaises(NotImplementedError, transport.write, 'data') 41 self.assertRaises(NotImplementedError, transport.write_eof) 42 self.assertRaises(NotImplementedError, transport.can_write_eof) 43 self.assertRaises(NotImplementedError, transport.pause_reading) 44 self.assertRaises(NotImplementedError, transport.resume_reading) 45 self.assertRaises(NotImplementedError, transport.close) 46 self.assertRaises(NotImplementedError, transport.abort) 47 48 def test_dgram_not_implemented(self): 49 transport = asyncio.DatagramTransport() 50 51 self.assertRaises(NotImplementedError, transport.sendto, 'data') 52 self.assertRaises(NotImplementedError, transport.abort) 53 54 def test_subprocess_transport_not_implemented(self): 55 transport = asyncio.SubprocessTransport() 56 57 self.assertRaises(NotImplementedError, transport.get_pid) 58 self.assertRaises(NotImplementedError, transport.get_returncode) 59 self.assertRaises(NotImplementedError, transport.get_pipe_transport, 1) 60 self.assertRaises(NotImplementedError, transport.send_signal, 1) 61 self.assertRaises(NotImplementedError, transport.terminate) 62 self.assertRaises(NotImplementedError, transport.kill) 63 64 def test_flowcontrol_mixin_set_write_limits(self): 65 66 class MyTransport(transports._FlowControlMixin, 67 transports.Transport): 68 69 def get_write_buffer_size(self): 70 return 512 71 72 loop = mock.Mock() 73 transport = MyTransport(loop=loop) 74 transport._protocol = mock.Mock() 75 76 self.assertFalse(transport._protocol_paused) 77 78 with self.assertRaisesRegex(ValueError, 'high.*must be >= low'): 79 transport.set_write_buffer_limits(high=0, low=1) 80 81 transport.set_write_buffer_limits(high=1024, low=128) 82 self.assertFalse(transport._protocol_paused) 83 self.assertEqual(transport.get_write_buffer_limits(), (128, 1024)) 84 85 transport.set_write_buffer_limits(high=256, low=128) 86 self.assertTrue(transport._protocol_paused) 87 self.assertEqual(transport.get_write_buffer_limits(), (128, 256)) 88 89 90if __name__ == '__main__': 91 unittest.main() 92