1__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
3import sys, os, signal, time, cPickle, logging
4
5from autotest_lib.client.common_lib import error, utils
6from autotest_lib.client.common_lib.cros import retry
7
8
# Entry points that use subcommand must set this to their logging manager
# object to get log redirection for subcommands.
11logging_manager_object = None
12
13
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first; the tasks are then waited for one at a time,
    in list order, and the pickled result of each one is read back from its
    result pipe.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but each task is always
            given at least one second.  A false value means no timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns None, or, if return_results is True, a list with one entry per
            task (in tasklist order).  An entry is whatever the task sent
            back through its pipe, or the exception raised while reading it.

    @raises error.AutoservError: if return_results is False and at least one
            task failed.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            # Never hand out less than one second, even past the deadline.
            remaining_timeout = max(endtime - time.time(), 1)
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            if status != 0:
                run_error = True

        # A child that was killed (for example after a timeout) or that died
        # before writing anything leaves its pipe empty, so unpickling fails.
        # Record that failure as the task's result instead of aborting the
        # loop, which would leave the remaining tasks unwaited and their
        # pipes open.
        try:
            results.append(cPickle.load(task.result_pickle))
        except Exception as e:
            run_error = True
            results.append(e)
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        details = ['task: %s returned/raised: %r\n' % (task, result)
                   for task, result in zip(tasklist, results)]
        raise error.AutoservError('One or more subcommands failed:\n' +
                                  ''.join(details))
54
55
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Each element in the arglist used to create a subcommand object,
    where that arg is used both as a subdir name, and a single argument
    to pass to "function".

    We create a subcommand object for each element in the list,
    then execute those subcommand objects in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used:
    function is called directly in the current process, so no result
    sub-directory is created and timeout is not applied.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of arguments to be used one per subcommand
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    # Bypass forking altogether if there is only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if not return_results:
            # Without return_results any exception simply propagates.
            function(arg)
            return
        try:
            return [function(arg)]
        except Exception as e:
            return [e]

    subcommands = [
        subcommand(function, [arg],
                   subdir_name_constructor(arg) if log else None)
        for arg in arglist
    ]
    return parallel(subcommands, timeout, return_results=return_results)
107
108
class subcommand(object):
    """
    A function call that is executed in a forked child process.

    fork_start() forks; the child runs func(*args) and sends the pickled
    return value (or the pickled exception that was raised) to the parent
    through a pipe, then exits with status 0 on success or 1 on failure.
    The parent reads the pickle from self.result_pickle and reaps the child
    with poll(), wait() or fork_waitfor().

    NOTE(review): callers in this file read the pipe only after waiting for
    the child to exit, so a pickled result larger than the OS pipe buffer
    would presumably leave the child blocked in os.write() - confirm.
    """
    # The hook lists live on the class, so they are shared by all instances.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable to run in the child, called as func(*args).
        @param args: A sequence of positional arguments for func.
        @param subdir: The subdirectory to log results in.  It and its
                'debug' sub-directory are created if they do not exist yet.
                If it is false no directory is used.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None
        # Read end of the result pipe; set in the parent by fork_start().
        self.result_pickle = None


    def __str__(self):
        return str('subcommand(func=%s,  args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. The hook is called with the subcommand as its argument. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent).
        The hook is called with the subcommand as its argument. """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Redirect logging into this subcommand's debug directory.

        Does nothing unless a subdir was given and the module level
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    @staticmethod
    def _pickle_exception(exc):
        """
        Return exc pickled, or a pickled plain Exception carrying repr(exc)
        when exc itself cannot be pickled.
        """
        try:
            return cPickle.dumps(exc, cPickle.HIGHEST_PROTOCOL)
        except Exception:
            return cPickle.dumps(Exception(repr(exc)),
                                 cPickle.HIGHEST_PROTOCOL)


    def fork_start(self):
        """
        Fork a child process that runs self.func(*self.args).

        In the parent this sets self.pid and self.result_pickle (the read
        end of the result pipe, opened in binary mode) and returns.  The
        child never returns from this method: it always ends in os._exit().
        """
        # Flush first so buffered output is not duplicated into the child.
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # The pipe carries binary pickle data, so read it as bytes.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return.
        # Everything below is wrapped so that no exception can unwind into
        # the caller's frames, which belong to the parent's code path.
        exit_code = 1
        try:
            os.close(r)
            signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()

            try:
                for hook in self.fork_hooks:
                    hook(self)
                result = self.func(*self.args)
                os.write(w, cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL))
                exit_code = 0
            except Exception as e:
                logging.exception('function failed')
                exit_code = 1
                os.write(w, self._pickle_exception(e))

            os.close(w)

            # A failing join hook does not change the exit code.
            for hook in self.join_hooks:
                hook(self)
        except Exception:
            logging.exception('subcommand child failed outside of the '
                              'function call')
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode a wait status into self.returncode: the exit status for a
        normal exit, or the negated signal number if the child was killed
        by a signal.

        @param sts: The status value as returned by os.waitpid().

        @raises RuntimeError: if the status is neither an exit nor a signal.
        @raises error.AutoservSubcommandError: if the return code is not 0;
                a failure report, including the contents of the debug
                directory's autoserv.stderr if present, is printed first.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            sys.stdout.write(line)
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has exited.

        @returns The return code, or None if the child is still running.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Deliberately ignored, as in subprocess.Popen.poll().
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child has exited.

        @returns The return code of the child.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, killing it if it takes longer than timeout.

        @param timeout: Number of seconds to wait; a false value waits
                without a limit.

        @returns The return code of the child.
        """
        if not timeout:
            return self.wait()

        # NOTE(review): assumes retry.timeout returns a pair whose second
        # item is wait()'s result, or None if the timeout expired - confirm.
        _, result = retry.timeout(self.wait, timeout_sec=timeout)

        if result is None:
            utils.nuke_pid(self.pid)
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("timeout after %ds" % timeout)
            print("")
            # Reap the killed child.
            result = self.wait()

        return result
264