1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Stress test utility for repeating actions repeatedly on android devices.
16
17Configures multiple devices to simultaneously run through the same set of
18actions over and over, while keeping logs from various sources. Primarily
19designed for playing audio to the devices and scanning their log output for
20events, while running other adb commands in between.
21"""
22from __future__ import absolute_import
23from __future__ import division
24from __future__ import print_function
25
26import datetime
27from email import encoders
28from email.mime import text
29import email.mime.base as base
30import email.mime.multipart as multipart
31import logging
32import mimetypes
33import os
34import platform
35import re
36import shlex
37import signal
38import smtplib
39import socket
40import subprocess
41import sys
42import tempfile
43import threading
44import time
45import uuid
46import wave
47from absl import app
48from absl import flags
49import pexpect
50import queue
51import stress_test_common
52import stress_test_pb2
53from google.protobuf import text_format
54
55_SUMMARY_LINES = "-" * 73
56
57if sys.platform.startswith("win"):
58  pexpect = None
59
60_SUMMARY_COLUMNS = (
61    "|        Event Type       |      Event Count     | Consecutive no event |")
62_SUMMARY_COL_FORMATT = "|%-25.25s|% 22d|% 22d|"
63
64FLAGS = flags.FLAGS
65flags.DEFINE_string("notification_address", "",
66                    "Email address where to send notification events. Will "
67                    "default to $USER@google.com if not provided. No emails "
68                    "will be sent if suppress_notification_emails is True.")
69flags.DEFINE_bool("suppress_notification_emails", False,
70                  "Prevents emails from being sent as notifications if True.")
71flags.DEFINE_string("test_name", None,
72                    "Name of stress test to run. For example, if you set this "
73                    "to 'dsp_trigger_sw_rejection', the stress test in "
74                    "'stress_test.dsp_trigger_sw_rejection.ascii_proto' will "
75                    "be loaded and executed.")
76# flags.mark_flag_as_required("test_name")
77flags.DEFINE_string("output_root", "./",
78                    "Path where directory should be generated containing all "
79                    "logs from devices and moved files.")
80flags.DEFINE_integer("num_iterations", None,
81                     "If set to a positive number, the number of iterations of "
82                     "the stress test to run. Otherwise, the test runs "
83                     "forever.")
84flags.DEFINE_list("devices", [],
85                  "Serial numbers of devices that should be included in the "
86                  "stress test. If empty, all devices will be used.")
87flags.DEFINE_integer("print_summary_every_n", 10,
88                     "Prints the summary to the log file every n iterations.")
89
90flags.DEFINE_string("email_sender_address", "",
91                    "Account to use for sending notification emails.")
92flags.DEFINE_string("email_sender_password", "",
93                    "Password to use for notification email account.")
94flags.DEFINE_string("email_smtp_server", "smtp.gmail.com",
95                    "SMTP server to use for sending notification emails.")
96flags.DEFINE_integer("email_smtp_port", 465,
97                     "Port to use for the notification SMTP server.")
98flags.DEFINE_integer("device_settle_time", 5,
99                     "Time to wait for devices to settle.")
100flags.DEFINE_bool("use_sox", platform.system() != "Windows",
101                  "Use sox for playback, otherwise, attempt to use platform "
102                  "specific features.")
103flags.DEFINE_bool("attach_bugreport", True,
104                  "Attach bugreport to email if test failed.")
105flags.DEFINE_bool("delete_data_dir", False,
106                  "If true, code will delete all the files generated by this "
107                  "test at the end.")
108
109if platform.system().startswith("CYGWIN"):
110  FLAGS.device_settle_time = 30
111
112
113def QueueWorker(worker_queue):
114  while True:
115    work = worker_queue.get()
116    try:
117      work()
118    except:  # pylint:disable=bare-except
119      logging.exception("Exception in worker queue - task remains uncompleted.")
120    worker_queue.task_done()
121
122
123def SendNotificationEmail(subject, body, bugreport=None):
124  """Sends an email with the specified subject and body.
125
126     Also attach bugreport if bugreport location is provided as argument
127
128  Args:
129    subject: Subject of the email.
130    body: Body of the email.
131    bugreport: If provided, it will be attach to the email.
132  """
133  if FLAGS.suppress_notification_emails:
134    logging.info("Email with subject '%s' has been suppressed", subject)
135    return
136  try:
137    # Assemble the message to send.
138    recpient_address = FLAGS.notification_address
139    message = multipart.MIMEMultipart("alternative")
140    message["From"] = "Stress Test on %s" % socket.gethostname()
141    message["To"] = recpient_address
142    message["Subject"] = subject
143    message.attach(text.MIMEText(body, "plain"))
144    message.attach(text.MIMEText("<pre>%s</pre>" % body, "html"))
145
146    if FLAGS.attach_bugreport and bugreport:
147      # buildozer: disable=unused-variable
148      ctype, _ = mimetypes.guess_type(bugreport)
149      maintype, subtype = ctype.split("/", 1)
150      with open(bugreport, "rb") as fp:
151        att = base.MIMEBase(maintype, subtype)
152        att.set_payload(fp.read())
153        encoders.encode_base64(att)
154        att.add_header("Content-Disposition", "attachment", filename=bugreport)
155        message.attach(att)
156
157    # Send the message from our special account.
158    server = smtplib.SMTP_SSL(FLAGS.email_smtp_server, FLAGS.email_smtp_port)
159    server.login(FLAGS.email_sender_address, FLAGS.email_sender_password)
160    server.sendmail(FLAGS.email_sender_address, recpient_address,
161                    message.as_string())
162    server.quit()
163    logging.info("Email with subject '%s' has been sent", subject)
164  except:  # pylint:disable=bare-except
165    logging.exception("Failed to send notification email")
166
167
168class ProcessLogger(threading.Thread):
169
170  class EventScanner(object):
171
172    def __init__(self, name, process_name, regexes):
173      """Struct to store the data about an event.
174
175      Args:
176        name: Name of event.
177        process_name: Name of the process that is being logged.
178        regexes: An iteratable of regex strings that indicate an event has
179            happened.
180      """
181
182      self.name = name
183      self.process_name = process_name
184      self.searches = [re.compile(regex).search for regex in regexes]
185      self.count = 0
186
187    def ScanForEvent(self, line, lock=None):
188      """Checks the line for matches. If found, updates the internal counter."""
189
190      for search in self.searches:
191        if search(line.decode("utf-8")):
192          # Grab the lock (if provided), update the counter, and release it.
193          if lock: lock.acquire()
194          self.count += 1
195          if lock: lock.release()
196          logging.info("Event '%s' detected on %s", self.name,
197                       self.process_name)
198
199  def __init__(self, name, command, output, events,
200               restart_process, repeats_output_when_opened):
201    """Threaded class that monitors processes for events, and logs output.
202
203    Args:
204      name: The name of the process being logged.
205      command: A list of arguments to be passed to the subprocess to execute.
206      output: Name of output file to write process stdout to. If blank or None,
207          will not be generated.
208      events: An iterable of LoggingEventConfigs to look for in the output.
209      restart_process: Restart the process if it terminates by itself. This
210          should typically be true, but false for processes that only should be
211          run once and have their output logged.
212      repeats_output_when_opened: Set to true if the process will repeat the
213          output of a previous call when it is restarted. This will prevent
214          duplicate lines from being logged.
215    """
216    super(ProcessLogger, self).__init__()
217    self.name = name
218    self.command = command
219    self.restart_process = restart_process
220    self.repeats_output_when_opened = repeats_output_when_opened
221    self.process = None
222    self.lock = threading.Lock()
223    self.looking = False
224
225    # Compile the list of regexes that we're supposed to be looking for.
226    self.events = []
227    for event in events:
228      self.events.append(ProcessLogger.EventScanner(event.name, self.name,
229                                                    event.regex))
230
231    if output:
232      stress_test_common.MakeDirsIfNeeded(os.path.dirname(output))
233      self.output_fp = open(output, "w", encoding="utf-8")
234      logging.info("Logging device info to %s", output)
235    else:
236      self.output_fp = None
237
238  def GetEventCountsSinceLastCall(self):
239    """Returns the counts of all events since this method was last called."""
240    event_map = {}
241    self.lock.acquire()
242    for event in self.events:
243      event_map[event.name] = event.count
244      event.count = 0
245    self.lock.release()
246    return event_map
247
248  def run(self):
249    last_line = None
250    should_log = True
251    first_run = True
252    skip_exception_line = False
253    self.lock.acquire()
254    last_run_time = 0
255    while self.restart_process:
256      self.lock.release()
257      if not first_run:
258        logging.info("Restarting process %s", "".join(str(self.command)))
259        time_since_last_run = datetime.datetime.now() - last_run_time
260        if time_since_last_run.total_seconds() < 1.0:
261          needed_delay = 1.0 - time_since_last_run.total_seconds()
262          logging.info("Delaying for %.2f seconds", needed_delay)
263          time.sleep(needed_delay)
264      else:
265        first_run = False
266
267      try:
268        if pexpect:
269          self.process = pexpect.spawn(" ".join(self.command), timeout=None)
270          output_source = self.process
271        else:
272          self.process = subprocess.Popen(self.command, stdout=subprocess.PIPE)
273          output_source = self.process.stdout
274        last_run_time = datetime.datetime.now()
275        for line in output_source:
276          # If the process we're logging likes to repeat its output, we need to
277          # look for the last line we saw before we start doing anything with
278          # these lines anymore.
279          if self.repeats_output_when_opened:
280            if not should_log:
281              if last_line == line:
282                should_log = True
283              continue
284            elif skip_exception_line:
285              # ignore the last line which caused UnicodeEncodeError
286              skip_exception_line = False
287              continue
288
289          if self.output_fp:
290            self.output_fp.write(line.decode("utf-8", "backslashreplace").rstrip())
291            self.output_fp.write("\n")
292
293          # Loop through all events we're watching for, to see if they occur on
294          # this line. If they do, update the fact that we've seen this event.
295          for event in self.events:
296            if self.looking:
297              event.ScanForEvent(line, lock=self.lock)
298          last_line = line
299      except UnicodeEncodeError:
300        logging.exception("UnicodeEncodeError on running logger process")
301        skip_exception_line = True
302      except:  # pylint:disable=bare-except
303        logging.exception("Exception encountered running process")
304      finally:
305        if pexpect:
306          self.process.terminate()
307        else:
308          self.process.send_signal(signal.SIGTERM)
309        should_log = False
310      self.lock.acquire()
311
312    self.lock.release()
313    if pexpect:
314      if self.process.exitstatus is not None:
315        logging.info("Process finished - exit code %d", self.process.exitstatus)
316      else:
317        logging.info("Process finished - signal code %d",
318                     self.process.signalstatus)
319    else:
320      if self.process.returncode is not None:
321        logging.info("Process finished - return code %d",
322                     self.process.returncode)
323      else:
324        logging.info("Process finished - no return code")
325
326  def StopLogging(self):
327    if self.process:
328      self.lock.acquire()
329      self.restart_process = False
330      self.lock.release()
331
332      if pexpect:
333        self.process.kill(signal.SIGHUP)
334        self.process.kill(signal.SIGINT)
335      else:
336        self.process.send_signal(signal.SIGTERM)
337
338
339class Device(object):
340
341  SECONDS_TO_SLEEP_DURING_ROOT = 0.5
342
343  def __init__(self, serial_number, output_root, test_events, expected_result):
344    """Responsible for monitoring a specific device, and pulling files from it.
345
346    The actual work of the constructor will be handled asynchronously, you must
347    call WaitForTasks() before using the device.
348
349    Args:
350      serial_number: The device serial number.
351      output_root: The directory where to output log files/anything pulled from
352          the device.
353      test_events: The events (with conditions) that come from the StressTest
354          that should be evaluated at every iteration, along with a list of
355          actions to take when one of these events occur. For example, if there
356          have not been any detected hotword triggers, a bugreport can be
357          generated.
358      expected_result: Expected event count to pass the test.
359    """
360    self.serial_number = serial_number
361    self.output_root = output_root
362    self.cmd_string_replacements = {}
363    self.iteration = 0
364    self.cmd_string_replacements["iteration"] = 0
365    self.cmd_string_replacements["serial_number"] = serial_number
366    self.cmd_string_replacements["output_root"] = output_root
367    self.name = None
368    self.process_loggers = []
369    self.event_log = stress_test_pb2.EventLog()
370    self.cnt_per_iteration = expected_result
371
372    # Prepare the work queue, and offload the rest of the init into it.
373    self.work_queue = queue.Queue()
374    self.worker = threading.Thread(target=QueueWorker, args=[self.work_queue])
375    self.worker.daemon = True
376    self.worker.name = self.name
377    self.worker.start()
378    self.abort_requested = False
379    self.remove_device = False
380    self.test_events = test_events
381
382    self.work_queue.put(self.__init_async__)
383
384  def __init_async__(self):
385    # Get the device type, and append it to the serial number.
386    self.device_type = self.Command(["shell", "getprop",
387                                     "ro.product.name"]).strip().decode("utf-8")
388    self.name = "%s_%s" % (self.device_type, self.serial_number)
389    self.worker.name = self.name
390    self.cmd_string_replacements["device"] = self.name
391    logging.info("Setting up device %s", self.name)
392
393    config = stress_test_common.LoadDeviceConfig(self.device_type,
394                                                 self.serial_number)
395
396    # Get the device ready.
397    self.Root()
398
399    # Run any setup commands.
400    for cmd in config.setup_command:
401      result = self.Command(
402          shlex.split(cmd % self.cmd_string_replacements)).strip()
403      if result:
404        for line in result.splitlines():
405          logging.info(line)
406
407    self.files_to_move = config.file_to_move
408
409    self.event_names = set([event.name for event in config.event])
410    self.event_counter = {name: 0 for name in self.event_names}
411    self.iterations_since_event = {name: 0 for name in self.event_names}
412
413    for file_to_watch in config.file_to_watch:
414      # Are there any events that match up with this file?
415      events = [x for x in config.event if x.source == file_to_watch.source]
416
417      if file_to_watch.source == "LOGCAT":
418        command = [
419            "adb", "-s", self.serial_number, "logcat", "-v", "usec", ""
420        ]
421        command.extend(["%s:S" % tag for tag in config.tag_to_suppress])
422        name = "logcat_" + self.serial_number
423      else:
424        command = [
425            "adb", "-s", self.serial_number, "shell",
426            "while : ; do cat %s 2>&1; done" % file_to_watch.source
427        ]
428        name = "%s_%s" % (os.path.basename(
429            file_to_watch.source), self.serial_number)
430
431      process_logger = ProcessLogger(
432          name, command, os.path.join(
433              self.output_root,
434              file_to_watch.destination % self.cmd_string_replacements),
435          events, True, file_to_watch.repeats_output_on_open)
436      self.process_loggers.append(process_logger)
437      process_logger.start()
438
439    # Add any of the background processes.
440    for daemon_process in config.daemon_process:
441      # Are there any events that match up with this file?
442      events = [x for x in config.event if x.source == daemon_process.name]
443      command = shlex.split(
444          daemon_process.command % self.cmd_string_replacements)
445      if daemon_process.destination:
446        output = os.path.join(
447            self.output_root,
448            daemon_process.destination % self.cmd_string_replacements)
449      else:
450        output = None
451      name = "%s_%s" % (daemon_process.name, self.serial_number)
452      process_logger = ProcessLogger(name, command, output, events,
453                                     daemon_process.restart,
454                                     daemon_process.repeats_output_on_open)
455      self.process_loggers.append(process_logger)
456      process_logger.start()
457
458    # Build up the list of events we can actually process.
459    self.__UpdateEventCounters(number_of_iterations=0)
460    test_events = self.test_events
461    self.test_events = []
462    for event in test_events:
463      try:
464        eval(event.condition,  # pylint:disable=eval-used
465             {"__builtins__": None}, self.__ValuesInEval())
466        self.test_events.append(event)
467      except Exception as err:  # pylint:disable=broad-except
468        logging.error("Test event %s is not compatible with %s", event.name,
469                      self.name)
470        logging.error(str(err))
471    # Make sure that device specific events don't have conditions.
472    self.device_events = []
473    for event in config.test_event:
474      if not event.name:
475        logging.error("Device %s test event is missing a name", self.name)
476        continue
477      if event.condition:
478        self.test_events.append(event)
479      else:
480        self.device_events.append(event)
481
482  def StartLookingForEvents(self):
483    """Starts all child ProcessLoggers to start looking for events."""
484    for process_logger in self.process_loggers:
485      process_logger.looking = True
486
487  def __ValuesInEval(self):
488    values_in_eval = {key: value for key, value
489                      in list(self.event_counter.items())}
490    for key, value in list(self.iterations_since_event.items()):
491      values_in_eval["iterations_since_%s" % key] = value
492    return values_in_eval
493
494  def __GetExpectedEventCount(self, event):
495    if event == "logcat_iteration":
496      return -1
497    try:
498      event_cnt = getattr(self.cnt_per_iteration, event)
499    except AttributeError:
500      event_cnt = -1
501      logging.exception("%s is not an attribute of expected_result", event)
502    return event_cnt
503
504  def __UpdateEventCounters(self, number_of_iterations=1):
505    # Update the event counters
506    visited_events = set()
507    error_log = []
508    for process_logger in self.process_loggers:
509      events = process_logger.GetEventCountsSinceLastCall()
510      for event, count in list(events.items()):
511        # Print log when there is any missed event
512        expected_count = self.__GetExpectedEventCount(event)
513
514        if expected_count > 0:
515          if count > expected_count * number_of_iterations:
516            logging.info(
517                "[STRESS_TEST] In iteration %d, got duplicated %s : %d",
518                self.iteration, self.name, count)
519            logging.info("[STRESS_TEST] Will count only : %d",
520                         expected_count * number_of_iterations)
521            count = expected_count * number_of_iterations
522
523        if count:
524          self.event_counter[event] += count
525          visited_events.add(event)
526
527        if expected_count >= 0:
528          if expected_count * number_of_iterations != count:
529            error_log.append(
530                _SUMMARY_COL_FORMATT %
531                (event, count, expected_count * number_of_iterations))
532
533    # Go clear all the events that weren't consecutive.
534    for event in self.iterations_since_event:
535      if event in visited_events:
536        self.iterations_since_event[event] = 0
537      else:
538        self.iterations_since_event[event] += number_of_iterations
539
540    if error_log:
541      logging.info(_SUMMARY_LINES)
542      logging.info(" iteration %d : Something wrong in %s.",
543                   self.iteration, self.name)
544      logging.info(_SUMMARY_LINES)
545      logging.info(_SUMMARY_COLUMNS)
546      logging.info(_SUMMARY_LINES)
547      for line in error_log:
548        logging.info(line)
549      logging.info(_SUMMARY_LINES)
550
551  def ProcessEvents(self):
552    """Updates the event_counter and iterations_since_event maps."""
553    self.work_queue.put(self.__ProcessEventsAsync)
554
555  def __ProcessEventsAsync(self):
556    # Move any files to the local machine that should be moved.
557    if self.files_to_move:
558      for file_to_move in self.files_to_move:
559        try:
560          self.Command(["pull", file_to_move.source, file_to_move.destination])
561        except:  # pylint:disable=bare-except
562          logging.exception("Failed to pull %s", file_to_move.source)
563
564    self.__UpdateEventCounters()
565
566    for event in self.test_events:
567      if eval(event.condition,  # pylint:disable=eval-used
568              {"__builtins__": None}, self.__ValuesInEval()):
569        logging.info("Condition has been met for event '%s'", event.name)
570        # Write the updated event log.
571        event_log_details = self.event_log.event.add()
572        event_log_details.iteration = self.iteration
573        event_log_details.name = event.name
574        with open(os.path.join(self.output_root,
575                               "%s_event_log.ascii_proto" % self.name),
576                  "w") as fp:
577          text_format.PrintMessage(self.event_log, fp)
578
579        # Do whatever other actions that are part of the event.
580        self.__ProcessEventActionQueue(event)
581
582        # Run any device specific actions for this event.
583        for device_event in self.device_events:
584          if device_event.name == event.name:
585            self.__ProcessEventActionQueue(device_event)
586
587    # Set up the next iteration.
588    self.iteration += 1
589    self.cmd_string_replacements["iteration"] = self.iteration
590
591  def __ProcessEventActionQueue(self, event):
592    bugreport = None
593    for action in event.action:
594      if action == "BUGREPORT":
595        bugreport = self.TakeBugReport()
596      elif action.startswith("DUMPSYS "):
597        self.CaptureDumpsys(action[action.find(" ") + 1:])
598      elif action == "NOTIFY":
599        SendNotificationEmail(
600            "%s had event '%s' occur" % (self.name, event.name),
601            "\n".join(["Current Summary:"] + self.GetSummaryLines()), bugreport)
602      elif action == "REMOVE_DEVICE":
603        logging.info("Removing %s from the test", self.serial_number)
604        self.remove_device = True
605      elif action == "ABORT":
606        logging.info("Abort requested")
607        self.abort_requested = True
608      else:
609        action %= self.cmd_string_replacements
610        logging.info("Running command %s on %s", action, self.name)
611        result = self.Command(shlex.split(action)).strip()
612        if result:
613          for line in result.splitlines():
614            logging.info(line)
615
616  def Root(self):
617    self.Command(["root"])
618    time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT)
619    self.Command(["wait-for-device"])
620    time.sleep(Device.SECONDS_TO_SLEEP_DURING_ROOT)
621
622  def Stop(self):
623    """Stops all file loggers attached to this device."""
624    for process_logger in self.process_loggers:
625      process_logger.StopLogging()
626    self.process_loggers = []
627
628  def Join(self):
629    for process_logger in self.process_loggers:
630      process_logger.join()
631    self.WaitForTasks()
632
633  def AsyncCommand(self, command, log_output=False):
634    self.work_queue.put(
635        lambda: self.__AsyncCommand(command, log_output=log_output))
636
637  def __AsyncCommand(self, command, log_output=False):
638    result = self.Command(command).strip()
639    if result and log_output:
640      for line in result.splitlines():
641        logging.info(line.decode("utf-8"))
642
643  def Command(self, command):
644    """Runs the provided command on this device."""
645    if command[0] in {"bugreport", "root", "wait-for-device", "shell",
646                      "logcat"}:
647      return subprocess.check_output(
648          ["adb", "-s", self.serial_number] + command)
649    elif command[0] == "DUMPSYS":
650      self.CaptureDumpsys(command[1])
651      return ""
652    elif command[0] == "pull":
653      try:
654        files = subprocess.check_output(
655            ["adb", "-s", self.serial_number, "shell", "ls", command[1]]
656        ).strip().splitlines()
657      except subprocess.CalledProcessError:
658        return ""
659      if len(files) == 1 and "No such file or directory" in files[0]:
660        return ""
661      for source_file in files:
662        destination = os.path.join(self.output_root,
663                                   command[2] % self.cmd_string_replacements)
664        stress_test_common.MakeDirsIfNeeded(os.path.dirname(destination))
665        logging.info("Moving %s from %s to %s", source_file, self.name,
666                     destination)
667        subprocess.check_output(["adb", "-s", self.serial_number, "pull",
668                                 source_file, destination])
669        if FLAGS.delete_data_dir:
670          subprocess.check_output([
671              "adb", "-s", self.serial_number, "shell", "rm", "-rf", source_file
672          ])
673        return ""
674    else:
675      return subprocess.check_output(command)
676
677  def TakeBugReport(self):
678    logging.info("Capturing bugreport on %s", self.name)
679    bugreport = os.path.join(self.output_root,
680                             "%s_bugreport_iteration_%06d.zip" %
681                             (self.name, self.iteration))
682    sdk = int(self.Command(
683        ["shell", "getprop", "ro.build.version.sdk"]).strip())
684    if sdk >= 24:  # SDK 24 = Android N
685      with open(bugreport, "w") as bugreport_fp:
686        bugreport_fp.write(self.Command(["bugreport", bugreport]))
687    else:
688      bugreport_txt = os.path.join(self.output_root,
689                                   "%s_bugreport_iteration_%06d.txt" %
690                                   (self.name, self.iteration))
691      with open(bugreport_txt, "w") as bugreport_fp:
692        bugreport_fp.write(self.Command(["bugreport"]))
693      self.Command(["zip", bugreport, bugreport_txt])
694
695    self.Command(["pull", "/data/anr/traces.txt",
696                  "%s_traces_iteration_%06d.txt" % (self.name, self.iteration)])
697    self.Command(["pull", "/data/anr/traces.txt.bugreport",
698                  "%s_traces_iteration_%06d.txt.bugreport" % (self.name,
699                                                              self.iteration)])
700    return bugreport
701
702  def CaptureDumpsys(self, dumpsys_unit):
703    logging.info("Taking dumpsys %s on %s", dumpsys_unit, self.name)
704    stress_test_common.MakeDirsIfNeeded(os.path.join(self.output_root,
705                                                     self.name))
706    with open(os.path.join(self.output_root, self.name,
707                           "%s_%06d.txt" % (dumpsys_unit, self.iteration)),
708              "w") as dumpsys_fp:
709      dumpsys_fp.write(self.Command(["shell", "dumpsys", dumpsys_unit]))
710
711  def WaitForTasks(self):
712    self.work_queue.join()
713
714  def GetSummaryLines(self):
715    lines = [
716        "Device {}".format(self.name),
717        _SUMMARY_LINES, _SUMMARY_COLUMNS, _SUMMARY_LINES
718    ]
719    for event, count in sorted(self.event_counter.items()):
720      lines.append(_SUMMARY_COL_FORMATT % (
721          event, count, self.iterations_since_event[event]))
722    lines.append(_SUMMARY_LINES)
723    return lines
724
725
726def RunAsyncCommand(devices, command):
727  """Helper function for running async commands on many devices."""
728  for device in devices:
729    device.AsyncCommand(command)
730  for device in devices:
731    device.WaitForTasks()
732
733
734class StressTest(object):
735  """Manages dispatching commands to devices/playing audio and events."""
736
737  def __init__(self, output_root, test_name):
738    self.output_root = output_root
739    self.devices = []
740    self.test_name = test_name
741    config = stress_test_pb2.StressTestConfig()
742    config_contents = stress_test_common.GetResourceContents(
743        os.path.join(stress_test_common.RESOURCE_DIR,
744                     "stress_test.%s.ascii_proto" % test_name))
745    text_format.Merge(config_contents, config)
746    self.events = config.event
747    self.setup_commands = config.setup_command
748    self.steps = config.step
749    self.audio_tempfiles = {}
750    self.uuid = str(uuid.uuid4())
751    self.expected_result = None
752    self.iteration = 0
753    if config.expected_result:
754      self.expected_result = config.expected_result[0]
755
756    # Place all the audio files into temp files.
757    for step in self.steps:
758      if step.audio_file and step.audio_file not in self.audio_tempfiles:
759        # We can't delete the temp file on windows, since it gets nuked too
760        # early.
761        audio_tempfile = tempfile.NamedTemporaryFile(
762            delete=(platform.system() != "Windows"),
763            dir="." if platform.system().startswith("CYGWIN") else None
764        )
765        if platform.system().startswith("CYGWIN"):
766          audio_tempfile.name = os.path.basename(audio_tempfile.name)
767        self.audio_tempfiles[step.audio_file] = audio_tempfile
768        if FLAGS.use_sox:
769          # Write out the raw PCM samples as a wave file.
770          audio_tempfile.write(
771              stress_test_common.GetResourceContents(step.audio_file))
772        else:
773          # Make a temporary wave file for playout if we can't use sox.
774          wavefile = wave.open(audio_tempfile, "wb")
775          if step.audio_file_sample_rate <= 0:
776            step.audio_file_sample_rate = 16000
777          wavefile.setframerate(step.audio_file_sample_rate)
778          if step.audio_file_num_channels <= 0:
779            step.audio_file_num_channels = 1
780          wavefile.setnchannels(step.audio_file_num_channels)
781          if not step.audio_file_format:
782            wavefile.setsampwidth(2)
783          elif step.audio_file_format == "s8":
784            wavefile.setsampwidth(1)
785          elif step.audio_file_format == "s16":
786            wavefile.setsampwidth(2)
787          elif step.audio_file_format == "s32":
788            wavefile.setsampwidth(4)
789          else:
790            raise RuntimeError(
791                "Unsupported wave file format for %s" % step.audio_file)
792          wavefile.writeframes(stress_test_common.GetResourceContents(
793              step.audio_file))
794          wavefile.close()
795        audio_tempfile.flush()
796
797        if platform.system() == "Windows":
798          audio_tempfile.close()
799
800    # Create all the devices that are attached to this machine.
801    for serial_number in self.GetActiveSerialNumbers():
802      self.devices.append(
803          Device(serial_number, output_root, self.events, self.expected_result))
804    if not self.devices:
805      raise app.UsageError("No devices connected")
806
807    self.devices.sort(key=lambda x: x.name)
808
809    # Make sure every device is done with their work for setup.
810    for device in self.devices:
811      device.WaitForTasks()
812
813    # Write out the info meta-data proto. Useful for doing analysis of the logs
814    # after the stress test has completed.
815    stress_test_info = stress_test_pb2.StressTestInfo()
816    stress_test_info.test_name = self.test_name
817    stress_test_info.test_description = config.description
818    stress_test_info.uuid = self.uuid
819    for device in self.devices:
820      device_pb = stress_test_info.device.add()
821      device_pb.device_type = device.device_type
822      device_pb.serial_number = device.serial_number
823
824    text_format.PrintMessage(stress_test_info, open(os.path.join(
825        self.output_root, "stress_test_info.ascii_proto"), "w"))
826
827  def GetActiveSerialNumbers(self):
828    serial_numbers = []
829    for line in sorted(
830        subprocess.check_output(["adb", "devices"]).splitlines()):
831      if line.endswith(b"device"):
832        serial_number = line.split()[0].strip()
833        if FLAGS.devices and serial_number not in FLAGS.devices:
834          continue
835        serial_numbers.append(serial_number.decode("utf-8"))
836    return serial_numbers
837
838  def Start(self):
839    logging.info("Waiting for devices to settle")
840    time.sleep(5)
841    # Make a copy of the device list, as we'll be modifying this actual list.
842    devices = list(self.devices)
843    dropped_devices = []
844
845    # If we have any setup commands, run them.
846    for command in self.setup_commands:
847      logging.info("Running command %s", command)
848      # Can't use the async command helper function since we need to get at
849      # the device cmd_string_replacements.
850      for device in devices:
851        device.AsyncCommand(
852            shlex.split(command % device.cmd_string_replacements),
853            log_output=True)
854      for device in devices:
855        device.WaitForTasks()
856
857    for device in devices:
858      device.StartLookingForEvents()
859      device.AsyncCommand(["shell", "log", "-t", "STRESS_TEST",
860                           "Starting {%s} TZ=$(getprop persist.sys.timezone) "
861                           "YEAR=$(date +%%Y)" % self.uuid], True)
862    self.iteration = 0
863    while True:
864      logging.info("Starting iteration %d", self.iteration)
865      # Perform all the actions specified in the test.
866      RunAsyncCommand(devices, [
867          "shell", "log", "-t", "STRESS_TEST",
868          "Performing iteration %d $(head -n 3 "
869          "/proc/timer_list | tail -n 1)" % self.iteration
870      ])
871
872      for step in self.steps:
873        if step.delay_before:
874          logging.info("Waiting for %.2f seconds", step.delay_before)
875          time.sleep(step.delay_before)
876
877        if step.audio_file:
878          logging.info("Playing %s", step.audio_file)
879          RunAsyncCommand(devices, ["shell", "log", "-t", "STRESS_TEST",
880                                    "Playing %s" % step.audio_file])
881
882          if FLAGS.use_sox:
883            subprocess.check_call(["sox", "-q",
884                                   self.audio_tempfiles[step.audio_file].name,
885                                   "-d"])
886          elif platform.system() == "Windows":
887            import winsound  # pylint:disable=g-import-not-at-top
888            winsound.PlaySound(self.audio_tempfiles[step.audio_file].name,
889                               winsound.SND_FILENAME | winsound.SND_NODEFAULT)
890          else:
891            raise app.RuntimeError("Unsupported platform for audio playback")
892
893        if step.command:
894          logging.info("Running command %s", step.command)
895          # Can't use the async command helper function since we need to get at
896          # the device cmd_string_replacements.
897          for device in devices:
898            device.AsyncCommand(
899                shlex.split(step.command % device.cmd_string_replacements),
900                log_output=True)
901          for device in devices:
902            device.WaitForTasks()
903
904        if step.delay_after:
905          logging.info("Waiting for %.2f seconds", step.delay_after)
906          time.sleep(step.delay_after)
907
908      RunAsyncCommand(devices, [
909          "shell", "log", "-t", "STRESS_TEST",
910          "Iteration %d complete $(head -n 3 "
911          "/proc/timer_list | tail -n 1)" % self.iteration
912      ])
913      self.iteration += 1
914
915      # TODO(somebody): Sometimes the logcat seems to get stuck and buffers for
916      # a bit. This throws off the event counts, so we should probably add some
917      # synchronization rules before we trigger any events.
918
919      # Go through each device, update the event counter, and see if we need to
920      # trigger any events.
921      devices_to_remove = []
922      abort_requested = False
923      active_devices = self.GetActiveSerialNumbers()
924      for device in devices:
925        if device.serial_number in active_devices:
926          device.ProcessEvents()
927        else:
928          logging.error("Dropped device %s", device.name)
929          SendNotificationEmail(
930              "Dropped device %s" % device.name,
931              "Device %s is not longer present in the system" % device.name)
932          dropped_devices.append(device)
933          devices_to_remove.append(device)
934
935      # Check to see if any of the dropped devices have come back. If yes, grab
936      # a bug report.
937      for device in dropped_devices:
938        if device.serial_number in active_devices:
939          logging.info("Device %s reappeared", device.name)
940          device.Root()
941          device.TakeBugReport()
942
943      dropped_devices = [d for d in dropped_devices
944                         if d.serial_number not in active_devices]
945
946      for device in devices:
947        device.WaitForTasks()
948        if device.remove_device:
949          devices_to_remove.append(device)
950        if device.abort_requested:
951          abort_requested = True
952
953      # Remove devices from our list of things to monitor if they've been marked
954      # for deletion.
955      if devices_to_remove:
956        for device in devices_to_remove:
957          device.Stop()
958        devices = [d for d in devices if d not in devices_to_remove]
959
960      # Print out the iteration summary.
961      if self.iteration % FLAGS.print_summary_every_n == 0:
962        for line in self.GetSummaryLines():
963          logging.info(line)
964
965      # See if we need to break out of the outer loop.
966      if abort_requested or not devices:
967        break
968      if FLAGS.num_iterations:
969        if self.iteration >= FLAGS.num_iterations:
970          logging.info("Completed full iteration : %d", self.iteration)
971          break
972    SendNotificationEmail(
973        "Stress test %s completed" % (FLAGS.test_name),
974        "\n".join(["Summary:"] + self.GetSummaryLines()))
975
976  def Stop(self):
977    logging.debug("Stopping devices")
978    for device in self.devices:
979      device.Stop()
980    for device in self.devices:
981      device.Join()
982
983  def GetSummaryLines(self):
984    lines = [
985        _SUMMARY_LINES,
986        "Conducted %d iterations out of %d" %
987        (self.iteration, FLAGS.num_iterations),
988        _SUMMARY_LINES
989    ]
990    for device in self.devices:
991      lines.extend(device.GetSummaryLines())
992    lines.append(_SUMMARY_LINES)
993    return lines
994
995
996def main(unused_argv):
997  # Check to make sure that there are no other instances of ADB running - if
998  # there are, print a warning and wait a bit for them to see it and decide if
999  # they want to keep running, knowing that logs may be invalid.
1000  try:
1001    if "adb" in subprocess.check_output(["ps", "-ale"]).decode("utf-8"):
1002      print("It looks like there are other instances of adb running. If these "
1003            "other instances are also cating log files, you will not be "
1004            "capturing everything in this stress test (so logs will be "
1005            "invalid).")
1006      print("Continuing in 3...", end=" ")
1007      sys.stdout.flush()
1008      for i in [2, 1, 0]:
1009        time.sleep(1)
1010        if i:
1011          print("%d..." % i, end=" ")
1012        else:
1013          print("")
1014        sys.stdout.flush()
1015  except OSError:
1016    print("Unexpected error:", sys.exc_info()[0])
1017    if sys.platform.startswith("win"):
1018      pass
1019    else:
1020      raise
1021
1022  # Make the base output directory.
1023  output_root = os.path.join(FLAGS.output_root, "%s_%s" % (
1024      FLAGS.test_name, datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))
1025  # output_root = os.path.join(FLAGS.output_root, FLAGS.test_name)
1026  stress_test_common.MakeDirsIfNeeded(output_root)
1027
1028  # Set up logging.
1029  formatter = logging.Formatter(
1030      "%(levelname)-1.1s %(asctime)s [%(threadName)-16.16s] %(message)s")
1031  root_logger = logging.getLogger()
1032  root_logger.setLevel(logging.INFO)
1033  root_logger.setLevel(logging.DEBUG)
1034
1035  file_handler = logging.FileHandler(os.path.join(output_root,
1036                                                  "stress_test.log"))
1037  file_handler.setFormatter(formatter)
1038  root_logger.addHandler(file_handler)
1039
1040  console_handler = logging.StreamHandler()
1041  console_handler.setFormatter(formatter)
1042  root_logger.addHandler(console_handler)
1043
1044  stress_test = StressTest(output_root, FLAGS.test_name)
1045  try:
1046    stress_test.Start()
1047  finally:
1048    logging.info("Stopping device logging threads")
1049    stress_test.Stop()
1050    for line in stress_test.GetSummaryLines():
1051      logging.info(line)
1052    if FLAGS.delete_data_dir:
1053      print("Deleting Data Dir")
1054      subprocess.check_output(["rm", "-r", "-f", output_root])
1055
1056
1057if __name__ == "__main__":
1058  app.run(main)
1059