1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Tester feedback request multiplexer."""
6
7from multiprocessing import reduction
8import Queue
9import collections
10import multiprocessing
11import os
12import sys
13
14import common
15from autotest_lib.client.common_lib.feedback import tester_feedback_client
16
17import input_handlers
18import request
19import sequenced_request
20
21
# Queue entry describing one feedback request: the request object (None for a
# null request that merely ends an atomic sequence), the reduced sending end of
# the reply pipe, the number of the originating query, and whether further
# requests of that query should follow without interruption.
ReqTuple = collections.namedtuple(
        'ReqTuple', ['obj', 'reduced_reply_pipe', 'query_num', 'atomic'])
24
25
class FeedbackRequestMultiplexer(object):
    """A feedback request multiplexer.

    Requests are submitted through process_request(), placed on a queue and
    handled one at a time by a separate process launched by start(). A query
    may mark its requests as atomic, in which case subsequent requests carrying
    the same query number are handled before any other pending request, until
    the sequence is ended (see end_atomic_seq()).
    """

    class RequestProcessingTerminated(Exception):
        """Used internally to signal processor termination."""


    def __init__(self):
        self._request_queue = multiprocessing.Queue()
        # Requests pulled off the queue but not yet handled.
        self._pending = []
        self._request_handling_process = None
        self._running = False
        # Query number of the atomic sequence in progress, or None.
        self._atomic_seq = None


    def _dequeue_request(self, block=False):
        """Moves one request from the queue to the pending list.

        A null request (one whose obj is None) exists only to end the atomic
        sequence of its query. If that sequence is not the active one there is
        nothing for it to end, so it is dropped right away: left in the pending
        list, it would eventually be offered to the user like a real request
        and the handler would fail when asking it for a title.

        @param block: Whether to wait for a request if the queue is empty.

        @return True if a request was taken off the queue (including one that
                was dropped), False if the queue was empty.

        @raise RequestProcessingTerminated: The termination sentinel (None)
                was taken off the queue.
        """
        try:
            req_tuple = self._request_queue.get(block=block)
        except Queue.Empty:
            return False

        if req_tuple is None:
            raise self.RequestProcessingTerminated
        if req_tuple.obj is None and (
                self._atomic_seq is None or
                req_tuple.query_num != self._atomic_seq):
            # Stale end-of-sequence marker; see the docstring.
            return True
        self._pending.append(req_tuple)
        return True


    def _atomic_seq_cont(self):
        """Returns index of next pending request in atomic sequence, if any.

        @return Index into the pending list of the first request whose query
                number matches the active atomic sequence, None if there is no
                such request.
        """
        for req_idx, req_tuple in enumerate(self._pending):
            if req_tuple.query_num == self._atomic_seq:
                return req_idx
        return None


    def _choose_pending_request(self):
        """Has the user pick which pending request to process next.

        Waits for the user to hit Enter, drains the request queue so that the
        choice covers everything submitted so far, then either announces the
        only pending request or presents a multiple choice question.

        @return Index of the selected request in the pending list.

        @raise RequestProcessingTerminated: The termination sentinel was found
                while draining the queue.
        """
        raw_input('Pending feedback requests, hit Enter to '
                  'process... ')

        # Pull all remaining queued requests.
        while self._dequeue_request():
            pass

        # Select the request to process.
        if len(self._pending) == 1:
            print('Processing: %s' %
                  self._pending[0].obj.get_title())
            return 0

        choose_req = sequenced_request.SequencedFeedbackRequest(
                None, None, None)
        choose_req.append_question(
                'List of pending feedback requests:',
                input_handlers.MultipleChoiceInputHandler(
                        [req_tuple.obj.get_title()
                         for req_tuple in self._pending],
                        default=1),
                prompt='Choose a request to process')
        req_idx, _ = choose_req.execute()
        return req_idx


    def _handle_requests(self, stdin):
        """Processes feedback requests until termination is signaled.

        This method is run in a separate process and needs to override stdin in
        order for raw_input() to work.

        @param stdin: File object to install as sys.stdin in this process.
        """
        sys.stdin = stdin
        try:
            while True:
                req_idx = None

                # Wait for a (suitable) request to become available. While an
                # atomic sequence is active, only a request belonging to it
                # will do; otherwise any pending request is good enough.
                while True:
                    if self._atomic_seq is None:
                        if self._pending:
                            break
                    else:
                        req_idx = self._atomic_seq_cont()
                        if req_idx is not None:
                            break
                    self._dequeue_request(block=True)

                # If no request was pre-selected, prompt the user to choose one.
                if req_idx is None:
                    req_idx = self._choose_pending_request()

                # Pop and handle selected request, then close pipe. A null
                # request (obj is None) has nothing to execute and no reply
                # pipe; it only affects the atomic sequence below.
                req_tuple = self._pending.pop(req_idx)
                if req_tuple.obj is not None:
                    try:
                        ret = req_tuple.obj.execute()
                    except request.FeedbackRequestError as e:
                        ret = (tester_feedback_client.QUERY_RET_ERROR, str(e))
                    # The reduced pipe is a (callable, args) pair; calling it
                    # yields a connection object usable in this process.
                    reply_pipe = req_tuple.reduced_reply_pipe[0](
                            *req_tuple.reduced_reply_pipe[1])
                    reply_pipe.send(ret)
                    reply_pipe.close()

                # Set the atomic sequence if so instructed.
                self._atomic_seq = (req_tuple.query_num if req_tuple.atomic
                                    else None)

        except self.RequestProcessingTerminated:
            pass


    def start(self):
        """Starts the request multiplexer.

        Does nothing if the multiplexer is already running.
        """
        if self._running:
            return

        # The handler is given its own duplicate of stdin to read from.
        dup_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
        try:
            self._request_handling_process = multiprocessing.Process(
                    target=self._handle_requests, args=(dup_stdin,))
            self._request_handling_process.start()
        finally:
            # Once the child is forked it holds its own copy of the
            # descriptor; release ours so repeated start/stop cycles do not
            # accumulate open files.
            dup_stdin.close()

        self._running = True


    def stop(self):
        """Stops the request multiplexer.

        Does nothing if the multiplexer is not running. Otherwise blocks until
        the request handling process has exited.
        """
        if not self._running:
            return

        # Tell the request handler to quit.
        self._request_queue.put(None)
        self._request_handling_process.join()

        self._running = False


    def process_request(self, request, query_num, atomic):
        """Processes a feedback request and returns its result.

        This call is used by queries for submitting individual requests. It is
        a blocking call that should be called from a separate execution thread.

        @param request: The feedback request to process.
        @param query_num: The unique query number.
        @param atomic: Whether subsequent request(s) are expected and should be
                       processed without interruption.

        @return The value sent back by the request handling process over the
                reply pipe.
        """
        reply_pipe_send, reply_pipe_recv = multiprocessing.Pipe()
        # A connection cannot be queued as is; queue its reduced form and let
        # the handler rebuild the sending end on its side.
        reduced_reply_pipe_send = reduction.reduce_connection(reply_pipe_send)
        self._request_queue.put(ReqTuple(request, reduced_reply_pipe_send,
                                         query_num, atomic))
        return reply_pipe_recv.recv()


    def end_atomic_seq(self, query_num):
        """Ends the current atomic sequence.

        This enqueues a null request with the given query_num and atomicity set
        to False, causing the multiplexer to terminate the atomic sequence.

        @param query_num: The unique query number.
        """
        self._request_queue.put(ReqTuple(None, None, query_num, False))
180