1"""
2This module contains the actions that a configurable CFM test can execute.
3"""
4import abc
5import logging
6import random
7import re
8import sys
9import time
10
class Action(object):
    """
    Abstract parent class shared by every action.

    Callers run an action through execute(); concrete actions supply the
    actual work by implementing do_execute().
    """
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def do_execute(self, context):
        """
        Carries out the work of the action.

        Every subclass has to provide its own implementation.

        @param context ActionContext instance providing dependencies to the
                action.
        """

    def execute(self, context):
        """
        Runs the action, logging right before and right after it.

        @param context ActionContext instance providing dependencies to the
                action.
        """
        logging.info('Executing action "%s"', self)
        self.do_execute(context)
        logging.info('Done executing action "%s"', self)

    def __repr__(self):
        # By default an action is identified by the name of its class.
        return type(self).__name__
42
class MuteMicrophone(Action):
    """
    Action that mutes the microphone in a call.
    """
    def do_execute(self, context):
        """
        Asks the CFM facade to mute the microphone.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.mute_mic()
49
class UnmuteMicrophone(Action):
    """
    Action that unmutes the microphone in a call.
    """
    def do_execute(self, context):
        """
        Asks the CFM facade to unmute the microphone.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.unmute_mic()
56
class JoinMeeting(Action):
    """
    Action that joins an existing meeting identified by its meeting code.
    """
    def __init__(self, meeting_code):
        """
        Initializes.

        @param meeting_code Code of the meeting that should be joined.
        """
        super(JoinMeeting, self).__init__()
        self.meeting_code = meeting_code

    def do_execute(self, context):
        """
        Joins the meeting session through the CFM facade.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.join_meeting_session(self.meeting_code)

    def __repr__(self):
        # Include the meeting code so log lines tell the meetings apart.
        description = 'JoinMeeting "%s"' % self.meeting_code
        return description
75
class CreateMeeting(Action):
    """
    Action that creates a new meeting from the landing page.
    """
    def do_execute(self, context):
        """
        Starts a meeting session through the CFM facade.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.start_meeting_session()
82
class LeaveMeeting(Action):
    """
    Action that leaves the current meeting.
    """
    def do_execute(self, context):
        """
        Ends the meeting session through the CFM facade.

        @param context ActionContext instance; its cfm_facade is used.
        """
        facade = context.cfm_facade
        facade.end_meeting_session()
89
class RebootDut(Action):
    """
    Reboots the DUT.
    """
    def __init__(self, restart_chrome_for_cfm=False):
        """
        Initializes.

        To enable the cfm_facade to interact with the CFM, Chrome needs an extra
        restart. Setting restart_chrome_for_cfm toggles this extra restart.

        @param restart_chrome_for_cfm If True, restarts chrome to enable
                the cfm_facade and waits for the telemetry commands to become
                available. If false, does not do an extra restart of Chrome.
        """
        super(RebootDut, self).__init__()
        self._restart_chrome_for_cfm = restart_chrome_for_cfm

    def __repr__(self):
        # Expose the flag so the execution log shows which variant ran.
        return ('RebootDut[restart_chrome_for_cfm=%s]'
                % self._restart_chrome_for_cfm)

    def do_execute(self, context):
        """
        Reboots the host and optionally restarts Chrome for the CFM.

        @param context ActionContext instance; its host and cfm_facade are
                used.
        """
        context.host.reboot()
        if self._restart_chrome_for_cfm:
            context.cfm_facade.restart_chrome_for_cfm()
            context.cfm_facade.wait_for_meetings_telemetry_commands()
111
class RepeatTimes(Action):
    """
    Repeats a scenario a number of times.
    """
    def __init__(self, times, scenario):
        """
        Initializes.

        @param times The number of times to repeat the scenario.
        @param scenario The scenario to repeat.
        """
        super(RepeatTimes, self).__init__()
        self.times = times
        self.scenario = scenario

    def __repr__(self):
        # Defined as __repr__ (str() falls back to it) so the description is
        # also used when this action is printed as part of a container.
        return 'Repeat[scenario=%s, times=%s]' % (self.scenario, self.times)

    def do_execute(self, context):
        """
        Executes the scenario self.times times in a row.

        @param context ActionContext instance passed on to the scenario.
        """
        # range() instead of xrange() keeps this working on Python 3 as well.
        for _ in range(self.times):
            self.scenario.execute(context)
133
class AssertFileDoesNotContain(Action):
    """
    Asserts that a file on the DUT does not contain specified regexes.
    """
    def __init__(self, path, forbidden_regex_list):
        """
        Initializes.

        @param path The file path on the DUT to check.
        @param forbidden_regex_list a list with regular expressions that should
                not appear in the file.
        """
        super(AssertFileDoesNotContain, self).__init__()
        self.path = path
        self.forbidden_regex_list = forbidden_regex_list

    def __repr__(self):
        return ('AssertFileDoesNotContain[path=%s, forbidden_regex_list=%s]'
                % (self.path, self.forbidden_regex_list))

    def do_execute(self, context):
        """
        Collects the file contents and searches them for each regex.

        @param context ActionContext instance; its file_contents_collector is
                used to read the file.

        @raises AssertionError on the first forbidden regex that matches.
        """
        contents = context.file_contents_collector.collect_file_contents(
                self.path)
        for forbidden_regex in self.forbidden_regex_list:
            match = re.search(forbidden_regex, contents)
            if match:
                raise AssertionError(
                        'Regex "%s" matched "%s" in "%s"'
                        % (forbidden_regex, match.group(), self.path))
163
class AssertUsbDevices(Action):
    """
    Asserts that USB devices with given specs matches a predicate.
    """
    def __init__(
            self,
            usb_device_specs,
            predicate=lambda usb_device_list: len(usb_device_list) == 1):
        """
        Initializes with a spec to assert and a predicate.

        @param usb_device_specs a list of UsbDeviceSpecs for the devices to
                check.
        @param predicate A function that accepts a list of UsbDevices
                and returns true if the list is as expected or false otherwise.
                If the method returns false an AssertionError is thrown.
                The default predicate checks that there is exactly one item
                in the list.
        """
        super(AssertUsbDevices, self).__init__()
        self._usb_device_specs = usb_device_specs
        self._predicate = predicate

    def do_execute(self, context):
        """
        Collects the devices matching the specs and applies the predicate.

        @param context ActionContext instance; its usb_device_collector is
                used to look up the devices.

        @raises AssertionError if the predicate returns a false value.
        """
        usb_devices = context.usb_device_collector.get_devices_by_spec(
                *self._usb_device_specs)
        if not self._predicate(usb_devices):
            raise AssertionError(
                    'Assertion failed for usb device specs %s. '
                    'Usb devices were: %s'
                    % (self._usb_device_specs, usb_devices))

    def __repr__(self):
        # Defined as __repr__ (str() falls back to it) so the description is
        # also used when this action is printed as part of a container.
        return 'AssertUsbDevices for specs %s' % str(self._usb_device_specs)
198
class SelectScenarioAtRandom(Action):
    """
    Executes a randomly selected scenario a number of times.

    Note that there is no validation performed - you have to take care
    so that it makes sense to execute the supplied scenarios in any order
    any number of times.
    """
    def __init__(
            self,
            scenarios,
            run_times,
            random_seed=None):
        """
        Initializes.

        @param scenarios A sequence (e.g. a list) with scenarios to choose
            from. It is handed to random.Random.choice(), so it has to
            support len() and indexing.
        @param run_times The number of scenarios to run. I.e. the number of
            times a random scenario is selected.
        @param random_seed The seed to use for the random generator. Providing
            the same seed as an earlier run will execute the scenarios in the
            same order. Optional, by default (None) a fresh random seed is
            generated for each instance.
        """
        super(SelectScenarioAtRandom, self).__init__()
        if random_seed is None:
            # Draw the seed here rather than in the signature: a default
            # argument is evaluated only once, which would give every
            # instance in the process the same seed.
            random_seed = random.randint(0, sys.maxsize)
        self._scenarios = scenarios
        self._run_times = run_times
        self._random_seed = random_seed
        self._random = random.Random(random_seed)

    def do_execute(self, context):
        """
        Picks and executes a random scenario, run_times times.

        @param context ActionContext instance passed on to the scenarios.
        """
        # range() instead of xrange() keeps this working on Python 3 as well.
        for _ in range(self._run_times):
            self._random.choice(self._scenarios).execute(context)

    def __repr__(self):
        # The seed is included so that a run can be reproduced from the logs.
        return ('SelectScenarioAtRandom [seed=%s, run_times=%s, scenarios=%s]'
                % (self._random_seed, self._run_times, self._scenarios))
235
236
class PowerCycleUsbPort(Action):
    """
    Power cycle USB ports that a specific peripheral type is attached to.
    """
    def __init__(
            self,
            usb_device_specs,
            wait_for_change_timeout=10,
            filter_function=lambda x: x):
        """
        Initializes.

        @param usb_device_specs List of UsbDeviceSpecs of the devices to power
            cycle the port for.
        @param wait_for_change_timeout The timeout in seconds for waiting
            for devices to disappear/appear after turning power off/on.
            If the devices do not disappear/appear within the timeout an
            error is raised when the action is executed.
        @param filter_function Function accepting a list of UsbDevices and
            returning a list of UsbDevices that should be power cycled. The
            default is to return the original list, i.e. power cycle all
            devices matching the usb_device_specs.
        """
        super(PowerCycleUsbPort, self).__init__()
        self._usb_device_specs = usb_device_specs
        self._filter_function = filter_function
        self._wait_for_change_timeout = wait_for_change_timeout

    def do_execute(self, context):
        """
        Turns the ports off, waits, turns them back on and waits again.

        @param context ActionContext instance; its usb_device_collector and
            usb_port_manager are used.

        @raises TimeoutError if the number of matching devices does not reach
            the expected value within wait_for_change_timeout seconds after
            turning the power off or on.
        """
        def _get_devices():
            return context.usb_device_collector.get_devices_by_spec(
                    *self._usb_device_specs)
        devices = _get_devices()
        devices_to_cycle = self._filter_function(devices)
        # If we are asked to power cycle a device connected to a USB hub (for
        # example a Mimo which has an internal hub) the devices's bus and port
        # cannot be used. Those values represent the bus and port of the hub.
        # Instead we must locate the device that is actually connected to the
        # physical USB port. This device is the parent at level 1 of the current
        # device. If the device is not connected to a hub, device.get_parent(1)
        # will return the device itself.
        devices_to_cycle = [device.get_parent(1) for device in devices_to_cycle]
        logging.debug('Power cycling devices: %s', devices_to_cycle)
        port_ids = [(d.bus, d.port) for d in devices_to_cycle]
        context.usb_port_manager.set_port_power(port_ids, False)
        # TODO(kerl): We should do a better check than counting devices.
        # Possibly implementing __eq__() in UsbDevice and doing a proper
        # intersection to see which devices are running or not.
        expected_devices_after_power_off = len(devices) - len(devices_to_cycle)
        _wait_for_condition(
                lambda: len(_get_devices()) == expected_devices_after_power_off,
                self._wait_for_change_timeout)
        context.usb_port_manager.set_port_power(port_ids, True)
        # Powered back on: wait until the original number of devices is seen.
        _wait_for_condition(
                lambda: len(_get_devices()) == len(devices),
                self._wait_for_change_timeout)

    def __repr__(self):
        return ('PowerCycleUsbPort[usb_device_specs=%s, '
                'wait_for_change_timeout=%s]'
                % (str(self._usb_device_specs), self._wait_for_change_timeout))
300
301
class Sleep(Action):
    """
    Action that sleeps for a number of seconds.
    """
    def __init__(self, num_seconds):
        """
        Initializes.

        @param num_seconds The number of seconds to sleep.
        """
        super(Sleep, self).__init__()
        self._num_seconds = num_seconds

    def do_execute(self, context):
        """
        Blocks for the configured number of seconds.

        @param context ActionContext instance. Not used by this action.
        """
        time.sleep(self._num_seconds)

    def __repr__(self):
        return 'Sleep[num_seconds=%s]' % self._num_seconds
319
320
class RetryAssertAction(Action):
    """
    Action that retries an assertion action a number of times if it fails.

    An example use case for this action is to verify that a peripheral device
    appears after power cycling. E.g.:
        PowerCycleUsbPort(ATRUS),
        RetryAssertAction(AssertUsbDevices(ATRUS), 10)
    """
    def __init__(self, action, num_tries, retry_delay_seconds=1):
        """
        Initializes.

        @param action The action to execute.
        @param num_tries The number of times to try the action before failing
            for real. Must be more than 0.
        @param retry_delay_seconds The number of seconds to sleep between
            retries.

        @raises ValueError if num_tries is below 1.
        """
        super(RetryAssertAction, self).__init__()
        if num_tries < 1:
            raise ValueError('num_tries must be > 0. Was %s' % num_tries)
        self._action = action
        self._num_tries = num_tries
        self._retry_delay_seconds = retry_delay_seconds

    def do_execute(self, context):
        """
        Executes the wrapped action until it passes or the tries run out.

        Only AssertionError triggers a retry; any other exception propagates
        immediately.

        @param context ActionContext instance passed on to the wrapped action.

        @raises AssertionError if the wrapped action still fails on the last
            try.
        """
        # range() instead of xrange() keeps this working on Python 3 as well.
        for attempt in range(self._num_tries):
            try:
                self._action.execute(context)
                return
            except AssertionError:
                tries_left = self._num_tries - attempt - 1
                if not tries_left:
                    # Bare raise keeps the traceback of the original failure
                    # ("raise e" would replace it on Python 2).
                    raise
                logging.info(
                        'Action %s failed, will retry %d more times',
                        self._action,
                        tries_left,
                        exc_info=True)
                time.sleep(self._retry_delay_seconds)

    def __repr__(self):
        return ('RetryAssertAction[action=%s, '
                'num_tries=%s, retry_delay_seconds=%s]'
                % (self._action, self._num_tries, self._retry_delay_seconds))
369
370
class AssertNoNewCrashes(Action):
    """
    Action asserting that no new crash files exist on disk.
    """
    def do_execute(self, context):
        """
        Asks the crash detector for new crash files and fails if any exist.

        @param context ActionContext instance; its crash_detector is used.

        @raises AssertionError if the crash detector reports new crash files.
        """
        detected = context.crash_detector.get_new_crash_files()
        if not detected:
            return
        message = 'New crash files detected: %s' % str(detected)
        raise AssertionError(message)
380
381
class TimeoutError(RuntimeError):
    """
    Raised to signal that an operation did not finish within its timeout.

    Note: on Python 3 this module-level name shadows the builtin
    TimeoutError inside this module.
    """
387
388
def _wait_for_condition(condition, timeout_seconds=10):
    """
    Wait for a condition to become true.

    Checks the condition once immediately and then once after each one
    second sleep, for at most timeout_seconds sleeps. Time spent evaluating
    the condition itself is not counted towards the timeout.

    @param condition The condition to check - a function returning a boolean.
    @param timeout_seconds The timeout in seconds. Must be an integer.

    @raises TimeoutError in case the condition does not become true within
        the timeout.
    """
    if condition():
        return
    # range() instead of xrange() keeps this working on Python 3 as well.
    for _ in range(timeout_seconds):
        time.sleep(1)
        if condition():
            return
    raise TimeoutError('Timeout after %s seconds waiting for condition %s'
                       % (timeout_seconds, condition))
409
410