1#!/usr/bin/python
2#
3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Tests for the drone managers thread queue."""
8
9import cPickle
10import logging
11import Queue
12import unittest
13
14import common
15from autotest_lib.client.common_lib import utils
16from autotest_lib.client.common_lib.test_utils import mock, unittest
17from autotest_lib.scheduler import drone_task_queue
18from autotest_lib.scheduler import drones
19from autotest_lib.scheduler import thread_lib
20from autotest_lib.server.hosts import ssh_host
21
22
23class DroneThreadLibTest(unittest.TestCase):
24    """Threaded task queue drone library tests."""
25
26    def create_remote_drone(self, hostname):
27        """Create and initialize a Remote Drone.
28
29        @param hostname: The name of the host for the remote drone.
30
31        @return: A remote drone instance.
32        """
33        drones.drone_utility.create_host.expect_call(hostname).and_return(
34                self._mock_host)
35        self._mock_host.is_up.expect_call().and_return(True)
36        return drones._RemoteDrone(hostname, timestamp_remote_calls=False)
37
38
39    def setUp(self):
40        self.god = mock.mock_god()
41        self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
42                                                     'mock SSHHost')
43        self.god.stub_function(drones.drone_utility, 'create_host')
44        self.drone_utility_path = 'mock-drone-utility-path'
45        self.mock_return = {'results': ['mock results'],
46                            'warnings': []}
47        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
48                self.drone_utility_path)
49
50
51    def tearDown(self):
52        self.god.unstub_all()
53
54
55    def test_worker(self):
56        """Test the worker method of a ThreadedTaskQueue."""
57        # Invoke the worker method with a drone that has a queued call and check
58        # that the drones host.run method is invoked for the call, and the
59        # results queue contains the expected results.
60        drone = self.create_remote_drone('fakehostname')
61        task_queue = thread_lib.ThreadedTaskQueue()
62
63        drone.queue_call('foo')
64        mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
65        self._mock_host.run.expect_call(
66                'python %s' % self.drone_utility_path,
67                stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
68                connect_timeout=mock.is_instance_comparator(int)).and_return(
69                        mock_result)
70        task_queue.worker(drone, task_queue.results_queue)
71        result = task_queue.results_queue.get()
72
73        self.assertTrue(task_queue.results_queue.empty() and
74                        result.drone == drone and
75                        result.results == self.mock_return['results'])
76        self.god.check_playback()
77
78
79    def test_wait_on_drones(self):
80        """Test waiting on drone threads."""
81
82        def waiting_func(queue):
83            while len(queue.queue) < 2:
84                continue
85            logging.warning('Consuming thread finished.')
86            queue.put(3)
87
88        def exception_func(queue):
89            while queue.empty():
90                continue
91            queue.put(2)
92            logging.warning('Failing thread raising error.')
93            raise ValueError('Value error')
94
95        def quick_func():
96            return
97
98        # Create 2 threads, one of which raises an exception while the other
99        # just exits normally. Insert both threads into the thread_queue against
100        # mock drones and confirm that:
101        # a. The task queue waits for both threads, though the first one fails.
102        # b. The task queue records the right DroneTaskQueueException, which
103        #       contains the original exception.
104        # c. The failing thread records its own exception instead of raising it.
105        task_queue = thread_lib.ThreadedTaskQueue()
106        drone1 = self.create_remote_drone('fakehostname1')
107        drone2 = self.create_remote_drone('fakehostname2')
108        sync_queue = Queue.Queue()
109
110        waiting_worker = thread_lib.ExceptionRememberingThread(
111                target=waiting_func, args=(sync_queue,))
112        failing_worker = thread_lib.ExceptionRememberingThread(
113                target=exception_func, args=(sync_queue,))
114        task_queue.drone_threads[drone1] = waiting_worker
115        task_queue.drone_threads[drone2] = failing_worker
116        master_thread = thread_lib.ExceptionRememberingThread(
117                target=task_queue.wait_on_drones)
118
119        thread_list = [failing_worker, waiting_worker, master_thread]
120        for thread in thread_list:
121            thread.setDaemon(True)
122            thread.start()
123        sync_queue.put(1)
124        master_thread.join()
125
126        self.assertTrue(isinstance(master_thread.err,
127                                   drone_task_queue.DroneTaskQueueException))
128        self.assertTrue(isinstance(failing_worker.err, ValueError))
129        self.assertTrue(str(failing_worker.err) in str(master_thread.err))
130        self.assertTrue(3 in list(sync_queue.queue))
131        self.assertTrue(task_queue.drone_threads == {})
132
133        # Call wait_on_drones after the child thread has exited.
134        quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
135        task_queue.drone_threads[drone1] = quick_worker
136        quick_worker.start()
137        while quick_worker.isAlive():
138            continue
139        task_queue.wait_on_drones()
140        self.assertTrue(task_queue.drone_threads == {})
141
142
143    def test_get_results(self):
144        """Test retrieving results from the results queue."""
145
146        # Insert results for the same drone twice into the results queue
147        # and confirm that an exception is raised.
148        task_queue = thread_lib.ThreadedTaskQueue()
149        drone1 = self.create_remote_drone('fakehostname1')
150        drone2 = self.create_remote_drone('fakehostname2')
151        task_queue.results_queue.put(
152                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
153        task_queue.results_queue.put(
154                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
155        self.god.stub_function(task_queue, 'wait_on_drones')
156        task_queue.wait_on_drones.expect_call()
157        self.assertRaises(drone_task_queue.DroneTaskQueueException,
158                          task_queue.get_results)
159
160        # Insert results for different drones and check that they're returned
161        # in a drone results dict.
162        self.assertTrue(task_queue.results_queue.empty())
163        task_queue.results_queue.put(
164                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
165        task_queue.results_queue.put(
166                thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
167        task_queue.wait_on_drones.expect_call()
168        results = task_queue.get_results()
169        self.assertTrue(results[drone1] == self.mock_return and
170                        results[drone2] == self.mock_return)
171        self.god.check_playback()
172
173
174    def test_execute(self):
175        """Test task queue execute."""
176        drone1 = self.create_remote_drone('fakehostname1')
177        drone2 = self.create_remote_drone('fakehostname2')
178        drone3 = self.create_remote_drone('fakehostname3')
179
180        # Check task queue exception conditions.
181        task_queue = thread_lib.ThreadedTaskQueue()
182        task_queue.results_queue.put(1)
183        self.assertRaises(drone_task_queue.DroneTaskQueueException,
184                          task_queue.execute, [])
185        task_queue.results_queue.get()
186        task_queue.drone_threads[drone1] = None
187        self.assertRaises(drone_task_queue.DroneTaskQueueException,
188                          task_queue.execute, [])
189        task_queue.drone_threads = {}
190
191        # Queue 2 calls against each drone, and confirm that the host's
192        # run method is called 3 times. Then check the threads created,
193        # and finally compare results returned by the task queue against
194        # the mock results.
195        drones = [drone1, drone2, drone3]
196        for drone in drones:
197            drone.queue_call('foo')
198            drone.queue_call('bar')
199            mock_result = utils.CmdResult(
200                    stdout=cPickle.dumps(self.mock_return))
201            self._mock_host.run.expect_call(
202                    'python %s' % self.drone_utility_path,
203                    stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
204                    connect_timeout=mock.is_instance_comparator(int)
205                    ).and_return(mock_result)
206        task_queue.execute(drones, wait=False)
207        self.assertTrue(set(task_queue.drone_threads.keys()) == set(drones))
208        for drone, thread in task_queue.drone_threads.iteritems():
209            self.assertTrue(drone.hostname in thread.getName())
210            self.assertTrue(thread.isDaemon())
211            self.assertRaises(RuntimeError, thread.start)
212        results = task_queue.get_results()
213        for drone, result in results.iteritems():
214            self.assertTrue(result == self.mock_return['results'])
215
216        # Test synchronous execute
217        drone1.queue_call('foo')
218        mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
219        self._mock_host.run.expect_call(
220                'python %s' % self.drone_utility_path,
221                stdin=cPickle.dumps(drone1.get_calls()), stdout_tee=None,
222                connect_timeout=mock.is_instance_comparator(int)).and_return(
223                        mock_result)
224        self.assertTrue(task_queue.execute(drones, wait=True)[drone1] ==
225                        self.mock_return['results'])
226        self.god.check_playback()
227
228
229if __name__ == '__main__':
230    unittest.main()
231