1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4""" Wrapper that allows method execution in parallel.
5
6This class wraps a list of objects of the same type, emulates their
7interface, and executes any functions called on the objects in parallel
8in ReraiserThreads.
9
10This means that, given a list of objects:
11
12  class Foo:
13    def __init__(self):
14      self.baz = Baz()
15
16    def bar(self, my_param):
      # do something
18
  list_of_foos = [Foo(), Foo(), Foo()]
20
21we can take a sequential operation on that list of objects:
22
23  for f in list_of_foos:
24    f.bar('Hello')
25
26and run it in parallel across all of the objects:
27
28  Parallelizer(list_of_foos).bar('Hello')
29
30It can also handle (non-method) attributes of objects, so that this:
31
32  for f in list_of_foos:
33    f.baz.myBazMethod()
34
35can be run in parallel with:
36
37  Parallelizer(list_of_foos).baz.myBazMethod()
38
39Because it emulates the interface of the wrapped objects, a Parallelizer
40can be passed to a method or function that takes objects of that type:
41
42  def DoesSomethingWithFoo(the_foo):
43    the_foo.bar('Hello')
44    the_foo.bar('world')
    the_foo.baz.myBazMethod()
46
47  DoesSomethingWithFoo(Parallelizer(list_of_foos))
48
49Note that this class spins up a thread for each object. Using this class
50to parallelize operations that are already fast will incur a net performance
51penalty.
52
53"""
54# pylint: disable=protected-access
55
56from devil.utils import reraiser_thread
57from devil.utils import watchdog_timer
58
# Module-level defaults. Neither name is referenced by the Parallelizer
# classes in this module.
# NOTE(review): presumably a timeout in seconds and a retry count kept for
# callers -- confirm usage before relying on or removing them.
_DEFAULT_TIMEOUT = 30
_DEFAULT_RETRIES = 3
61
62
class Parallelizer(object):
  """Allows parallel execution of method calls across a group of objects.

  Attribute access, indexing and calls made on a Parallelizer are forwarded
  to every wrapped object. Calls are started asynchronously, one
  ReraiserThread per wrapped object; use pFinish or pGet to wait for them.
  """

  def __init__(self, objs):
    """Wraps |objs|.

    Args:
      objs: The list of objects to operate on in parallel.
    """
    # |_orig_objs| always refers to the objects passed in here and is used to
    # label threads. |_objs| is whatever is currently being emulated: the
    # objects themselves, attributes/items derived from them, or a
    # ReraiserThreadGroup while a call is outstanding.
    self._orig_objs = objs
    self._objs = objs

  def __getattr__(self, name):
    """Emulate getting the |name| attribute of |self|.

    Args:
      name: The name of the attribute to retrieve.
    Returns:
      A Parallelizer emulating the |name| attribute of |self|.
    Raises:
      AttributeError if |name| is one of this class's own state attributes
      and it has not been set on the instance.
    """
    # __getattr__ only runs when normal lookup fails. If our own state is
    # missing (e.g. an instance created through __new__), fail fast instead
    # of recursing through pGet -> self._objs -> __getattr__ without bound.
    if name in ('_objs', '_orig_objs'):
      raise AttributeError(name)

    # Wait for any outstanding call so that |_objs| holds concrete values.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [getattr(o, name) for o in self._objs]
    return r

  def __getitem__(self, index):
    """Emulate getting the value of |self| at |index|.

    Args:
      index: The index or key to look up on each wrapped object.
    Returns:
      A Parallelizer emulating the value of |self| at |index|.
    """
    # Wait for any outstanding call so that |_objs| holds concrete values.
    self.pGet(None)

    r = type(self)(self._orig_objs)
    r._objs = [o[index] for o in self._objs]
    return r

  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the call in
      parallel.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    self.pGet(None)

    # Not every object has a __name__ (non-callables usually don't, and
    # neither do e.g. functools.partial objects), so fall back to repr()
    # rather than failing with an unrelated AttributeError.
    for o in self._objs:
      if not callable(o):
        raise AttributeError(
            "'%s' is not callable" % getattr(o, '__name__', repr(o)))

    r = type(self)(self._orig_objs)
    r._objs = reraiser_thread.ReraiserThreadGroup([
        reraiser_thread.ReraiserThread(
            o,
            args=args,
            kwargs=kwargs,
            name='%s.%s' % (str(d), getattr(o, '__name__', repr(o))))
        for d, o in zip(self._orig_objs, self._objs)
    ])
    r._objs.StartAll()
    return r

  def pFinish(self, timeout):
    """Finish any outstanding asynchronous operations.

    Args:
      timeout: The maximum number of seconds to wait for an individual
               result to return, or None to wait forever.
    Returns:
      self, now emulating the return values.
    """
    self._assertNoShadow('pFinish')
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # NOTE(review): JoinAll() is called without the watchdog, so |timeout|
      # presumably only applies inside GetAllReturnValues -- confirm against
      # reraiser_thread whether the timeout is actually enforced here.
      self._objs.JoinAll()
      self._objs = self._objs.GetAllReturnValues(
          watchdog_timer.WatchdogTimer(timeout))
    return self

  def pGet(self, timeout):
    """Get the current wrapped objects.

    Args:
      timeout: Same as |pFinish|.
    Returns:
      A list of the results, in order of the originally wrapped objects.
    Raises:
      Any exception raised by any of the called functions.
    """
    self._assertNoShadow('pGet')
    self.pFinish(timeout)
    return self._objs

  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is asynchronous. Call pFinish on the return value to
    block until the call finishes.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer wrapping the ReraiserThreadGroup running the map in
      parallel.
    """
    self._assertNoShadow('pMap')
    # |f| may be a callable without a __name__ (e.g. a functools.partial);
    # only the thread label depends on it, so fall back to repr().
    f_name = getattr(f, '__name__', repr(f))
    r = type(self)(self._orig_objs)
    r._objs = reraiser_thread.ReraiserThreadGroup([
        reraiser_thread.ReraiserThread(
            f,
            args=(o,) + args,
            kwargs=kwargs,
            name='%s(%s)' % (f_name, d))
        for d, o in zip(self._orig_objs, self._objs)
    ])
    r._objs.StartAll()
    return r

  def _assertNoShadow(self, attr_name):
    """Ensures that |attr_name| isn't shadowing part of the wrapped objects.

    If the wrapped objects _do_ have an |attr_name| attribute, it will be
    inaccessible to clients.

    Args:
      attr_name: The attribute to check.
    Raises:
      AssertionError if the wrapped objects have an attribute named 'attr_name'
      or '_assertNoShadow'.
    """
    if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
      # A call is outstanding: check the thread group itself.
      assert not hasattr(self._objs, '_assertNoShadow')
      assert not hasattr(self._objs, attr_name)
    else:
      assert not any(hasattr(o, '_assertNoShadow') for o in self._objs)
      assert not any(hasattr(o, attr_name) for o in self._objs)
200
201
class SyncParallelizer(Parallelizer):
  """A Parallelizer that blocks on function calls."""

  def __enter__(self):
    """Emulate entering the context of |self|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from entering into the
      context of |self|.
    """
    r = type(self)(self._orig_objs)
    # Enter whatever |self| currently emulates. For a freshly constructed
    # instance that is the originally wrapped objects; for one derived via
    # attribute or item access it is the derived values.
    r._objs = [o.__enter__ for o in self._objs]
    return r.__call__()

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Emulate exiting the context of |self|.

    Note that this call is synchronous. The results of the wrapped __exit__
    calls are discarded and None is returned, so an exception propagating
    out of the with-block is never suppressed.

    Args:
      exc_type: the exception type.
      exc_val: the exception value.
      exc_tb: the exception traceback.
    """
    r = type(self)(self._orig_objs)
    # Mirror __enter__: exit the objects |self| currently emulates.
    r._objs = [o.__exit__ for o in self._objs]
    r.__call__(exc_type, exc_val, exc_tb)

  # override
  def __call__(self, *args, **kwargs):
    """Emulate calling |self| with |args| and |kwargs|.

    Note that this call is synchronous.

    Returns:
      A Parallelizer emulating the value returned from calling |self| with
      |args| and |kwargs|.
    Raises:
      AttributeError if the wrapped objects aren't callable.
    """
    r = super(SyncParallelizer, self).__call__(*args, **kwargs)
    # Block with no timeout until every call has returned.
    r.pFinish(None)
    return r

  # override
  def pMap(self, f, *args, **kwargs):
    """Map a function across the current wrapped objects in parallel.

    This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.

    Note that this call is synchronous.

    Args:
      f: The function to call.
      args: The positional args to pass to f.
      kwargs: The keyword args to pass to f.
    Returns:
      A Parallelizer emulating the values returned from calling f on each
      wrapped object.
    """
    r = super(SyncParallelizer, self).pMap(f, *args, **kwargs)
    # Block with no timeout until every call has returned.
    r.pFinish(None)
    return r
267