1# Copyright 2017 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unit test for atft."""
16import types
17import unittest
18
19import atft
20from atftman import ProvisionStatus
21from atftman import ProvisionState
22import fastboot_exceptions
23from mock import call
24from mock import MagicMock
25from mock import patch
26import wx
27
28
29class MockAtft(atft.Atft):
30
31  def __init__(self):
32    self.InitializeUI = MagicMock()
33    self.StartRefreshingDevices = MagicMock()
34    self.ChooseProduct = MagicMock()
35    self.CreateAtftManager = MagicMock()
36    self.CreateAtftLog = MagicMock()
37    self.ParseConfigFile = self._MockParseConfig
38    self._SendPrintEvent = MagicMock()
39    atft.Atft.__init__(self)
40
41  def _MockParseConfig(self):
42    self.ATFT_VERSION = 'vTest'
43    self.COMPATIBLE_ATFA_VERSION = 'v1'
44    self.DEVICE_REFRESH_INTERVAL = 1.0
45    self.DEFAULT_KEY_THRESHOLD = 0
46    self.LOG_DIR = 'test_log_dir'
47    self.LOG_SIZE = 1000
48    self.LOG_FILE_NUMBER = 2
49    self.LANGUAGE = 'ENG'
50    self.REBOOT_TIMEOUT = 1.0
51    self.PRODUCT_ATTRIBUTE_FILE_EXTENSION = '*.atpa'
52
53    return {}
54
55
56class TestDeviceInfo(object):
57
58  def __init__(self, serial_number, location=None, provision_status=None):
59    self.serial_number = serial_number
60    self.location = location
61    self.provision_status = provision_status
62    self.provision_state = ProvisionState()
63    self.time_set = False
64
65  def __eq__(self, other):
66    return (self.serial_number == other.serial_number and
67            self.location == other.location)
68
69  def __ne__(self, other):
70    return not self.__eq__(other)
71
72  def Copy(self):
73    return TestDeviceInfo(self.serial_number, self.location,
74                          self.provision_status)
75
76
77class AtftTest(unittest.TestCase):
78  TEST_SERIAL1 = 'test-serial1'
79  TEST_LOCATION1 = 'test-location1'
80  TEST_SERIAL2 = 'test-serial2'
81  TEST_LOCATION2 = 'test-location2'
82  TEST_SERIAL3 = 'test-serial3'
83  TEST_SERIAL4 = 'test-serial4'
84  TEST_TEXT = 'test-text'
85  TEST_TEXT2 = 'test-text2'
86
87  def setUp(self):
88    self.test_target_devs = []
89    self.test_dev1 = TestDeviceInfo(
90        self.TEST_SERIAL1, self.TEST_LOCATION1, ProvisionStatus.IDLE)
91    self.test_dev2 = TestDeviceInfo(
92        self.TEST_SERIAL2, self.TEST_LOCATION2, ProvisionStatus.IDLE)
93    self.test_text_window = ''
94    self.atfa_keys = None
95    self.device_map = {}
96    # Disable the test mode. (This mode is just for usage test, not unit test)
97    atft.TEST_MODE = False
98
99  def AppendTargetDevice(self, device):
100    self.test_target_devs.append(device)
101
102  def DeleteAllItems(self):
103    self.test_target_devs = []
104
105  # Test atft._DeviceListedEventHandler
106  def testDeviceListedEventHandler(self):
107    mock_atft = MockAtft()
108    mock_atft.atfa_devs_output = MagicMock()
109    mock_atft.last_target_list = []
110    mock_atft.target_devs_output = MagicMock()
111    mock_atft.atft_manager = MagicMock()
112    mock_atft.atft_manager.atfa_dev = None
113    mock_atft.PrintToWindow = MagicMock()
114    mock_atft._HandleKeysLeft = MagicMock()
115    mock_atft.target_devs_output.Append.side_effect = self.AppendTargetDevice
116    (mock_atft.target_devs_output.DeleteAllItems.side_effect
117    ) = self.DeleteAllItems
118    mock_atft.atft_manager.target_devs = []
119    mock_atft._DeviceListedEventHandler(None)
120    mock_atft.atft_manager.target_devs = [self.test_dev1]
121    mock_atft._DeviceListedEventHandler(None)
122    mock_atft.target_devs_output.Append.assert_called_once()
123    self.assertEqual(1, len(self.test_target_devs))
124    self.assertEqual(self.test_dev1.serial_number, self.test_target_devs[0][0])
125    mock_atft.atft_manager.target_devs = [self.test_dev1, self.test_dev2]
126    mock_atft._DeviceListedEventHandler(None)
127    self.assertEqual(2, len(self.test_target_devs))
128    self.assertEqual(self.test_dev2.serial_number, self.test_target_devs[1][0])
129    mock_atft.atft_manager.target_devs = [self.test_dev1, self.test_dev2]
130    mock_atft.target_devs_output.Append.reset_mock()
131    mock_atft._DeviceListedEventHandler(None)
132    mock_atft.target_devs_output.Append.assert_not_called()
133    mock_atft.atft_manager.target_devs = [self.test_dev2, self.test_dev1]
134    mock_atft.target_devs_output.Append.reset_mock()
135    mock_atft._DeviceListedEventHandler(None)
136    mock_atft.target_devs_output.Append.assert_called()
137    self.assertEqual(2, len(self.test_target_devs))
138    mock_atft.atft_manager.target_devs = [self.test_dev2]
139    mock_atft._DeviceListedEventHandler(None)
140    self.assertEqual(1, len(self.test_target_devs))
141    self.assertEqual(self.test_dev2.serial_number, self.test_target_devs[0][0])
142    mock_atft.atft_manager.target_devs = []
143    mock_atft._DeviceListedEventHandler(None)
144    self.assertEqual(0, len(self.test_target_devs))
145
146  # Test atft._SelectFileEventHandler
147  @patch('wx.FileDialog')
148  def testSelectFileEventHandler(self, mock_file_dialog):
149    mock_atft = MockAtft()
150    mock_event = MagicMock()
151    mock_callback = MagicMock()
152    mock_dialog = MagicMock()
153    mock_instance = MagicMock()
154    mock_file_dialog.return_value = mock_instance
155    mock_instance.__enter__.return_value = mock_dialog
156    mock_event.GetValue.return_value = (mock_atft.SelectFileArg(
157        self.TEST_TEXT, self.TEST_TEXT2, mock_callback))
158    mock_dialog.ShowModal.return_value = wx.ID_OK
159    mock_dialog.GetPath.return_value = self.TEST_TEXT
160    mock_atft._SelectFileEventHandler(mock_event)
161    mock_callback.assert_called_once_with(self.TEST_TEXT)
162
163  @patch('wx.FileDialog')
164  def testSelectFileEventHandlerCancel(self, mock_file_dialog):
165    mock_atft = MockAtft()
166    mock_event = MagicMock()
167    mock_callback = MagicMock()
168    mock_dialog = MagicMock()
169    mock_instance = MagicMock()
170    mock_file_dialog.return_value = mock_instance
171    mock_instance.__enter__.return_value = mock_dialog
172    mock_event.GetValue.return_value = (mock_atft.SelectFileArg(
173        self.TEST_TEXT, self.TEST_TEXT2, mock_callback))
174    mock_dialog.ShowModal.return_value = wx.ID_CANCEL
175    mock_dialog.GetPath.return_value = self.TEST_TEXT
176    mock_atft._SelectFileEventHandler(mock_event)
177    mock_callback.assert_not_called()
178
179  # Test atft.PrintToWindow
180  def MockAppendText(self, text):
181    self.test_text_window += text
182
183  def MockClear(self):
184    self.test_text_window = ''
185
186  def MockGetValue(self):
187    return self.test_text_window
188
189  def testPrintToWindow(self):
190    self.test_text_window = ''
191    mock_atft = MockAtft()
192    mock_text_entry = MagicMock()
193    mock_text_entry.AppendText.side_effect = self.MockAppendText
194    mock_text_entry.Clear.side_effect = self.MockClear
195    mock_text_entry.GetValue.side_effect = self.MockGetValue
196    mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT)
197    self.assertEqual(self.TEST_TEXT, self.test_text_window)
198    mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT2)
199    self.assertEqual(self.TEST_TEXT2, self.test_text_window)
200    mock_text_entry.AppendText.reset_mock()
201    mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT2)
202    self.assertEqual(False, mock_text_entry.AppendText.called)
203    self.assertEqual(self.TEST_TEXT2, self.test_text_window)
204    mock_text_entry.Clear()
205    mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT, True)
206    mock_atft.PrintToWindow(mock_text_entry, self.TEST_TEXT2, True)
207    self.assertEqual(self.TEST_TEXT + self.TEST_TEXT2, self.test_text_window)
208
209  # Test atft.StartRefreshingDevices(), atft.StopRefresh()
210  # Test atft.PauseRefresh(), atft.ResumeRefresh()
211
212  @patch('threading.Timer')
213  @patch('wx.QueueEvent')
214  def testStartRefreshingDevice(self, mock_queue_event, mock_timer):
215    mock_atft = MockAtft()
216    mock_atft.StartRefreshingDevices = types.MethodType(
217        atft.Atft.StartRefreshingDevices, mock_atft, atft.Atft)
218    mock_atft.DEVICE_REFRESH_INTERVAL = 0.01
219    mock_atft._ListDevices = MagicMock()
220    mock_atft.dev_listed_event = MagicMock()
221    mock_timer.side_effect = MagicMock()
222
223    mock_atft.StartRefreshingDevices()
224
225    mock_atft._ListDevices.assert_called()
226    mock_atft.StopRefresh()
227    self.assertEqual(None, mock_atft.refresh_timer)
228
229  @patch('threading.Timer')
230  def testPauseResumeRefreshingDevice(self, mock_timer):
231    mock_atft = MockAtft()
232    mock_atft.StartRefreshingDevices = types.MethodType(
233        atft.Atft.StartRefreshingDevices, mock_atft, atft.Atft)
234    mock_atft.DEVICE_REFRESH_INTERVAL = 0.01
235    mock_atft._ListDevices = MagicMock()
236    mock_atft.dev_listed_event = MagicMock()
237    mock_atft._SendDeviceListedEvent = MagicMock()
238    mock_timer.side_effect = MagicMock()
239
240    mock_atft.PauseRefresh()
241    mock_atft.StartRefreshingDevices()
242    mock_atft._ListDevices.assert_not_called()
243    mock_atft._SendDeviceListedEvent.assert_called()
244    mock_atft.ResumeRefresh()
245    mock_atft.StartRefreshingDevices()
246    mock_atft._ListDevices.assert_called()
247    mock_atft.StopRefresh()
248
249  # Test atft.OnToggleAutoProv
250  def testOnEnterAutoProvNormal(self):
251    mock_atft = MockAtft()
252    mock_atft.StartRefreshingDevices = types.MethodType(
253        atft.Atft.StartRefreshingDevices, mock_atft, atft.Atft)
254    mock_atft.toolbar = MagicMock()
255    mock_atft.toolbar_auto_provision = MagicMock()
256    mock_atft.toolbar_auto_provision.IsToggled.return_value = True
257    mock_atft.atft_manager.atfa_dev = MagicMock()
258    mock_atft.atft_manager.product_info = MagicMock()
259    mock_atft._ToggleToolbarMenu = MagicMock()
260    mock_atft.PrintToCommandWindow = MagicMock()
261    mock_atft._CreateThread = MagicMock()
262    mock_atft._CheckLowKeyAlert = MagicMock()
263    mock_atft.OnToggleAutoProv(None)
264    self.assertEqual(True, mock_atft.auto_prov)
265
266  def testOnEnterAutoProvNoAtfa(self):
267    # Cannot enter auto provisioning mode without an ATFA device
268    mock_atft = MockAtft()
269    mock_atft.toolbar = MagicMock()
270    mock_atft.toolbar_auto_provision = MagicMock()
271    mock_atft.toolbar_auto_provision.IsToggled.return_value = True
272    # No ATFA device
273    mock_atft.atft_manager.atfa_dev = None
274    mock_atft.atft_manager.product_info = MagicMock()
275    mock_atft._ToggleToolbarMenu = MagicMock()
276    mock_atft.PrintToCommandWindow = MagicMock()
277    mock_atft._CreateThread = MagicMock()
278    mock_atft._CheckLowKeyAlert = MagicMock()
279    mock_atft._SendAlertEvent = MagicMock()
280    mock_atft.OnToggleAutoProv(None)
281    self.assertEqual(False, mock_atft.auto_prov)
282
283  def testOnEnterAutoProvNoProduct(self):
284    # Cannot enter auto provisioning mode without an ATFA device
285    mock_atft = MockAtft()
286    mock_atft.toolbar = MagicMock()
287    mock_atft.toolbar_auto_provision = MagicMock()
288    mock_atft.toolbar_auto_provision.IsToggled.return_value = True
289    mock_atft.atft_manager.atfa_dev = MagicMock()
290    # No product info
291    mock_atft.atft_manager.product_info = None
292    mock_atft._ToggleToolbarMenu = MagicMock()
293    mock_atft.PrintToCommandWindow = MagicMock()
294    mock_atft._CreateThread = MagicMock()
295    mock_atft._CheckLowKeyAlert = MagicMock()
296    mock_atft._SendAlertEvent = MagicMock()
297    mock_atft.OnToggleAutoProv(None)
298    self.assertEqual(False, mock_atft.auto_prov)
299
300  def testLeaveAutoProvNormal(self):
301    mock_atft = MockAtft()
302    mock_atft.toolbar = MagicMock()
303    mock_atft.toolbar_auto_provision = MagicMock()
304    mock_atft.toolbar_auto_provision.IsToggled.return_value = False
305    mock_atft.atft_manager.atfa_dev = MagicMock()
306    mock_atft.atft_manager.product_info = MagicMock()
307    mock_atft._ToggleToolbarMenu = MagicMock()
308    mock_atft.PrintToCommandWindow = MagicMock()
309    mock_atft._CreateThread = MagicMock()
310    mock_atft._CheckLowKeyAlert = MagicMock()
311    mock_atft._SendAlertEvent = MagicMock()
312    mock_atft.atft_manager.target_devs = []
313    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
314                               ProvisionStatus.PROVISION_SUCCESS)
315    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1,
316                               ProvisionStatus.WAITING)
317    mock_atft.atft_manager.target_devs.append(test_dev1)
318    mock_atft.atft_manager.target_devs.append(test_dev2)
319    mock_atft.atft_manager.CheckProvisionStatus.side_effect = (
320        lambda target=test_dev2, state=ProvisionStatus.LOCKAVB_SUCCESS:
321        self.MockStateChange(target, state))
322    mock_atft.OnToggleAutoProv(None)
323    self.assertEqual(False, mock_atft.auto_prov)
324    self.assertEqual(test_dev1.provision_status,
325                     ProvisionStatus.PROVISION_SUCCESS)
326    mock_atft.atft_manager.CheckProvisionStatus.assert_called_once()
327    self.assertEqual(test_dev2.provision_status, ProvisionStatus.LOCKAVB_SUCCESS)
328
329  # Test atft.OnChangeKeyThreshold
330  def testOnChangeKeyThreshold(self):
331    mock_atft = MockAtft()
332    mock_atft.change_threshold_dialog = MagicMock()
333    mock_atft.change_threshold_dialog.ShowModal.return_value = wx.ID_OK
334    mock_atft.change_threshold_dialog.GetValue.return_value = '100'
335    mock_atft.OnChangeKeyThreshold(None)
336    self.assertEqual(100, mock_atft.key_threshold)
337
338  def testOnChangeKeyThresholdNotInt(self):
339    mock_atft = MockAtft()
340    mock_atft.key_threshold = 100
341    mock_atft.change_threshold_dialog = MagicMock()
342    mock_atft.change_threshold_dialog.ShowModal.return_value = wx.ID_OK
343    mock_atft.change_threshold_dialog.GetValue.return_value = 'ABC'
344    mock_atft.OnChangeKeyThreshold(None)
345    self.assertEqual(100, mock_atft.key_threshold)
346    mock_atft.change_threshold_dialog.GetValue.return_value = '2'
347    mock_atft.OnChangeKeyThreshold(None)
348    self.assertEqual(2, mock_atft.key_threshold)
349    mock_atft.change_threshold_dialog.GetValue.return_value = '-10'
350    mock_atft.OnChangeKeyThreshold(None)
351    self.assertEqual(2, mock_atft.key_threshold)
352
353  # Test atft._HandleAutoProv
354  def testHandleAutoProv(self):
355    mock_atft = MockAtft()
356    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
357                               ProvisionStatus.PROVISION_SUCCESS)
358    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION1,
359                               ProvisionStatus.IDLE)
360    mock_atft.atft_manager.target_devs = []
361    mock_atft.atft_manager.target_devs.append(test_dev1)
362    mock_atft.atft_manager.target_devs.append(test_dev2)
363    mock_atft._CreateThread = MagicMock()
364    mock_atft._HandleStateTransition = MagicMock()
365    mock_atft._HandleAutoProv()
366    self.assertEqual(test_dev2.provision_status, ProvisionStatus.WAITING)
367    self.assertEqual(1, mock_atft._CreateThread.call_count)
368
369  # Test atft._HandleKeysLeft
370  def MockGetKeysLeft(self, keys_left_array):
371    if keys_left_array:
372      return keys_left_array[0]
373    else:
374      return None
375
376  def MockSetKeysLeft(self, keys_left_array):
377    keys_left_array.append(10)
378
379  def testHandleKeysLeft(self):
380    mock_atft = MockAtft()
381    keys_left_array = []
382    mock_atft.atft_manager.GetATFAKeysLeft = MagicMock()
383    mock_atft.atft_manager.GetATFAKeysLeft.side_effect = (
384        lambda: self.MockGetKeysLeft(keys_left_array))
385    mock_atft.atft_manager.CheckATFAStatus.side_effect = (
386        lambda: self.MockSetKeysLeft(keys_left_array))
387    mock_atft.keys_left_display = MagicMock()
388    mock_atft._HandleKeysLeft()
389    mock_atft.keys_left_display.SetLabelText.assert_called_once_with('10')
390
391  def testHandleKeysLeftKeysNotNone(self):
392    mock_atft = MockAtft()
393    keys_left_array = [10]
394    mock_atft.atft_manager.GetATFAKeysLeft = MagicMock()
395    mock_atft.atft_manager.GetATFAKeysLeft.side_effect = (
396        lambda: self.MockGetKeysLeft(keys_left_array))
397    mock_atft.keys_left_display = MagicMock()
398    mock_atft._HandleKeysLeft()
399    mock_atft.keys_left_display.SetLabelText.assert_called_once_with('10')
400    mock_atft.atft_manager.CheckATFAStatus.assert_not_called()
401
402  def testHandleKeysLeftKeysNone(self):
403    mock_atft = MockAtft()
404    keys_left_array = []
405    mock_atft.atft_manager.GetATFAKeysLeft = MagicMock()
406    mock_atft.atft_manager.GetATFAKeysLeft.side_effect = (
407        lambda: self.MockGetKeysLeft(keys_left_array))
408    mock_atft.keys_left_display = MagicMock()
409    mock_atft._HandleKeysLeft()
410    mock_atft.keys_left_display.SetLabelText.assert_called_once_with('')
411
412  # Test atft._HandleStateTransition
413  def MockStateChange(self, target, state):
414    target.provision_status = state
415    if state == ProvisionStatus.REBOOT_SUCCESS:
416      target.provision_state.bootloader_locked = True
417    if state == ProvisionStatus.FUSEATTR_SUCCESS:
418      target.provision_state.avb_perm_attr_set = True
419    if state == ProvisionStatus.LOCKAVB_SUCCESS:
420      target.provision_state.avb_locked = True
421    if state == ProvisionStatus.PROVISION_SUCCESS:
422      target.provision_state.provisioned = True
423
424  def testHandleStateTransition(self):
425    mock_atft = MockAtft()
426    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
427                               ProvisionStatus.WAITING)
428    mock_atft._FuseVbootKeyTarget = MagicMock()
429    mock_atft._FuseVbootKeyTarget.side_effect = (
430        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
431        self.MockStateChange(target, state))
432    mock_atft._FusePermAttrTarget = MagicMock()
433    mock_atft._FusePermAttrTarget.side_effect = (
434        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
435        self.MockStateChange(target, state))
436    mock_atft._LockAvbTarget = MagicMock()
437    mock_atft._LockAvbTarget.side_effect = (
438        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
439        self.MockStateChange(target, state))
440    mock_atft._ProvisionTarget = MagicMock()
441    mock_atft._ProvisionTarget.side_effect = (
442        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
443        self.MockStateChange(target, state))
444    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
445    mock_atft.auto_prov = True
446    mock_atft.atft_manager = MagicMock()
447    mock_atft.atft_manager.GetTargetDevice = MagicMock()
448    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
449    mock_atft._HandleStateTransition(test_dev1)
450    self.assertEqual(ProvisionStatus.PROVISION_SUCCESS,
451                     test_dev1.provision_status)
452
453  def testHandleStateTransitionFuseVbootFail(self):
454    mock_atft = MockAtft()
455    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
456                               ProvisionStatus.WAITING)
457    mock_atft._FuseVbootKeyTarget = MagicMock()
458    mock_atft._FuseVbootKeyTarget.side_effect = (
459        lambda target=mock_atft, state=ProvisionStatus.FUSEVBOOT_FAILED:
460            self.MockStateChange(target, state))
461    mock_atft._FusePermAttrTarget = MagicMock()
462    mock_atft._FusePermAttrTarget.side_effect = (
463        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
464            self.MockStateChange(target, state))
465    mock_atft._LockAvbTarget = MagicMock()
466    mock_atft._LockAvbTarget.side_effect = (
467        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
468            self.MockStateChange(target, state))
469    mock_atft._ProvisionTarget = MagicMock()
470    mock_atft._ProvisionTarget.side_effect = (
471        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
472            self.MockStateChange(target, state))
473    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
474    mock_atft.auto_prov = True
475    mock_atft.atft_manager = MagicMock()
476    mock_atft.atft_manager.GetTargetDevice = MagicMock()
477    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
478    mock_atft._HandleStateTransition(test_dev1)
479    self.assertEqual(ProvisionStatus.FUSEVBOOT_FAILED,
480                     test_dev1.provision_status)
481
482  def testHandleStateTransitionRebootFail(self):
483    mock_atft = MockAtft()
484    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
485                               ProvisionStatus.WAITING)
486    mock_atft._FuseVbootKeyTarget = MagicMock()
487    mock_atft._FuseVbootKeyTarget.side_effect = (
488        lambda target=mock_atft, state=ProvisionStatus.REBOOT_FAILED:
489        self.MockStateChange(target, state))
490    mock_atft._FusePermAttrTarget = MagicMock()
491    mock_atft._FusePermAttrTarget.side_effect = (
492        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
493        self.MockStateChange(target, state))
494    mock_atft._LockAvbTarget = MagicMock()
495    mock_atft._LockAvbTarget.side_effect = (
496        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
497        self.MockStateChange(target, state))
498    mock_atft._ProvisionTarget = MagicMock()
499    mock_atft._ProvisionTarget.side_effect = (
500        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
501        self.MockStateChange(target, state))
502    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
503    mock_atft.auto_prov = True
504    mock_atft.atft_manager = MagicMock()
505    mock_atft.atft_manager.GetTargetDevice = MagicMock()
506    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
507    mock_atft._HandleStateTransition(test_dev1)
508    self.assertEqual(ProvisionStatus.REBOOT_FAILED, test_dev1.provision_status)
509
510  def testHandleStateTransitionFuseAttrFail(self):
511    mock_atft = MockAtft()
512    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
513                               ProvisionStatus.WAITING)
514    mock_atft._FuseVbootKeyTarget = MagicMock()
515    mock_atft._FuseVbootKeyTarget.side_effect = (
516        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
517        self.MockStateChange(target, state))
518    mock_atft._FusePermAttrTarget = MagicMock()
519    mock_atft._FusePermAttrTarget.side_effect = (
520        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_FAILED:
521        self.MockStateChange(target, state))
522    mock_atft._LockAvbTarget = MagicMock()
523    mock_atft._LockAvbTarget.side_effect = (
524        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
525        self.MockStateChange(target, state))
526    mock_atft._ProvisionTarget = MagicMock()
527    mock_atft._ProvisionTarget.side_effect = (
528        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
529        self.MockStateChange(target, state))
530    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
531    mock_atft.auto_prov = True
532    mock_atft.atft_manager = MagicMock()
533    mock_atft.atft_manager.GetTargetDevice = MagicMock()
534    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
535    mock_atft._HandleStateTransition(test_dev1)
536    self.assertEqual(ProvisionStatus.FUSEATTR_FAILED,
537                     test_dev1.provision_status)
538
539  def testHandleStateTransitionLockAVBFail(self):
540    mock_atft = MockAtft()
541    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
542                               ProvisionStatus.WAITING)
543    mock_atft._FuseVbootKeyTarget = MagicMock()
544    mock_atft._FuseVbootKeyTarget.side_effect = (
545        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
546        self.MockStateChange(target, state))
547    mock_atft._FusePermAttrTarget = MagicMock()
548    mock_atft._FusePermAttrTarget.side_effect = (
549        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
550        self.MockStateChange(target, state))
551    mock_atft._LockAvbTarget = MagicMock()
552    mock_atft._LockAvbTarget.side_effect = (
553        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_FAILED:
554        self.MockStateChange(target, state))
555    mock_atft._ProvisionTarget = MagicMock()
556    mock_atft._ProvisionTarget.side_effect = (
557        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
558        self.MockStateChange(target, state))
559    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
560    mock_atft.auto_prov = True
561    mock_atft.atft_manager = MagicMock()
562    mock_atft.atft_manager.GetTargetDevice = MagicMock()
563    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
564    mock_atft._HandleStateTransition(test_dev1)
565    self.assertEqual(ProvisionStatus.LOCKAVB_FAILED, test_dev1.provision_status)
566
567  def testHandleStateTransitionProvisionFail(self):
568    mock_atft = MockAtft()
569    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
570                               ProvisionStatus.WAITING)
571    mock_atft._FuseVbootKeyTarget = MagicMock()
572    mock_atft._FuseVbootKeyTarget.side_effect = (
573        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
574        self.MockStateChange(target, state))
575    mock_atft._FusePermAttrTarget = MagicMock()
576    mock_atft._FusePermAttrTarget.side_effect = (
577        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
578        self.MockStateChange(target, state))
579    mock_atft._LockAvbTarget = MagicMock()
580    mock_atft._LockAvbTarget.side_effect = (
581        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
582        self.MockStateChange(target, state))
583    mock_atft._ProvisionTarget = MagicMock()
584    mock_atft._ProvisionTarget.side_effect = (
585        lambda target=mock_atft, state=ProvisionStatus.PROVISION_FAILED:
586        self.MockStateChange(target, state))
587    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
588    mock_atft.auto_prov = True
589    mock_atft.atft_manager = MagicMock()
590    mock_atft.atft_manager.GetTargetDevice = MagicMock()
591    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
592    mock_atft._HandleStateTransition(test_dev1)
593    self.assertEqual(ProvisionStatus.PROVISION_FAILED,
594                     test_dev1.provision_status)
595
596  def testHandleStateTransitionSkipStep(self):
597    mock_atft = MockAtft()
598    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
599                               ProvisionStatus.WAITING)
600    mock_atft._FuseVbootKeyTarget = MagicMock()
601    mock_atft._FuseVbootKeyTarget.side_effect = (
602        lambda target=mock_atft, state=ProvisionStatus.REBOOT_SUCCESS:
603        self.MockStateChange(target, state))
604    mock_atft._FusePermAttrTarget = MagicMock()
605    mock_atft._FusePermAttrTarget.side_effect = (
606        lambda target=mock_atft, state=ProvisionStatus.FUSEATTR_SUCCESS:
607        self.MockStateChange(target, state))
608    mock_atft._LockAvbTarget = MagicMock()
609    mock_atft._LockAvbTarget.side_effect = (
610        lambda target=mock_atft, state=ProvisionStatus.LOCKAVB_SUCCESS:
611        self.MockStateChange(target, state))
612    mock_atft._ProvisionTarget = MagicMock()
613    mock_atft._ProvisionTarget.side_effect = (
614        lambda target=mock_atft, state=ProvisionStatus.PROVISION_SUCCESS:
615        self.MockStateChange(target, state))
616
617    # The device has bootloader_locked and avb_locked set. Should fuse perm attr
618    # and provision key.
619    test_dev1.provision_state.bootloader_locked = True
620    test_dev1.provision_state.avb_locked = True
621    mock_atft.auto_dev_serials = [self.TEST_SERIAL1]
622    mock_atft.auto_prov = True
623    mock_atft.atft_manager = MagicMock()
624    mock_atft.atft_manager.GetTargetDevice = MagicMock()
625    mock_atft.atft_manager.GetTargetDevice.return_value = test_dev1
626    mock_atft._HandleStateTransition(test_dev1)
627    self.assertEqual(ProvisionStatus.PROVISION_SUCCESS,
628                     test_dev1.provision_status)
629    mock_atft._FusePermAttrTarget.assert_called_once()
630    mock_atft._ProvisionTarget.assert_called_once()
631    self.assertEqual(True, test_dev1.provision_state.bootloader_locked)
632    self.assertEqual(True, test_dev1.provision_state.avb_perm_attr_set)
633    self.assertEqual(True, test_dev1.provision_state.avb_locked)
634    self.assertEqual(True, test_dev1.provision_state.provisioned)
635
636  # Test atft._CheckATFAStatus
637  def testCheckATFAStatus(self):
638    mock_atft = MockAtft()
639    mock_atft.PauseRefresh = MagicMock()
640    mock_atft.ResumeRefresh = MagicMock()
641    mock_atft._SendStartMessageEvent = MagicMock()
642    mock_atft._SendSucceedMessageEvent = MagicMock()
643    mock_atft._CheckATFAStatus()
644    mock_atft.atft_manager.CheckATFAStatus.assert_called()
645
646  # Test atft._FuseVbootKey
647  def MockGetTargetDevice(self, serial):
648    return self.device_map.get(serial)
649
650  def MockReboot(self, target, timeout, success, fail):
651    success()
652    target.provision_state.bootloader_locked = True
653
654  @patch('wx.QueueEvent')
655  def testFuseVbootKey(self, mock_queue_event):
656    mock_atft = MockAtft()
657    mock_atft.dev_listed_event = MagicMock()
658    mock_atft.PauseRefresh = MagicMock()
659    mock_atft.ResumeRefresh = MagicMock()
660    mock_atft._SendStartMessageEvent = MagicMock()
661    mock_atft._SendSucceedMessageEvent = MagicMock()
662    mock_atft._SendAlertEvent = MagicMock()
663    mock_atft._SendMessageEvent = MagicMock()
664    mock_atft.atft_manager.GetTargetDevice.side_effect = (
665        self.MockGetTargetDevice)
666    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
667                               ProvisionStatus.IDLE)
668    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
669                               ProvisionStatus.FUSEVBOOT_FAILED)
670    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
671                               ProvisionStatus.FUSEVBOOT_SUCCESS)
672    test_dev3.provision_state.bootloader_locked = True
673    self.device_map[self.TEST_SERIAL1] = test_dev1
674    self.device_map[self.TEST_SERIAL2] = test_dev2
675    self.device_map[self.TEST_SERIAL3] = test_dev3
676    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
677    mock_atft.atft_manager.Reboot.side_effect = self.MockReboot
678    mock_atft._FuseVbootKey(serials)
679    calls = [call(test_dev1), call(test_dev2)]
680    mock_atft.atft_manager.FuseVbootKey.assert_has_calls(calls)
681    mock_queue_event.assert_called()
682
683  @patch('wx.QueueEvent')
684  def testFuseVbootKeyExceptions(self, mock_queue_event):
685    mock_atft = MockAtft()
686    mock_atft.dev_listed_event = MagicMock()
687    mock_atft.PauseRefresh = MagicMock()
688    mock_atft.ResumeRefresh = MagicMock()
689    mock_atft._SendStartMessageEvent = MagicMock()
690    mock_atft._SendSucceedMessageEvent = MagicMock()
691    mock_atft._SendAlertEvent = MagicMock()
692    mock_atft._SendMessageEvent = MagicMock()
693    mock_atft._HandleException = MagicMock()
694    mock_atft.atft_manager.GetTargetDevice.side_effect = (
695        self.MockGetTargetDevice)
696    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
697                               ProvisionStatus.IDLE)
698    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
699                               ProvisionStatus.FUSEVBOOT_FAILED)
700    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
701                               ProvisionStatus.FUSEVBOOT_SUCCESS)
702    test_dev3.provision_state.bootloader_locked = True
703    self.device_map[self.TEST_SERIAL1] = test_dev1
704    self.device_map[self.TEST_SERIAL2] = test_dev2
705    self.device_map[self.TEST_SERIAL3] = test_dev3
706    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
707    mock_atft.atft_manager.Reboot.side_effect = self.MockReboot
708    mock_atft.atft_manager.FuseVbootKey.side_effect = (
709        fastboot_exceptions.ProductNotSpecifiedException)
710    mock_atft._FuseVbootKey(serials)
711    self.assertEqual(2, mock_atft._HandleException.call_count)
712    mock_queue_event.assert_not_called()
713
714    # Reset states.
715    mock_atft._HandleException.reset_mock()
716    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
717                               ProvisionStatus.IDLE)
718    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
719                               ProvisionStatus.FUSEVBOOT_FAILED)
720    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
721                               ProvisionStatus.FUSEVBOOT_SUCCESS)
722    test_dev3.provision_state.bootloader_locked = True
723    self.device_map[self.TEST_SERIAL1] = test_dev1
724    self.device_map[self.TEST_SERIAL2] = test_dev2
725    self.device_map[self.TEST_SERIAL3] = test_dev3
726    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
727    mock_atft.atft_manager.FuseVbootKey.side_effect = (
728        fastboot_exceptions.FastbootFailure(''))
729    mock_atft._FuseVbootKey(serials)
730    self.assertEqual(2, mock_atft._HandleException.call_count)
731
732    # Reset states, test reboot failure
733    mock_atft._HandleException.reset_mock()
734    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
735                               ProvisionStatus.IDLE)
736    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
737                               ProvisionStatus.FUSEVBOOT_FAILED)
738    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
739                               ProvisionStatus.FUSEVBOOT_SUCCESS)
740    test_dev3.provision_state.bootloader_locked = True
741    self.device_map[self.TEST_SERIAL1] = test_dev1
742    self.device_map[self.TEST_SERIAL2] = test_dev2
743    self.device_map[self.TEST_SERIAL3] = test_dev3
744    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
745    mock_atft.atft_manager.Reboot.side_effect = (
746        fastboot_exceptions.FastbootFailure(''))
747    mock_atft.atft_manager.FuseVbootKey = MagicMock()
748    mock_atft._FuseVbootKey(serials)
749    self.assertEqual(2, mock_atft._HandleException.call_count)
750
751  # Test atft._FusePermAttr
752  def testFusePermAttr(self):
753    mock_atft = MockAtft()
754    mock_atft.PauseRefresh = MagicMock()
755    mock_atft.ResumeRefresh = MagicMock()
756    mock_atft._SendStartMessageEvent = MagicMock()
757    mock_atft._SendSucceedMessageEvent = MagicMock()
758    mock_atft._SendAlertEvent = MagicMock()
759    mock_atft._SendMessageEvent = MagicMock()
760    mock_atft.atft_manager.GetTargetDevice.side_effect = (
761        self.MockGetTargetDevice)
762    test_dev1 = TestDeviceInfo(
763        self.TEST_SERIAL1, self.TEST_LOCATION1,
764        ProvisionStatus.FUSEVBOOT_SUCCESS)
765    test_dev1.provision_state.bootloader_locked = True
766    test_dev2 = TestDeviceInfo(
767        self.TEST_SERIAL2, self.TEST_LOCATION2,
768        ProvisionStatus.REBOOT_SUCCESS)
769    test_dev2.provision_state.bootloader_locked = True
770    test_dev3 = TestDeviceInfo(
771        self.TEST_SERIAL3, self.TEST_LOCATION2,
772        ProvisionStatus.FUSEATTR_FAILED)
773    test_dev3.provision_state.bootloader_locked = True
774    test_dev4 = TestDeviceInfo(
775        self.TEST_SERIAL4, self.TEST_LOCATION2,
776        ProvisionStatus.FUSEATTR_SUCCESS)
777    test_dev4.provision_state.bootloader_locked = True
778    test_dev4.provision_state.avb_perm_attr_set = True
779    self.device_map[self.TEST_SERIAL1] = test_dev1
780    self.device_map[self.TEST_SERIAL2] = test_dev2
781    self.device_map[self.TEST_SERIAL3] = test_dev3
782    self.device_map[self.TEST_SERIAL4] = test_dev4
783    serials = [
784        self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3,
785        self.TEST_SERIAL4
786    ]
787    mock_atft._FusePermAttr(serials)
788    calls = [call(test_dev1), call(test_dev2), call(test_dev3)]
789    mock_atft.atft_manager.FusePermAttr.assert_has_calls(calls)
790
791  def testFusePermAttrExceptions(self):
792    mock_atft = MockAtft()
793    mock_atft.PauseRefresh = MagicMock()
794    mock_atft.ResumeRefresh = MagicMock()
795    mock_atft._SendStartMessageEvent = MagicMock()
796    mock_atft._SendSucceedMessageEvent = MagicMock()
797    mock_atft._SendAlertEvent = MagicMock()
798    mock_atft._SendMessageEvent = MagicMock()
799    mock_atft._HandleException = MagicMock()
800    mock_atft.atft_manager.GetTargetDevice.side_effect = (
801        self.MockGetTargetDevice)
802    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
803                               ProvisionStatus.FUSEVBOOT_SUCCESS)
804    test_dev1.provision_state.bootloader_locked = True
805    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
806                               ProvisionStatus.REBOOT_SUCCESS)
807    test_dev2.provision_state.bootloader_locked = True
808    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
809                               ProvisionStatus.FUSEATTR_FAILED)
810    test_dev3.provision_state.bootloader_locked = True
811    test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2,
812                               ProvisionStatus.FUSEATTR_SUCCESS)
813    test_dev4.provision_state.bootloader_locked = True
814    test_dev4.provision_state.avb_perm_attr_set = True
815    self.device_map[self.TEST_SERIAL1] = test_dev1
816    self.device_map[self.TEST_SERIAL2] = test_dev2
817    self.device_map[self.TEST_SERIAL3] = test_dev3
818    self.device_map[self.TEST_SERIAL4] = test_dev4
819    serials = [
820        self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3,
821        self.TEST_SERIAL4
822    ]
823    mock_atft.atft_manager.FusePermAttr.side_effect = (
824        fastboot_exceptions.ProductNotSpecifiedException)
825    mock_atft._FusePermAttr(serials)
826    self.assertEqual(3, mock_atft._HandleException.call_count)
827    # Reset states
828    mock_atft._HandleException.reset_mock()
829    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
830                               ProvisionStatus.FUSEVBOOT_SUCCESS)
831    test_dev1.provision_state.bootloader_locked = True
832    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
833                               ProvisionStatus.REBOOT_SUCCESS)
834    test_dev2.provision_state.bootloader_locked = True
835    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
836                               ProvisionStatus.FUSEATTR_FAILED)
837    test_dev3.provision_state.bootloader_locked = True
838    test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2,
839                               ProvisionStatus.FUSEATTR_SUCCESS)
840    self.device_map[self.TEST_SERIAL1] = test_dev1
841    self.device_map[self.TEST_SERIAL2] = test_dev2
842    self.device_map[self.TEST_SERIAL3] = test_dev3
843    self.device_map[self.TEST_SERIAL4] = test_dev4
844    mock_atft.atft_manager.FusePermAttr.side_effect = (
845        fastboot_exceptions.FastbootFailure(''))
846    mock_atft._FusePermAttr(serials)
847    self.assertEqual(3, mock_atft._HandleException.call_count)
848
849  # Test atft._LockAvb
850  def testLockAvb(self):
851    mock_atft = MockAtft()
852    mock_atft.PauseRefresh = MagicMock()
853    mock_atft.ResumeRefresh = MagicMock()
854    mock_atft._SendStartMessageEvent = MagicMock()
855    mock_atft._SendSucceedMessageEvent = MagicMock()
856    mock_atft._SendAlertEvent = MagicMock()
857    mock_atft._SendMessageEvent = MagicMock()
858    mock_atft._HandleException = MagicMock()
859    mock_atft.atft_manager.GetTargetDevice.side_effect = (
860        self.MockGetTargetDevice)
861    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
862                               ProvisionStatus.FUSEATTR_SUCCESS)
863    test_dev1.provision_state.bootloader_locked = True
864    test_dev1.provision_state.avb_perm_attr_set = True
865    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
866                               ProvisionStatus.LOCKAVB_FAILED)
867    test_dev2.provision_state.bootloader_locked = True
868    test_dev2.provision_state.avb_perm_attr_set = True
869    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
870                               ProvisionStatus.FUSEATTR_FAILED)
871    test_dev3.provision_state.bootloader_locked = True
872    test_dev3.provision_state.avb_perm_attr_set = False
873    test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2,
874                               ProvisionStatus.IDLE)
875    self.device_map[self.TEST_SERIAL1] = test_dev1
876    self.device_map[self.TEST_SERIAL2] = test_dev2
877    self.device_map[self.TEST_SERIAL3] = test_dev3
878    self.device_map[self.TEST_SERIAL4] = test_dev4
879    serials = [
880        self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3,
881        self.TEST_SERIAL4
882    ]
883    mock_atft._LockAvb(serials)
884    calls = [call(test_dev1), call(test_dev2)]
885    mock_atft.atft_manager.LockAvb.assert_has_calls(calls)
886
887  def testLockAvbExceptions(self):
888    mock_atft = MockAtft()
889    mock_atft.PauseRefresh = MagicMock()
890    mock_atft.ResumeRefresh = MagicMock()
891    mock_atft._SendStartMessageEvent = MagicMock()
892    mock_atft._SendSucceedMessageEvent = MagicMock()
893    mock_atft._SendAlertEvent = MagicMock()
894    mock_atft._SendMessageEvent = MagicMock()
895    mock_atft._HandleException = MagicMock()
896    mock_atft.atft_manager.GetTargetDevice.side_effect = (
897        self.MockGetTargetDevice)
898    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
899                               ProvisionStatus.FUSEATTR_SUCCESS)
900    test_dev1.provision_state.bootloader_locked = True
901    test_dev1.provision_state.avb_perm_attr_set = True
902    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
903                               ProvisionStatus.LOCKAVB_FAILED)
904    test_dev2.provision_state.bootloader_locked = True
905    test_dev2.provision_state.avb_perm_attr_set = True
906    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
907                               ProvisionStatus.FUSEATTR_FAILED)
908    test_dev3.provision_state.bootloader_locked = True
909    test_dev3.provision_state.avb_perm_attr_set = False
910    test_dev4 = TestDeviceInfo(self.TEST_SERIAL4, self.TEST_LOCATION2,
911                               ProvisionStatus.IDLE)
912    self.device_map[self.TEST_SERIAL1] = test_dev1
913    self.device_map[self.TEST_SERIAL2] = test_dev2
914    self.device_map[self.TEST_SERIAL3] = test_dev3
915    self.device_map[self.TEST_SERIAL4] = test_dev4
916    serials = [
917        self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3,
918        self.TEST_SERIAL4
919    ]
920    mock_atft.atft_manager.LockAvb.side_effect = (
921        fastboot_exceptions.FastbootFailure(''))
922    mock_atft._LockAvb(serials)
923    self.assertEqual(2, mock_atft._HandleException.call_count)
924
925  # Test atft._CheckLowKeyAlert
926  def MockCheckATFAStatus(self):
927    return self.atfa_keys
928
929  def MockSuccessProvision(self, target):
930    self.atfa_keys -= 1
931
932  def MockFailedProvision(self, target):
933    pass
934
935  def testCheckLowKeyAlert(self):
936    mock_atft = MockAtft()
937    mock_atft.PauseRefresh = MagicMock()
938    mock_atft.ResumeRefresh = MagicMock()
939    mock_atft._SendStartMessageEvent = MagicMock()
940    mock_atft._SendSucceedMessageEvent = MagicMock()
941    mock_atft._SendLowKeyAlertEvent = MagicMock()
942    mock_atft.key_threshold = 100
943    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
944                               ProvisionStatus.WAITING)
945    mock_atft.atft_manager.GetATFAKeysLeft.side_effect = (
946        self.MockCheckATFAStatus)
947    self.atfa_keys = 102
948    # First provision succeed
949    # First check 101 left, no alert
950    mock_atft.atft_manager.Provision.side_effect = self.MockSuccessProvision
951    mock_atft._ProvisionTarget(test_dev1)
952    mock_atft._SendLowKeyAlertEvent.assert_not_called()
953    # Second provision failed
954    # Second check, assume 100 left, verify, 101 left no alert
955    mock_atft.atft_manager.Provision.side_effect = self.MockFailedProvision
956    mock_atft._ProvisionTarget(test_dev1)
957    mock_atft._SendLowKeyAlertEvent.assert_not_called()
958    # Third check, assuem 100 left, verify, 100 left, alert
959    mock_atft.atft_manager.Provision.side_effect = self.MockSuccessProvision
960    mock_atft._ProvisionTarget(test_dev1)
961    mock_atft._SendLowKeyAlertEvent.assert_called()
962
963  def testCheckLowKeyAlertException(self):
964    mock_atft = MockAtft()
965    mock_atft.PauseRefresh = MagicMock()
966    mock_atft.ResumeRefresh = MagicMock()
967    mock_atft._SendStartMessageEvent = MagicMock()
968    mock_atft._SendSucceedMessageEvent = MagicMock()
969    mock_atft._SendLowKeyAlertEvent = MagicMock()
970    mock_atft.key_threshold = 100
971    mock_atft._HandleException = MagicMock()
972    mock_atft.atft_manager.CheckATFAStatus.side_effect = (
973        fastboot_exceptions.FastbootFailure(''))
974    mock_atft._CheckLowKeyAlert()
975    mock_atft._HandleException.assert_called_once()
976    mock_atft._HandleException.reset_mock()
977    mock_atft.atft_manager.CheckATFAStatus.side_effect = (
978        fastboot_exceptions.ProductNotSpecifiedException)
979    mock_atft._CheckLowKeyAlert()
980    mock_atft._HandleException.assert_called_once()
981    mock_atft._HandleException.reset_mock()
982    mock_atft.atft_manager.CheckATFAStatus.side_effect = (
983        fastboot_exceptions.DeviceNotFoundException)
984    mock_atft._CheckLowKeyAlert()
985    mock_atft._HandleException.assert_called_once()
986
987  # Test atft._SwitchStorageMode
988  def testSwitchStorageMode(self):
989    mock_atft = MockAtft()
990    mock_atft.PauseRefresh = MagicMock()
991    mock_atft.ResumeRefresh = MagicMock()
992    mock_atft._SendStartMessageEvent = MagicMock()
993    mock_atft._SendSucceedMessageEvent = MagicMock()
994    mock_atft._SwitchStorageMode()
995    mock_atft.atft_manager.SwitchATFAStorage.assert_called_once()
996
997  def testSwitchStorageModeExceptions(self):
998    mock_atft = MockAtft()
999    mock_atft.PauseRefresh = MagicMock()
1000    mock_atft.ResumeRefresh = MagicMock()
1001    mock_atft._SendStartMessageEvent = MagicMock()
1002    mock_atft._SendSucceedMessageEvent = MagicMock()
1003    mock_atft._HandleException = MagicMock()
1004    mock_atft.atft_manager.SwitchATFAStorage.side_effect = (
1005        fastboot_exceptions.DeviceNotFoundException())
1006    mock_atft._SwitchStorageMode()
1007    mock_atft._HandleException.assert_called_once()
1008    mock_atft._HandleException.reset_mock()
1009    mock_atft.atft_manager.SwitchATFAStorage.side_effect = (
1010        fastboot_exceptions.FastbootFailure(''))
1011    mock_atft._SwitchStorageMode()
1012    mock_atft._HandleException.assert_called_once()
1013
1014  # Test atft._Reboot
1015  def testReboot(self):
1016    mock_atft = MockAtft()
1017    mock_atft.PauseRefresh = MagicMock()
1018    mock_atft.ResumeRefresh = MagicMock()
1019    mock_atft._SendStartMessageEvent = MagicMock()
1020    mock_atft._SendSucceedMessageEvent = MagicMock()
1021    mock_atft._Reboot()
1022    mock_atft.atft_manager.RebootATFA.assert_called_once()
1023
1024  def testRebootExceptions(self):
1025    mock_atft = MockAtft()
1026    mock_atft.PauseRefresh = MagicMock()
1027    mock_atft.ResumeRefresh = MagicMock()
1028    mock_atft._SendStartMessageEvent = MagicMock()
1029    mock_atft._SendSucceedMessageEvent = MagicMock()
1030    mock_atft._HandleException = MagicMock()
1031    mock_atft.atft_manager.RebootATFA.side_effect = (
1032        fastboot_exceptions.DeviceNotFoundException())
1033    mock_atft._Reboot()
1034    mock_atft._HandleException.assert_called_once()
1035    mock_atft._HandleException.reset_mock()
1036    mock_atft.atft_manager.RebootATFA.side_effect = (
1037        fastboot_exceptions.FastbootFailure(''))
1038    mock_atft._Reboot()
1039    mock_atft._HandleException.assert_called_once()
1040
1041  # Test atft._Shutdown
1042  def testShutdown(self):
1043    mock_atft = MockAtft()
1044    mock_atft.PauseRefresh = MagicMock()
1045    mock_atft.ResumeRefresh = MagicMock()
1046    mock_atft._SendStartMessageEvent = MagicMock()
1047    mock_atft._SendSucceedMessageEvent = MagicMock()
1048    mock_atft._Shutdown()
1049    mock_atft.atft_manager.ShutdownATFA.assert_called_once()
1050
1051  def testShutdownExceptions(self):
1052    mock_atft = MockAtft()
1053    mock_atft.PauseRefresh = MagicMock()
1054    mock_atft.ResumeRefresh = MagicMock()
1055    mock_atft._SendStartMessageEvent = MagicMock()
1056    mock_atft._SendSucceedMessageEvent = MagicMock()
1057    mock_atft._HandleException = MagicMock()
1058    mock_atft.atft_manager.ShutdownATFA.side_effect = (
1059        fastboot_exceptions.DeviceNotFoundException())
1060    mock_atft._Shutdown()
1061    mock_atft._HandleException.assert_called_once()
1062    mock_atft._HandleException.reset_mock()
1063    mock_atft.atft_manager.ShutdownATFA.side_effect = (
1064        fastboot_exceptions.FastbootFailure(''))
1065    mock_atft._Shutdown()
1066    mock_atft._HandleException.assert_called_once()
1067
1068  # Test atft._ManualProvision
1069  def testManualProvision(self):
1070    mock_atft = MockAtft()
1071    mock_atft.PauseRefresh = MagicMock()
1072    mock_atft.ResumeRefresh = MagicMock()
1073    mock_atft._SendStartMessageEvent = MagicMock()
1074    mock_atft._SendSucceedMessageEvent = MagicMock()
1075    mock_atft._HandleException = MagicMock()
1076    mock_atft.atft_manager.Provision = MagicMock()
1077    mock_atft._SendAlertEvent = MagicMock()
1078    mock_atft._CheckLowKeyAlert = MagicMock()
1079    mock_atft.atft_manager.GetTargetDevice.side_effect = (
1080        self.MockGetTargetDevice)
1081    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
1082                               ProvisionStatus.PROVISION_FAILED)
1083    test_dev1.provision_state.bootloader_locked = True
1084    test_dev1.provision_state.avb_perm_attr_set = True
1085    test_dev1.provision_state.avb_locked = True
1086    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
1087                               ProvisionStatus.LOCKAVB_SUCCESS)
1088    test_dev2.provision_state.bootloader_locked = True
1089    test_dev2.provision_state.avb_perm_attr_set = True
1090    test_dev2.provision_state.avb_locked = True
1091    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
1092                               ProvisionStatus.FUSEATTR_FAILED)
1093    test_dev3.provision_state.bootloader_locked = True
1094    self.device_map[self.TEST_SERIAL1] = test_dev1
1095    self.device_map[self.TEST_SERIAL2] = test_dev2
1096    self.device_map[self.TEST_SERIAL3] = test_dev3
1097    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
1098    mock_atft._ManualProvision(serials)
1099    calls = [call(test_dev1), call(test_dev2)]
1100    mock_atft.atft_manager.Provision.assert_has_calls(calls)
1101
1102  def testManualProvisionExceptions(self):
1103    mock_atft = MockAtft()
1104    mock_atft.PauseRefresh = MagicMock()
1105    mock_atft.ResumeRefresh = MagicMock()
1106    mock_atft._SendStartMessageEvent = MagicMock()
1107    mock_atft._SendSucceedMessageEvent = MagicMock()
1108    mock_atft._HandleException = MagicMock()
1109    mock_atft.atft_manager.Provision = MagicMock()
1110    mock_atft._SendAlertEvent = MagicMock()
1111    mock_atft._CheckLowKeyAlert = MagicMock()
1112    mock_atft.atft_manager.GetTargetDevice.side_effect = (
1113        self.MockGetTargetDevice)
1114    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
1115                               ProvisionStatus.PROVISION_FAILED)
1116    test_dev1.provision_state.bootloader_locked = True
1117    test_dev1.provision_state.avb_perm_attr_set = True
1118    test_dev1.provision_state.avb_locked = True
1119    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
1120                               ProvisionStatus.LOCKAVB_SUCCESS)
1121    test_dev2.provision_state.bootloader_locked = True
1122    test_dev2.provision_state.avb_perm_attr_set = True
1123    test_dev2.provision_state.avb_locked = True
1124    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
1125                               ProvisionStatus.FUSEATTR_FAILED)
1126    test_dev3.provision_state.bootloader_locked = True
1127    self.device_map[self.TEST_SERIAL1] = test_dev1
1128    self.device_map[self.TEST_SERIAL2] = test_dev2
1129    self.device_map[self.TEST_SERIAL3] = test_dev3
1130    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
1131    mock_atft.atft_manager.Provision.side_effect = (
1132        fastboot_exceptions.FastbootFailure(''))
1133    mock_atft._ManualProvision(serials)
1134    self.assertEqual(2, mock_atft._HandleException.call_count)
1135    test_dev1 = TestDeviceInfo(self.TEST_SERIAL1, self.TEST_LOCATION1,
1136                               ProvisionStatus.PROVISION_FAILED)
1137    test_dev1.provision_state.bootloader_locked = True
1138    test_dev1.provision_state.avb_perm_attr_set = True
1139    test_dev1.provision_state.avb_locked = True
1140    test_dev2 = TestDeviceInfo(self.TEST_SERIAL2, self.TEST_LOCATION2,
1141                               ProvisionStatus.LOCKAVB_SUCCESS)
1142    test_dev2.provision_state.bootloader_locked = True
1143    test_dev2.provision_state.avb_perm_attr_set = True
1144    test_dev2.provision_state.avb_locked = True
1145    test_dev3 = TestDeviceInfo(self.TEST_SERIAL3, self.TEST_LOCATION2,
1146                               ProvisionStatus.FUSEATTR_FAILED)
1147    test_dev3.provision_state.bootloader_locked = True
1148    self.device_map[self.TEST_SERIAL1] = test_dev1
1149    self.device_map[self.TEST_SERIAL2] = test_dev2
1150    self.device_map[self.TEST_SERIAL3] = test_dev3
1151    serials = [self.TEST_SERIAL1, self.TEST_SERIAL2, self.TEST_SERIAL3]
1152    mock_atft._HandleException.reset_mock()
1153    mock_atft.atft_manager.Provision.side_effect = (
1154        fastboot_exceptions.DeviceNotFoundException())
1155    mock_atft._ManualProvision(serials)
1156    self.assertEqual(2, mock_atft._HandleException.call_count)
1157
1158  # Test atft._ProcessKey
1159  def testProcessKeySuccess(self):
1160    mock_atft = MockAtft()
1161    mock_atft.atft_manager = MagicMock()
1162    mock_atft.atft_manager._atfa_dev_manager = MagicMock()
1163    mock_atft._CheckATFAStatus = MagicMock()
1164    mock_atft._SendOperationStartEvent = MagicMock()
1165    mock_atft._SendOperationSucceedEvent = MagicMock()
1166    mock_atft.PauseRefresh = MagicMock()
1167    mock_atft.ResumeRefresh = MagicMock()
1168    mock_atft._HandleException = MagicMock()
1169
1170    mock_atft._ProcessKey()
1171    mock_atft.PauseRefresh.assert_called_once()
1172    mock_atft.ResumeRefresh.assert_called_once()
1173    mock_atft._HandleException.assert_not_called()
1174
1175    mock_atft._SendOperationStartEvent.assert_called_once()
1176    mock_atft._SendOperationSucceedEvent.assert_called_once()
1177    mock_atft._CheckATFAStatus.assert_called_once()
1178
1179  def testProcessKeyFailure(self):
1180    self.TestProcessKeyFailureCommon(fastboot_exceptions.FastbootFailure(''))
1181    self.TestProcessKeyFailureCommon(
1182        fastboot_exceptions.DeviceNotFoundException)
1183
1184  def TestProcessKeyFailureCommon(self, exception):
1185    mock_atft = MockAtft()
1186    mock_atft.atft_manager = MagicMock()
1187    mock_atft._SendOperationStartEvent = MagicMock()
1188    mock_atft._SendOperationSucceedEvent = MagicMock()
1189    mock_atft.PauseRefresh = MagicMock()
1190    mock_atft.ResumeRefresh = MagicMock()
1191    mock_atft._HandleException = MagicMock()
1192    mock_atft.atft_manager.ProcessATFAKey = MagicMock()
1193    mock_atft.atft_manager.ProcessATFAKey.side_effect = exception
1194
1195    mock_atft._ProcessKey()
1196    mock_atft.PauseRefresh.assert_called_once()
1197    mock_atft.ResumeRefresh.assert_called_once()
1198    mock_atft._HandleException.assert_called_once()
1199
1200    mock_atft._SendOperationStartEvent.assert_called_once()
1201    mock_atft._SendOperationSucceedEvent.assert_not_called()
1202
1203
1204if __name__ == '__main__':
1205  unittest.main()
1206