1#!/usr/bin/env python 2 3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7import at_transceiver 8 9import logging 10import mox 11import os 12import unittest 13 14import at_channel 15import modem_configuration 16import task_loop 17import wardmodem_exceptions as wme 18 19class ATTransceiverTestCase(unittest.TestCase): 20 """ 21 Base test fixture for ATTransceiver class. 22 23 """ 24 class TestMachine(object): 25 """ Stub test machine used by tests below. """ 26 def test_function(self, _): 27 """ 28 A stub StateMachine API function. 29 30 wardmodem calls will be placed to this function. 31 32 @param _: Ignored. 33 34 """ 35 pass 36 37 38 # Needed in a test machine. 39 def get_well_known_name(self): 40 """ Get the well known name of this machine as str. """ 41 return "test_machine" 42 43 44 def setUp(self): 45 self._mox = mox.Mox() 46 47 # Create a temporary pty pair for the ATTransceiver constructor 48 master, slave = os.openpty() 49 50 self._modem_conf = modem_configuration.ModemConfiguration() 51 self._at_transceiver = at_transceiver.ATTransceiver(slave, 52 self._modem_conf, 53 slave) 54 55 # Now replace internal objects in _at_transceiver with mocks 56 self._at_transceiver._modem_response_timeout_milliseconds = 0 57 self._mock_modem_channel = self._mox.CreateMock(at_channel.ATChannel) 58 self._at_transceiver._modem_channel = self._mock_modem_channel 59 self._mock_mm_channel = self._mox.CreateMock(at_channel.ATChannel) 60 self._at_transceiver._mm_channel = self._mock_mm_channel 61 self._mock_task_loop = self._mox.CreateMock(task_loop.TaskLoop) 62 self._at_transceiver._task_loop = self._mock_task_loop 63 64 # Also empty out the internal maps, so that actual loaded configuration 65 # does not interfere with the test. 66 self._at_transceiver._at_to_wm_action_map = {} 67 self._at_transceiver._wm_response_to_at_map = {} 68 69 70class ATTransceiverCommonTestCase(ATTransceiverTestCase): 71 """ 72 Tests common to all three modes of ATTransceiver. 73 74 """ 75 76 def test_successful_mode_selection(self): 77 """ 78 Test that all modes can be selected, when both channels are provided. 79 80 """ 81 self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM 82 self.assertEqual(self._at_transceiver.mode, 83 at_transceiver.ATTransceiverMode.WARDMODEM) 84 self._at_transceiver.mode = ( 85 at_transceiver.ATTransceiverMode.PASS_THROUGH) 86 self.assertEqual(self._at_transceiver.mode, 87 at_transceiver.ATTransceiverMode.PASS_THROUGH) 88 self._at_transceiver.mode = ( 89 at_transceiver.ATTransceiverMode.SPLIT_VERIFY) 90 self.assertEqual(self._at_transceiver.mode, 91 at_transceiver.ATTransceiverMode.SPLIT_VERIFY) 92 93 def test_unsuccessful_mode_selection(self): 94 """ 95 Test that only WARDMODEM mode can be selected if the modem channel is 96 missing. 97 98 """ 99 self._at_transceiver._modem_channel = None 100 self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM 101 self.assertEqual(self._at_transceiver.mode, 102 at_transceiver.ATTransceiverMode.WARDMODEM) 103 self._at_transceiver.mode = ( 104 at_transceiver.ATTransceiverMode.PASS_THROUGH) 105 self.assertEqual(self._at_transceiver.mode, 106 at_transceiver.ATTransceiverMode.WARDMODEM) 107 self._at_transceiver.mode = ( 108 at_transceiver.ATTransceiverMode.SPLIT_VERIFY) 109 self.assertEqual(self._at_transceiver.mode, 110 at_transceiver.ATTransceiverMode.WARDMODEM) 111 112 113 def test_update_at_to_wm_action_map(self): 114 """ 115 Test that _at_to_wm_action_map is updated correctly under different 116 scenarios. 117 118 """ 119 # The diffs if this test fails can be rather long. 120 self.maxDiff = None 121 self._at_transceiver._at_to_wm_action_map = {} 122 123 # Test initialization 124 raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'), 125 'AT2=1,2': ('STATE_MACHINE2', 'function2'), 126 'AT3=*,care,do': ('STATE_MACHINE3', 'function3', (0, 1)), 127 'AT4?': ('STATE_MACHINE4', 'function4'), 128 'AT5=': ('STATE_MACHINE5', 'function5', ()), 129 'AT5=*': ('STATE_MACHINE6', 'function6')} 130 parsed_map = {'AT1=': {(): ('STATE_MACHINE1', 'function1', ())}, 131 'AT2=': {('1','2'): ('STATE_MACHINE2', 'function2', ())}, 132 'AT3=': {('*','care','do'): ('STATE_MACHINE3', 133 'function3', (0, 1))}, 134 'AT4?': {(): ('STATE_MACHINE4', 'function4', ())}, 135 'AT5=': {(): ('STATE_MACHINE5', 'function5', ()), 136 ('*',): ('STATE_MACHINE6', 'function6', ())}} 137 138 self._at_transceiver._update_at_to_wm_action_map(raw_map) 139 self.assertEqual(parsed_map, self._at_transceiver._at_to_wm_action_map) 140 141 # Test update 142 raw_good_update = {'AT1=': ('STATE_MACHINE7', 'function7'), 143 'AT5=2': ('STATE_MACHINE8', 'function8', 0), 144 'AT6?': ('STATE_MACHINE9', 'function9')} 145 parsed_map = {'AT1=': {(): ('STATE_MACHINE7', 'function7', ())}, 146 'AT2=': {('1','2'): ('STATE_MACHINE2', 'function2', ())}, 147 'AT3=': {('*','care','do'): ('STATE_MACHINE3', 148 'function3', (0, 1))}, 149 'AT4?': {(): ('STATE_MACHINE4', 'function4', ())}, 150 'AT5=': {(): ('STATE_MACHINE5', 'function5', ()), 151 ('*',): ('STATE_MACHINE6', 'function6', ()), 152 ('2',): ('STATE_MACHINE8', 'function8', (0,))}, 153 'AT6?': {(): ('STATE_MACHINE9', 'function9', ())}} 154 self._at_transceiver._update_at_to_wm_action_map(raw_good_update) 155 self.assertEqual(parsed_map, self._at_transceiver._at_to_wm_action_map) 156 157 158 def test_find_wardmodem_action_for_at(self): 159 """ 160 Setup _at_to_wm_action_map in the test and then test whether we can find 161 actions for AT commands off of that map. 162 163 """ 164 raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'), 165 'AT2=1,2': ('STATE_MACHINE2', 'function2'), 166 'AT3=*,b,c': ('STATE_MACHINE3', 'function3', (0, 1)), 167 'AT4?': ('STATE_MACHINE4', 'function4'), 168 'AT5=': ('STATE_MACHINE5', 'function5', ()), 169 'AT5=*': ('STATE_MACHINE6', 'function6')} 170 self._at_transceiver._update_at_to_wm_action_map(raw_map) 171 172 self.assertEqual( 173 ('STATE_MACHINE1', 'function1', ()), 174 self._at_transceiver._find_wardmodem_action_for_at('AT1=')) 175 self.assertEqual( 176 ('STATE_MACHINE2', 'function2', ()), 177 self._at_transceiver._find_wardmodem_action_for_at('AT2=1,2')) 178 self.assertEqual( 179 ('STATE_MACHINE3', 'function3', ('a','b')), 180 self._at_transceiver._find_wardmodem_action_for_at('AT3=a,b,c')) 181 self.assertEqual( 182 ('STATE_MACHINE3', 'function3', ('','b')), 183 self._at_transceiver._find_wardmodem_action_for_at('AT3=,b,c')) 184 self.assertEqual( 185 ('STATE_MACHINE5', 'function5', ()), 186 self._at_transceiver._find_wardmodem_action_for_at('AT5=')) 187 self.assertEqual( 188 ('STATE_MACHINE6', 'function6', ()), 189 self._at_transceiver._find_wardmodem_action_for_at('AT5=s')) 190 # Unsuccessful cases 191 self.assertRaises( 192 wme.ATTransceiverException, 193 self._at_transceiver._find_wardmodem_action_for_at, 194 'DOESNOTEXIST') 195 196 197 def test_find_wardmodem_action_for_at_returns_fallback(self): 198 """ 199 Test that when a fallback machine is setup, and unmatched AT command is 200 forwarded to this machine. 201 202 """ 203 mock_test_machine = self._mox.CreateMock(self.TestMachine) 204 mock_test_machine.get_well_known_name().MultipleTimes().AndReturn( 205 'FALLBACK_MACHINE') 206 self._mox.ReplayAll() 207 self._at_transceiver.register_state_machine(mock_test_machine) 208 self._at_transceiver.register_fallback_state_machine( 209 mock_test_machine.get_well_known_name(), 210 'act_on') 211 self.assertEqual( 212 ('FALLBACK_MACHINE', 'act_on', ('DOESNOTEXIST',)), 213 self._at_transceiver._find_wardmodem_action_for_at( 214 'DOESNOTEXIST')) 215 self._mox.VerifyAll() 216 217 218 def test_post_wardmodem_request(self): 219 """ 220 Test that a wardmodem request can be posted successfully end-to-end. 221 222 """ 223 raw_map = {'AT=*': ('TestMachine', 'test_function', 0)} 224 arg = 'fake_arg' 225 command = 'AT=' + arg 226 mock_test_machine = self._mox.CreateMock(self.TestMachine) 227 self._at_transceiver._update_at_to_wm_action_map(raw_map) 228 mock_test_machine.get_well_known_name().AndReturn('TestMachine') 229 self._mock_task_loop.post_task( 230 self._at_transceiver._execute_state_machine_function, 231 command, mox.IgnoreArg(), mock_test_machine.test_function, 232 arg) 233 234 self._mox.ReplayAll() 235 self._at_transceiver.register_state_machine(mock_test_machine) 236 self._at_transceiver._post_wardmodem_request(command) 237 self._mox.VerifyAll() 238 239 240 def test_update_wm_response_to_at_map(self): 241 """ 242 Test that the wm_response_to_at_map is correctly updated. 243 244 """ 245 raw_map = {'some_function': 'AT=some_function', 246 'some_other_function': 'AT=some_other_function'} 247 self._at_transceiver._update_wm_response_to_at_map(raw_map) 248 self.assertEqual(raw_map, 249 self._at_transceiver._wm_response_to_at_map) 250 251 raw_map = {'some_other_function': 'AT=overwritten_function', 252 'some_new_function': 'AT=this_is_new_too'} 253 updated_map = {'some_function': 'AT=some_function', 254 'some_other_function': 'AT=overwritten_function', 255 'some_new_function': 'AT=this_is_new_too'} 256 self._at_transceiver._update_wm_response_to_at_map(raw_map) 257 self.assertEqual(updated_map, 258 self._at_transceiver._wm_response_to_at_map) 259 260 261 def test_construct_at_response(self): 262 """ 263 Test that construct_at_response correctly replaces by actual arguments. 264 265 """ 266 self.assertEqual( 267 'AT=arg1,some,arg2', 268 self._at_transceiver._construct_at_response( 269 'AT=*,some,*', 'arg1','arg2')) 270 self.assertEqual( 271 'AT=1,some,thing', 272 self._at_transceiver._construct_at_response( 273 'AT=*,some,thing', 1)) 274 self.assertEqual( 275 'AT=some,other,thing', 276 self._at_transceiver._construct_at_response( 277 'AT=some,other,thing')) 278 self.assertEqual( 279 'AT=needsnone', 280 self._at_transceiver._construct_at_response( 281 'AT=needsnone', 'butonegiven')) 282 # Unsuccessful cases 283 self.assertRaises( 284 wme.ATTransceiverException, 285 self._at_transceiver._construct_at_response, 286 'AT=*,needstwo,*', 'onlyonegiven') 287 288 289 def test_process_wardmodem_response(self): 290 """ 291 A basic test for process_wardmodem_response. 292 293 """ 294 self._mox.StubOutWithMock(self._at_transceiver, 295 '_process_wardmodem_at_command') 296 raw_map = {'func1': 'AT=*,given,*', 297 'func2': 'AT=nothing,needed'} 298 self._at_transceiver._update_wm_response_to_at_map(raw_map) 299 300 self._at_transceiver._process_wardmodem_at_command('AT=a,given,2') 301 self._at_transceiver._process_wardmodem_at_command('AT=nothing,needed') 302 303 self._mox.ReplayAll() 304 self._at_transceiver.process_wardmodem_response('func1','a',2) 305 self._at_transceiver.process_wardmodem_response('func2') 306 self._mox.UnsetStubs() 307 self._mox.VerifyAll() 308 309 310class ATTransceiverWardModemTestCase(ATTransceiverTestCase): 311 """ 312 Test ATTransceiver class in the WARDMODEM mode. 313 314 """ 315 316 def setUp(self): 317 super(ATTransceiverWardModemTestCase, self).setUp() 318 self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM 319 320 321 def test_wardmodem_at_command(self): 322 """ 323 Test the case when AT command is received from wardmodem. 324 325 """ 326 at_command = 'AT+commmmmmmmmand' 327 self._mock_mm_channel.send(at_command) 328 329 self._mox.ReplayAll() 330 self._at_transceiver._process_wardmodem_at_command(at_command) 331 self._mox.VerifyAll() 332 333 334 def test_mm_at_command(self): 335 """ 336 Test the case when AT command is received from modem manager. 337 338 """ 339 at_command = 'AT+commmmmmmmmand' 340 self._mox.StubOutWithMock(self._at_transceiver, 341 '_post_wardmodem_request') 342 343 self._at_transceiver._post_wardmodem_request(at_command) 344 345 self._mox.ReplayAll() 346 self._at_transceiver._process_mm_at_command(at_command) 347 self._mox.UnsetStubs() 348 self._mox.VerifyAll() 349 350 351class ATTransceiverPassThroughTestCase(ATTransceiverTestCase): 352 """ 353 Test ATTransceiver class in the PASS_THROUGH mode. 354 355 """ 356 357 def setUp(self): 358 super(ATTransceiverPassThroughTestCase, self).setUp() 359 self._at_transceiver.mode = ( 360 at_transceiver.ATTransceiverMode.PASS_THROUGH) 361 362 363 def test_modem_at_command(self): 364 """ 365 Test the case when AT command received from physical modem. 366 367 """ 368 at_command = 'AT+commmmmmmmmand' 369 self._mock_mm_channel.send(at_command) 370 371 self._mox.ReplayAll() 372 self._at_transceiver._process_modem_at_command(at_command) 373 self._mox.VerifyAll() 374 375 376 def test_mm_at_command(self): 377 """ 378 Test the case when AT command is received from modem manager. 379 380 """ 381 at_command = 'AT+commmmmmmmmand' 382 self._mock_modem_channel.send(at_command) 383 384 self._mox.ReplayAll() 385 self._at_transceiver._process_mm_at_command(at_command) 386 self._mox.VerifyAll() 387 388 389class ATTransceiverSplitVerifyTestCase(ATTransceiverTestCase): 390 """ 391 Test ATTransceiver class in the SPLIT_VERIFY mode. 392 393 """ 394 395 def setUp(self): 396 super(ATTransceiverSplitVerifyTestCase, self).setUp() 397 self._at_transceiver.mode = ( 398 at_transceiver.ATTransceiverMode.SPLIT_VERIFY) 399 400 401 def test_mm_at_command(self): 402 """ 403 Test that that incoming modem manager command is multiplexed to 404 wardmodem and physical modem. 405 406 """ 407 at_command = 'AT+commmmmmmmmand' 408 self._mox.StubOutWithMock(self._at_transceiver, 409 '_post_wardmodem_request') 410 self._mock_modem_channel.send(at_command).InAnyOrder() 411 self._at_transceiver._post_wardmodem_request(at_command).InAnyOrder() 412 413 self._mox.ReplayAll() 414 self._at_transceiver._process_mm_at_command(at_command) 415 self._mox.UnsetStubs() 416 self._mox.VerifyAll() 417 418 419 def test_successful_single_at_response_modem_wardmodem(self): 420 """ 421 Test the case when one AT response is received successfully. 422 In this case, physical modem command comes first. 423 424 """ 425 at_command = 'AT+commmmmmmmmand' 426 self._mock_mm_channel.send(at_command) 427 428 self._mox.ReplayAll() 429 self._at_transceiver._process_modem_at_command(at_command) 430 self._at_transceiver._process_wardmodem_at_command(at_command) 431 self._mox.VerifyAll() 432 433 434 def test_successful_single_at_response_wardmodem_modem(self): 435 """ 436 Test the case when one AT response is received successfully. 437 In this case, wardmodem command comes first. 438 439 """ 440 at_command = 'AT+commmmmmmmmand' 441 task_id = 3 442 self._mock_task_loop.post_task_after_delay( 443 self._at_transceiver._modem_response_timed_out, 444 mox.IgnoreArg()).AndReturn(task_id) 445 self._mock_task_loop.cancel_posted_task(task_id) 446 self._mock_mm_channel.send(at_command) 447 448 self._mox.ReplayAll() 449 self._at_transceiver._process_wardmodem_at_command(at_command) 450 self._at_transceiver._process_modem_at_command(at_command) 451 self._mox.VerifyAll() 452 453 def test_mismatched_at_response(self): 454 """ 455 Test the case when both responses arrive, but are not identical. 456 457 """ 458 wardmodem_command = 'AT+wardmodem' 459 modem_command = 'AT+modem' 460 self._mox.StubOutWithMock(self._at_transceiver, 461 '_report_verification_failure') 462 self._at_transceiver._report_verification_failure( 463 self._at_transceiver.VERIFICATION_FAILED_MISMATCH, 464 modem_command, 465 wardmodem_command) 466 self._mock_mm_channel.send(wardmodem_command) 467 468 self._mox.ReplayAll() 469 self._at_transceiver._process_modem_at_command(modem_command) 470 self._at_transceiver._process_wardmodem_at_command(wardmodem_command) 471 self._mox.UnsetStubs() 472 self._mox.VerifyAll() 473 474 475 def test_modem_response_times_out(self): 476 """ 477 Test the case when the physical modem fails to respond. 478 479 """ 480 at_command = 'AT+commmmmmmmmand' 481 task_id = 3 482 self._mox.StubOutWithMock(self._at_transceiver, 483 '_report_verification_failure') 484 485 self._mock_task_loop.post_task_after_delay( 486 self._at_transceiver._modem_response_timed_out, 487 mox.IgnoreArg()).AndReturn(task_id) 488 self._at_transceiver._report_verification_failure( 489 self._at_transceiver.VERIFICATION_FAILED_TIME_OUT, 490 None, 491 at_command) 492 self._mock_mm_channel.send(at_command) 493 494 self._mox.ReplayAll() 495 self._at_transceiver._process_wardmodem_at_command(at_command) 496 self._at_transceiver._modem_response_timed_out() 497 self._mox.UnsetStubs() 498 self._mox.VerifyAll() 499 500 501 def test_multiple_successful_responses(self): 502 """ 503 Test the case two wardmodem responses are queued, and then two matching 504 modem responses are received. 505 506 """ 507 first_at_command = 'AT+first' 508 second_at_command = 'AT+second' 509 first_task_id = 3 510 second_task_id = 4 511 512 self._mock_task_loop.post_task_after_delay( 513 self._at_transceiver._modem_response_timed_out, 514 mox.IgnoreArg()).AndReturn(first_task_id) 515 self._mock_task_loop.cancel_posted_task(first_task_id) 516 self._mock_mm_channel.send(first_at_command) 517 self._mock_task_loop.post_task_after_delay( 518 self._at_transceiver._modem_response_timed_out, 519 mox.IgnoreArg()).AndReturn(second_task_id) 520 self._mock_task_loop.cancel_posted_task(second_task_id) 521 self._mock_mm_channel.send(second_at_command) 522 523 self._mox.ReplayAll() 524 self._at_transceiver._process_wardmodem_at_command(first_at_command) 525 self._at_transceiver._process_wardmodem_at_command(second_at_command) 526 self._at_transceiver._process_modem_at_command(first_at_command) 527 self._at_transceiver._process_modem_at_command(second_at_command) 528 self._mox.VerifyAll() 529 530 531if __name__ == '__main__': 532 logging.basicConfig(level=logging.DEBUG) 533 unittest.main() 534