1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Thread and ThreadGroup that reraise exceptions on the main thread."""
5# pylint: disable=W0212
6
7import logging
8import sys
9import threading
10import time
11import traceback
12
13from devil import base_error
14from devil.utils import watchdog_timer
15
16
17class TimeoutError(base_error.BaseError):
18  """Module-specific timeout exception."""
19
20  def __init__(self, message):
21    super(TimeoutError, self).__init__(message)
22
23
24def LogThreadStack(thread, error_log_func=logging.critical):
25  """Log the stack for the given thread.
26
27  Args:
28    thread: a threading.Thread instance.
29    error_log_func: Logging function when logging errors.
30  """
31  stack = sys._current_frames()[thread.ident]
32  error_log_func('*' * 80)
33  error_log_func('Stack dump for thread %r', thread.name)
34  error_log_func('*' * 80)
35  for filename, lineno, name, line in traceback.extract_stack(stack):
36    error_log_func('File: "%s", line %d, in %s', filename, lineno, name)
37    if line:
38      error_log_func('  %s', line.strip())
39  error_log_func('*' * 80)
40
41
42class ReraiserThread(threading.Thread):
43  """Thread class that can reraise exceptions."""
44
45  def __init__(self, func, args=None, kwargs=None, name=None):
46    """Initialize thread.
47
48    Args:
49      func: callable to call on a new thread.
50      args: list of positional arguments for callable, defaults to empty.
51      kwargs: dictionary of keyword arguments for callable, defaults to empty.
52      name: thread name, defaults to the function name.
53    """
54    if not name:
55      if hasattr(func, '__name__') and func.__name__ != '<lambda>':
56        name = func.__name__
57      else:
58        name = 'anonymous'
59    super(ReraiserThread, self).__init__(name=name)
60    if not args:
61      args = []
62    if not kwargs:
63      kwargs = {}
64    self.daemon = True
65    self._func = func
66    self._args = args
67    self._kwargs = kwargs
68    self._ret = None
69    self._exc_info = None
70    self._thread_group = None
71
72  if sys.version_info < (3, ):
73    # pylint: disable=exec-used
74    exec ('''def ReraiseIfException(self):
75  """Reraise exception if an exception was raised in the thread."""
76  if self._exc_info:
77    raise self._exc_info[0], self._exc_info[1], self._exc_info[2]''')
78  else:
79
80    def ReraiseIfException(self):
81      """Reraise exception if an exception was raised in the thread."""
82      if self._exc_info:
83        raise self._exc_info[1]
84
85  def GetReturnValue(self):
86    """Reraise exception if present, otherwise get the return value."""
87    self.ReraiseIfException()
88    return self._ret
89
90  # override
91  def run(self):
92    """Overrides Thread.run() to add support for reraising exceptions."""
93    try:
94      self._ret = self._func(*self._args, **self._kwargs)
95    except:  # pylint: disable=W0702
96      self._exc_info = sys.exc_info()
97
98
99class ReraiserThreadGroup(object):
100  """A group of ReraiserThread objects."""
101
102  def __init__(self, threads=None):
103    """Initialize thread group.
104
105    Args:
106      threads: a list of ReraiserThread objects; defaults to empty.
107    """
108    self._threads = []
109    # Set when a thread from one group has called JoinAll on another. It is used
110    # to detect when a there is a TimeoutRetryThread active that links to the
111    # current thread.
112    self.blocked_parent_thread_group = None
113    if threads:
114      for thread in threads:
115        self.Add(thread)
116
117  def Add(self, thread):
118    """Add a thread to the group.
119
120    Args:
121      thread: a ReraiserThread object.
122    """
123    assert thread._thread_group is None
124    thread._thread_group = self
125    self._threads.append(thread)
126
127  def StartAll(self, will_block=False):
128    """Start all threads.
129
130    Args:
131      will_block: Whether the calling thread will subsequently block on this
132        thread group. Causes the active ReraiserThreadGroup (if there is one)
133        to be marked as blocking on this thread group.
134    """
135    if will_block:
136      # Multiple threads blocking on the same outer thread should not happen in
137      # practice.
138      assert not self.blocked_parent_thread_group
139      self.blocked_parent_thread_group = CurrentThreadGroup()
140    for thread in self._threads:
141      thread.start()
142
143  def _JoinAll(self, watcher=None, timeout=None):
144    """Join all threads without stack dumps.
145
146    Reraises exceptions raised by the child threads and supports breaking
147    immediately on exceptions raised on the main thread.
148
149    Args:
150      watcher: Watchdog object providing the thread timeout. If none is
151          provided, the thread will never be timed out.
152      timeout: An optional number of seconds to wait before timing out the join
153          operation. This will not time out the threads.
154    """
155    if watcher is None:
156      watcher = watchdog_timer.WatchdogTimer(None)
157    alive_threads = self._threads[:]
158    end_time = (time.time() + timeout) if timeout else None
159    try:
160      while alive_threads and (end_time is None or end_time > time.time()):
161        for thread in alive_threads[:]:
162          if watcher.IsTimedOut():
163            raise TimeoutError('Timed out waiting for %d of %d threads.' %
164                               (len(alive_threads), len(self._threads)))
165          # Allow the main thread to periodically check for interrupts.
166          thread.join(0.1)
167          if not thread.isAlive():
168            alive_threads.remove(thread)
169      # All threads are allowed to complete before reraising exceptions.
170      for thread in self._threads:
171        thread.ReraiseIfException()
172    finally:
173      self.blocked_parent_thread_group = None
174
175  def IsAlive(self):
176    """Check whether any of the threads are still alive.
177
178    Returns:
179      Whether any of the threads are still alive.
180    """
181    return any(t.isAlive() for t in self._threads)
182
183  def JoinAll(self, watcher=None, timeout=None,
184              error_log_func=logging.critical):
185    """Join all threads.
186
187    Reraises exceptions raised by the child threads and supports breaking
188    immediately on exceptions raised on the main thread. Unfinished threads'
189    stacks will be logged on watchdog timeout.
190
191    Args:
192      watcher: Watchdog object providing the thread timeout. If none is
193          provided, the thread will never be timed out.
194      timeout: An optional number of seconds to wait before timing out the join
195          operation. This will not time out the threads.
196      error_log_func: Logging function when logging errors.
197    """
198    try:
199      self._JoinAll(watcher, timeout)
200    except TimeoutError:
201      error_log_func('Timed out. Dumping threads.')
202      for thread in (t for t in self._threads if t.isAlive()):
203        LogThreadStack(thread, error_log_func=error_log_func)
204      raise
205
206  def GetAllReturnValues(self, watcher=None):
207    """Get all return values, joining all threads if necessary.
208
209    Args:
210      watcher: same as in |JoinAll|. Only used if threads are alive.
211    """
212    if any([t.isAlive() for t in self._threads]):
213      self.JoinAll(watcher)
214    return [t.GetReturnValue() for t in self._threads]
215
216
217def CurrentThreadGroup():
218  """Returns the ReraiserThreadGroup that owns the running thread.
219
220  Returns:
221    The current thread group, otherwise None.
222  """
223  current_thread = threading.current_thread()
224  if isinstance(current_thread, ReraiserThread):
225    return current_thread._thread_group  # pylint: disable=no-member
226  return None
227
228
229def RunAsync(funcs, watcher=None):
230  """Executes the given functions in parallel and returns their results.
231
232  Args:
233    funcs: List of functions to perform on their own threads.
234    watcher: Watchdog object providing timeout, by default waits forever.
235
236  Returns:
237    A list of return values in the order of the given functions.
238  """
239  thread_group = ReraiserThreadGroup(ReraiserThread(f) for f in funcs)
240  thread_group.StartAll(will_block=True)
241  return thread_group.GetAllReturnValues(watcher=watcher)
242