1#!/usr/bin/python2
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import __builtin__
7import Queue
8import json
9import logging
10import os
11import shutil
12import signal
13import stat
14import subprocess
15import sys
16import tarfile
17import tempfile
18import time
19import unittest
20
21import mock
22import mox
23
24import common
25from autotest_lib.client.common_lib import global_config
26from autotest_lib.client.common_lib import utils
27#For unittest without cloud_client.proto compiled.
28try:
29    from autotest_lib.site_utils import cloud_console_client
30except ImportError:
31    cloud_console_client = None
32from autotest_lib.site_utils import gs_offloader
33from autotest_lib.site_utils import job_directories
34from autotest_lib.site_utils import job_directories_unittest as jd_test
35from autotest_lib.tko import models
36from autotest_lib.utils import gslib
37from autotest_lib.site_utils import pubsub_utils
38from chromite.lib import timeout_util
39
40# Test value to use for `days_old`, if nothing else is required.
41_TEST_EXPIRATION_AGE = 7
42
43
44def _get_options(argv):
45    """Helper function to exercise command line parsing.
46
47    @param argv Value of sys.argv to be parsed.
48
49    """
50    sys.argv = ['bogus.py'] + argv
51    return gs_offloader.parse_options()
52
53
54def is_fifo(path):
55  """Determines whether a path is a fifo.
56
57  @param path: fifo path string.
58  """
59  return stat.S_ISFIFO(os.lstat(path).st_mode)
60
61
62def _get_fake_process():
63  return FakeProcess()
64
65
66class FakeProcess(object):
67    """Fake process object."""
68
69    def __init__(self):
70        self.returncode = 0
71
72
73    def wait(self):
74        return True
75
76
77class OffloaderOptionsTests(mox.MoxTestBase):
78    """Tests for the `Offloader` constructor.
79
80    Tests that offloader instance fields are set as expected
81    for given command line options.
82
83    """
84
85    _REGULAR_ONLY = {job_directories.SwarmingJobDirectory,
86                     job_directories.RegularJobDirectory}
87    _SPECIAL_ONLY = {job_directories.SwarmingJobDirectory,
88                     job_directories.SpecialJobDirectory}
89    _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY
90
91
92    def setUp(self):
93        super(OffloaderOptionsTests, self).setUp()
94        self.mox.StubOutWithMock(utils, 'get_offload_gsuri')
95        gs_offloader.GS_OFFLOADING_ENABLED = True
96        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
97
98
99    def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
100                               console_client=None, delete_age=0):
101        """Mock the process of getting the offload_dir function."""
102        if is_moblab:
103            expected_gsuri = '%sresults/%s/%s/' % (
104                    global_config.global_config.get_config_value(
105                            'CROS', 'image_storage_server'),
106                    'Fa:ke:ma:c0:12:34', 'rand0m-uu1d')
107        else:
108            expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
109        utils.get_offload_gsuri().AndReturn(expected_gsuri)
110        sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
111            multiprocessing, delete_age, console_client)
112        self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
113        if cloud_console_client:
114            self.mox.StubOutWithMock(cloud_console_client,
115                    'is_cloud_notification_enabled')
116        if console_client:
117            cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
118            gs_offloader.GSOffloader(
119                    expected_gsuri, multiprocessing, delete_age,
120                    mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
121                        sub_offloader)
122        else:
123            if cloud_console_client:
124                cloud_console_client.is_cloud_notification_enabled().AndReturn(
125                        False)
126            gs_offloader.GSOffloader(
127                expected_gsuri, multiprocessing, delete_age, None).AndReturn(
128                    sub_offloader)
129        self.mox.ReplayAll()
130        return sub_offloader
131
132
133    def test_process_no_options(self):
134        """Test default offloader options."""
135        sub_offloader = self._mock_get_sub_offloader(False)
136        offloader = gs_offloader.Offloader(_get_options([]))
137        self.assertEqual(set(offloader._jobdir_classes),
138                         self._REGULAR_ONLY)
139        self.assertEqual(offloader._processes, 1)
140        self.assertEqual(offloader._gs_offloader,
141                         sub_offloader)
142        self.assertEqual(offloader._upload_age_limit, 0)
143        self.assertEqual(offloader._delete_age_limit, 0)
144
145
146    def test_process_all_option(self):
147        """Test offloader handling for the --all option."""
148        sub_offloader = self._mock_get_sub_offloader(False)
149        offloader = gs_offloader.Offloader(_get_options(['--all']))
150        self.assertEqual(set(offloader._jobdir_classes), self._BOTH)
151        self.assertEqual(offloader._processes, 1)
152        self.assertEqual(offloader._gs_offloader,
153                         sub_offloader)
154        self.assertEqual(offloader._upload_age_limit, 0)
155        self.assertEqual(offloader._delete_age_limit, 0)
156
157
158    def test_process_hosts_option(self):
159        """Test offloader handling for the --hosts option."""
160        sub_offloader = self._mock_get_sub_offloader(False)
161        offloader = gs_offloader.Offloader(
162                _get_options(['--hosts']))
163        self.assertEqual(set(offloader._jobdir_classes),
164                         self._SPECIAL_ONLY)
165        self.assertEqual(offloader._processes, 1)
166        self.assertEqual(offloader._gs_offloader,
167                         sub_offloader)
168        self.assertEqual(offloader._upload_age_limit, 0)
169        self.assertEqual(offloader._delete_age_limit, 0)
170
171
172    def test_parallelism_option(self):
173        """Test offloader handling for the --parallelism option."""
174        sub_offloader = self._mock_get_sub_offloader(False)
175        offloader = gs_offloader.Offloader(
176                _get_options(['--parallelism', '2']))
177        self.assertEqual(set(offloader._jobdir_classes),
178                         self._REGULAR_ONLY)
179        self.assertEqual(offloader._processes, 2)
180        self.assertEqual(offloader._gs_offloader,
181                         sub_offloader)
182        self.assertEqual(offloader._upload_age_limit, 0)
183        self.assertEqual(offloader._delete_age_limit, 0)
184
185
186    def test_delete_only_option(self):
187        """Test offloader handling for the --delete_only option."""
188        offloader = gs_offloader.Offloader(
189                _get_options(['--delete_only']))
190        self.assertEqual(set(offloader._jobdir_classes),
191                         self._REGULAR_ONLY)
192        self.assertEqual(offloader._processes, 1)
193        self.assertIsInstance(offloader._gs_offloader,
194                              gs_offloader.FakeGSOffloader)
195        self.assertEqual(offloader._upload_age_limit, 0)
196        self.assertEqual(offloader._delete_age_limit, 0)
197
198
199    def test_days_old_option(self):
200        """Test offloader handling for the --days_old option."""
201        sub_offloader = self._mock_get_sub_offloader(False, delete_age=7)
202        offloader = gs_offloader.Offloader(
203                _get_options(['--days_old', '7']))
204        self.assertEqual(set(offloader._jobdir_classes),
205                         self._REGULAR_ONLY)
206        self.assertEqual(offloader._processes, 1)
207        self.assertEqual(offloader._gs_offloader,
208                         sub_offloader)
209        self.assertEqual(offloader._upload_age_limit, 7)
210        self.assertEqual(offloader._delete_age_limit, 7)
211
212
213    def test_moblab_gsuri_generation(self):
214        """Test offloader construction for Moblab."""
215        sub_offloader = self._mock_get_sub_offloader(True)
216        offloader = gs_offloader.Offloader(_get_options([]))
217        self.assertEqual(set(offloader._jobdir_classes),
218                         self._REGULAR_ONLY)
219        self.assertEqual(offloader._processes, 1)
220        self.assertEqual(offloader._gs_offloader,
221                         sub_offloader)
222        self.assertEqual(offloader._upload_age_limit, 0)
223        self.assertEqual(offloader._delete_age_limit, 0)
224
225
226    def test_globalconfig_offloading_flag(self):
227        """Test enabling of --delete_only via global_config."""
228        gs_offloader.GS_OFFLOADING_ENABLED = False
229        offloader = gs_offloader.Offloader(
230                _get_options([]))
231        self.assertIsInstance(offloader._gs_offloader,
232                             gs_offloader.FakeGSOffloader)
233
234    def test_offloader_multiprocessing_flag_set(self):
235        """Test multiprocessing is set."""
236        sub_offloader = self._mock_get_sub_offloader(True, True)
237        offloader = gs_offloader.Offloader(_get_options(['-m']))
238        self.assertEqual(offloader._gs_offloader,
239                         sub_offloader)
240        self.mox.VerifyAll()
241
242    def test_offloader_multiprocessing_flag_not_set_default_false(self):
243        """Test multiprocessing is set."""
244        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
245        sub_offloader = self._mock_get_sub_offloader(True, False)
246        offloader = gs_offloader.Offloader(_get_options([]))
247        self.assertEqual(offloader._gs_offloader,
248                         sub_offloader)
249        self.mox.VerifyAll()
250
251    def test_offloader_multiprocessing_flag_not_set_default_true(self):
252        """Test multiprocessing is set."""
253        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
254        sub_offloader = self._mock_get_sub_offloader(True, True)
255        offloader = gs_offloader.Offloader(_get_options([]))
256        self.assertEqual(offloader._gs_offloader,
257                         sub_offloader)
258        self.mox.VerifyAll()
259
260
261    def test_offloader_pubsub_enabled(self):
262        """Test multiprocessing is set."""
263        if not cloud_console_client:
264            return
265        self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
266        sub_offloader = self._mock_get_sub_offloader(True, False,
267                cloud_console_client.PubSubBasedClient())
268        offloader = gs_offloader.Offloader(_get_options([]))
269        self.assertEqual(offloader._gs_offloader,
270                         sub_offloader)
271        self.mox.VerifyAll()
272
273
274class _MockJobDirectory(job_directories._JobDirectory):
275    """Subclass of `_JobDirectory` used as a helper for tests."""
276
277    GLOB_PATTERN = '[0-9]*-*'
278
279
280    def __init__(self, resultsdir):
281        """Create new job in initial state."""
282        super(_MockJobDirectory, self).__init__(resultsdir)
283        self._timestamp = None
284        self.queue_args = [resultsdir, os.path.dirname(resultsdir), self._timestamp]
285
286
287    def get_timestamp_if_finished(self):
288        return self._timestamp
289
290
291    def set_finished(self, days_old):
292        """Make this job appear to be finished.
293
294        After calling this function, calls to `enqueue_offload()`
295        will find this job as finished, but not expired and ready
296        for offload.  Note that when `days_old` is 0,
297        `enqueue_offload()` will treat a finished job as eligible
298        for offload.
299
300        @param days_old The value of the `days_old` parameter that
301                        will be passed to `enqueue_offload()` for
302                        testing.
303
304        """
305        self._timestamp = jd_test.make_timestamp(days_old, False)
306        self.queue_args[2] = self._timestamp
307
308
309    def set_expired(self, days_old):
310        """Make this job eligible to be offloaded.
311
312        After calling this function, calls to `offload` will attempt
313        to offload this job.
314
315        @param days_old The value of the `days_old` parameter that
316                        will be passed to `enqueue_offload()` for
317                        testing.
318
319        """
320        self._timestamp = jd_test.make_timestamp(days_old, True)
321        self.queue_args[2] = self._timestamp
322
323
324    def set_incomplete(self):
325        """Make this job appear to have failed offload just once."""
326        self.offload_count += 1
327        self.first_offload_start = time.time()
328        if not os.path.isdir(self.dirname):
329            os.mkdir(self.dirname)
330
331
332    def set_reportable(self):
333        """Make this job be reportable."""
334        self.set_incomplete()
335        self.offload_count += 1
336
337
338    def set_complete(self):
339        """Make this job be completed."""
340        self.offload_count += 1
341        if os.path.isdir(self.dirname):
342            os.rmdir(self.dirname)
343
344
345    def process_gs_instructions(self):
346        """Always still offload the job directory."""
347        return True
348
349
350class CommandListTests(unittest.TestCase):
351    """Tests for `_get_cmd_list()`."""
352
353    def _command_list_assertions(self, job, use_rsync=True, multi=False):
354        """Call `_get_cmd_list()` and check the return value.
355
356        Check the following assertions:
357          * The command name (argv[0]) is 'gsutil'.
358          * '-m' option (argv[1]) is on when the argument, multi, is True.
359          * The arguments contain the 'cp' subcommand.
360          * The next-to-last argument (the source directory) is the
361            job's `queue_args[0]`.
362          * The last argument (the destination URL) is the job's
363            'queue_args[1]'.
364
365        @param job A job with properly calculated arguments to
366                   `_get_cmd_list()`
367        @param use_rsync True when using 'rsync'. False when using 'cp'.
368        @param multi True when using '-m' option for gsutil.
369
370        """
371        test_bucket_uri = 'gs://a-test-bucket'
372
373        gs_offloader.USE_RSYNC_ENABLED = use_rsync
374
375        gs_path = os.path.join(test_bucket_uri, job.queue_args[1])
376
377        command = gs_offloader._get_cmd_list(
378                multi, job.queue_args[0], gs_path)
379
380        self.assertEqual(command[0], 'gsutil')
381        if multi:
382            self.assertEqual(command[1], '-m')
383        self.assertEqual(command[-2], job.queue_args[0])
384
385        if use_rsync:
386            self.assertTrue('rsync' in command)
387            self.assertEqual(command[-1],
388                             os.path.join(test_bucket_uri, job.queue_args[0]))
389        else:
390            self.assertTrue('cp' in command)
391            self.assertEqual(command[-1],
392                             os.path.join(test_bucket_uri, job.queue_args[1]))
393
394        finish_command = gs_offloader._get_finish_cmd_list(gs_path)
395        self.assertEqual(finish_command[0], 'gsutil')
396        self.assertEqual(finish_command[1], 'cp')
397        self.assertEqual(finish_command[2], '/dev/null')
398        self.assertEqual(finish_command[3],
399                         os.path.join(gs_path, '.finished_offload'))
400
401
402    def test__get_cmd_list_regular(self):
403        """Test `_get_cmd_list()` as for a regular job."""
404        job = _MockJobDirectory('118-debug')
405        self._command_list_assertions(job)
406
407
408    def test__get_cmd_list_special(self):
409        """Test `_get_cmd_list()` as for a special job."""
410        job = _MockJobDirectory('hosts/host1/118-reset')
411        self._command_list_assertions(job)
412
413
414    def test_get_cmd_list_regular_no_rsync(self):
415        """Test `_get_cmd_list()` as for a regular job."""
416        job = _MockJobDirectory('118-debug')
417        self._command_list_assertions(job, use_rsync=False)
418
419
420    def test_get_cmd_list_special_no_rsync(self):
421        """Test `_get_cmd_list()` as for a special job."""
422        job = _MockJobDirectory('hosts/host1/118-reset')
423        self._command_list_assertions(job, use_rsync=False)
424
425
426    def test_get_cmd_list_regular_multi(self):
427        """Test `_get_cmd_list()` as for a regular job with True multi."""
428        job = _MockJobDirectory('118-debug')
429        self._command_list_assertions(job, multi=True)
430
431
432    def test__get_cmd_list_special_multi(self):
433        """Test `_get_cmd_list()` as for a special job with True multi."""
434        job = _MockJobDirectory('hosts/host1/118-reset')
435        self._command_list_assertions(job, multi=True)
436
437
438class _TempResultsDirTestCase(unittest.TestCase):
439    """Mixin class for tests using a temporary results directory."""
440
441    REGULAR_JOBLIST = [
442        '111-fubar', '112-fubar', '113-fubar', '114-snafu']
443    HOST_LIST = ['host1', 'host2', 'host3']
444    SPECIAL_JOBLIST = [
445        'hosts/host1/333-reset', 'hosts/host1/334-reset',
446        'hosts/host2/444-reset', 'hosts/host3/555-reset']
447
448
449    def setUp(self):
450        super(_TempResultsDirTestCase, self).setUp()
451        self._resultsroot = tempfile.mkdtemp()
452        self._cwd = os.getcwd()
453        os.chdir(self._resultsroot)
454
455
456    def tearDown(self):
457        os.chdir(self._cwd)
458        shutil.rmtree(self._resultsroot)
459        super(_TempResultsDirTestCase, self).tearDown()
460
461
462    def make_job(self, jobdir):
463        """Create a job with results in `self._resultsroot`.
464
465        @param jobdir Name of the subdirectory to be created in
466                      `self._resultsroot`.
467
468        """
469        os.makedirs(jobdir)
470        return _MockJobDirectory(jobdir)
471
472
473    def make_job_hierarchy(self):
474        """Create a sample hierarchy of job directories.
475
476        `self.REGULAR_JOBLIST` is a list of directories for regular
477        jobs to be created; `self.SPECIAL_JOBLIST` is a list of
478        directories for special jobs to be created.
479
480        """
481        for d in self.REGULAR_JOBLIST:
482            os.mkdir(d)
483        hostsdir = 'hosts'
484        os.mkdir(hostsdir)
485        for host in self.HOST_LIST:
486            os.mkdir(os.path.join(hostsdir, host))
487        for d in self.SPECIAL_JOBLIST:
488            os.mkdir(d)
489
490
491class _TempResultsDirTestBase(_TempResultsDirTestCase, mox.MoxTestBase):
492    """Base Mox test class for tests using a temporary results directory."""
493
494
495class FailedOffloadsLogTest(_TempResultsDirTestBase):
496    """Test the formatting of failed offloads log file."""
497    # Below is partial sample of a failed offload log file.  This text is
498    # deliberately hard-coded and then parsed to create the test data; the idea
499    # is to make sure the actual text format will be reviewed by a human being.
500    #
501    # first offload      count  directory
502    # --+----1----+----  ----+ ----+----1----+----2----+----3
503    _SAMPLE_DIRECTORIES_REPORT = '''\
504    =================== ======  ==============================
505    2014-03-14 15:09:26      1  118-fubar
506    2014-03-14 15:19:23      2  117-fubar
507    2014-03-14 15:29:20      6  116-fubar
508    2014-03-14 15:39:17     24  115-fubar
509    2014-03-14 15:49:14    120  114-fubar
510    2014-03-14 15:59:11    720  113-fubar
511    2014-03-14 16:09:08   5040  112-fubar
512    2014-03-14 16:19:05  40320  111-fubar
513    '''
514
515    def setUp(self):
516        super(FailedOffloadsLogTest, self).setUp()
517        self._offloader = gs_offloader.Offloader(_get_options([]))
518        self._joblist = []
519        for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]:
520            date_, time_, count, dir_ = line.split()
521            job = _MockJobDirectory(dir_)
522            job.offload_count = int(count)
523            timestruct = time.strptime("%s %s" % (date_, time_),
524                                       gs_offloader.FAILED_OFFLOADS_TIME_FORMAT)
525            job.first_offload_start = time.mktime(timestruct)
526            # enter the jobs in reverse order, to make sure we
527            # test that the output will be sorted.
528            self._joblist.insert(0, job)
529
530
531    def assert_report_well_formatted(self, report_file):
532        """Assert that report file is well formatted.
533
534        @param report_file: Path to report file
535        """
536        with open(report_file, 'r') as f:
537            report_lines = f.read().split()
538
539        for end_of_header_index in range(len(report_lines)):
540            if report_lines[end_of_header_index].startswith('=='):
541                break
542        self.assertLess(end_of_header_index, len(report_lines),
543                        'Failed to find end-of-header marker in the report')
544
545        relevant_lines = report_lines[end_of_header_index:]
546        expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split()
547        self.assertListEqual(relevant_lines, expected_lines)
548
549
550    def test_failed_offload_log_format(self):
551        """Trigger an e-mail report and check its contents."""
552        log_file = os.path.join(self._resultsroot, 'failed_log')
553        report = self._offloader._log_failed_jobs_locally(self._joblist,
554                                                          log_file=log_file)
555        self.assert_report_well_formatted(log_file)
556
557
558    def test_failed_offload_file_overwrite(self):
559        """Verify that we can saefly overwrite the log file."""
560        log_file = os.path.join(self._resultsroot, 'failed_log')
561        with open(log_file, 'w') as f:
562            f.write('boohoohoo')
563        report = self._offloader._log_failed_jobs_locally(self._joblist,
564                                                          log_file=log_file)
565        self.assert_report_well_formatted(log_file)
566
567
568class OffloadDirectoryTests(_TempResultsDirTestBase):
569    """Tests for `offload_dir()`."""
570
571    def setUp(self):
572        super(OffloadDirectoryTests, self).setUp()
573        # offload_dir() logs messages; silence them.
574        self._saved_loglevel = logging.getLogger().getEffectiveLevel()
575        logging.getLogger().setLevel(logging.CRITICAL+1)
576        self._job = self.make_job(self.REGULAR_JOBLIST[0])
577        self.mox.StubOutWithMock(gs_offloader, '_get_cmd_list')
578        alarm = mock.patch('signal.alarm', return_value=0)
579        alarm.start()
580        self.addCleanup(alarm.stop)
581        self.mox.StubOutWithMock(models.test, 'parse_job_keyval')
582        self.should_remove_sarming_req_dir = False
583
584
585    def tearDown(self):
586        logging.getLogger().setLevel(self._saved_loglevel)
587        super(OffloadDirectoryTests, self).tearDown()
588
589    def _mock__upload_cts_testresult(self):
590        self.mox.StubOutWithMock(gs_offloader, '_upload_cts_testresult')
591        gs_offloader._upload_cts_testresult(
592                mox.IgnoreArg(),mox.IgnoreArg()).AndReturn(None)
593
594    def _mock_create_marker_file(self):
595        self.mox.StubOutWithMock(__builtin__, 'open')
596        open(mox.IgnoreArg(), 'a').AndReturn(mock.MagicMock())
597
598
599    def _mock_offload_dir_calls(self, command, queue_args,
600                                marker_initially_exists=False):
601        """Mock out the calls needed by `offload_dir()`.
602
603        This covers only the calls made when there is no timeout.
604
605        @param command Command list to be returned by the mocked
606                       call to `_get_cmd_list()`.
607
608        """
609        self.mox.StubOutWithMock(os.path, 'isfile')
610        os.path.isfile(mox.IgnoreArg()).AndReturn(marker_initially_exists)
611        command.append(queue_args[0])
612        gs_offloader._get_cmd_list(
613                False, queue_args[0],
614                '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
615                          queue_args[1])).AndReturn(command)
616        self._mock__upload_cts_testresult()
617
618
619    def _run_offload_dir(self, should_succeed, delete_age):
620        """Make one call to `offload_dir()`.
621
622        The caller ensures all mocks are set up already.
623
624        @param should_succeed True iff the call to `offload_dir()`
625                              is expected to succeed and remove the
626                              offloaded job directory.
627
628        """
629        self.mox.ReplayAll()
630        gs_offloader.GSOffloader(
631                utils.DEFAULT_OFFLOAD_GSURI, False, delete_age).offload(
632                        self._job.queue_args[0],
633                        self._job.queue_args[1],
634                        self._job.queue_args[2])
635        self.mox.VerifyAll()
636        self.assertEqual(not should_succeed,
637                         os.path.isdir(self._job.queue_args[0]))
638        swarming_req_dir = gs_offloader._get_swarming_req_dir(
639                self._job.queue_args[0])
640        if swarming_req_dir:
641            self.assertEqual(not self.should_remove_sarming_req_dir,
642                             os.path.exists(swarming_req_dir))
643
644
645    def test_offload_success(self):
646        """Test that `offload_dir()` can succeed correctly."""
647        self._mock_offload_dir_calls(['test', '-d'],
648                                     self._job.queue_args)
649        os.path.isfile(mox.IgnoreArg()).AndReturn(True)
650        self._mock_create_marker_file()
651        self._run_offload_dir(True, 0)
652
653
654    def test_offload_failure(self):
655        """Test that `offload_dir()` can fail correctly."""
656        self._mock_offload_dir_calls(['test', '!', '-d'],
657                                     self._job.queue_args)
658        self._run_offload_dir(False, 0)
659
660
661    def test_offload_swarming_req_dir_remove(self):
662        """Test that `offload_dir()` can prune the empty swarming task dir."""
663        should_remove = os.path.join('results', 'swarming-123abc0')
664        self._job = self.make_job(os.path.join(should_remove, '1'))
665        self._mock_offload_dir_calls(['test', '-d'],
666                                     self._job.queue_args)
667
668        os.path.isfile(mox.IgnoreArg()).AndReturn(True)
669        self.should_remove_sarming_req_dir = True
670        self._mock_create_marker_file()
671        self._run_offload_dir(True, 0)
672
673
674    def test_offload_swarming_req_dir_exist(self):
675        """Test that `offload_dir()` keeps the non-empty swarming task dir."""
676        should_not_remove = os.path.join('results', 'swarming-456edf0')
677        self._job = self.make_job(os.path.join(should_not_remove, '1'))
678        self.make_job(os.path.join(should_not_remove, '2'))
679        self._mock_offload_dir_calls(['test', '-d'],
680                                     self._job.queue_args)
681
682        os.path.isfile(mox.IgnoreArg()).AndReturn(True)
683        self.should_remove_sarming_req_dir = False
684        self._mock_create_marker_file()
685        self._run_offload_dir(True, 0)
686
687
688    def test_sanitize_dir(self):
689        """Test that folder/file name with invalid character can be corrected.
690        """
691        results_folder = tempfile.mkdtemp()
692        invalid_chars = '_'.join(['[', ']', '*', '?', '#'])
693        invalid_files = []
694        invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars
695        invalid_folder = os.path.join(
696                results_folder,
697                invalid_folder_name)
698        invalid_files.append(os.path.join(
699                invalid_folder,
700                'invalid_name_file_%s' % invalid_chars))
701        good_folder =  os.path.join(results_folder, 'valid_name_folder')
702        good_file = os.path.join(good_folder, 'valid_name_file')
703        for folder in [invalid_folder, good_folder]:
704            os.makedirs(folder)
705        for f in invalid_files + [good_file]:
706            with open(f, 'w'):
707                pass
708        # check that broken symlinks don't break sanitization
709        symlink = os.path.join(invalid_folder, 'broken-link')
710        os.symlink(os.path.join(results_folder, 'no-such-file'),
711                   symlink)
712        fifo1 = os.path.join(results_folder, 'test_fifo1')
713        fifo2 = os.path.join(good_folder, 'test_fifo2')
714        fifo3 = os.path.join(invalid_folder, 'test_fifo3')
715        invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars
716        fifo4 = os.path.join(invalid_folder, invalid_fifo4_name)
717        os.mkfifo(fifo1)
718        os.mkfifo(fifo2)
719        os.mkfifo(fifo3)
720        os.mkfifo(fifo4)
721        gs_offloader.sanitize_dir(results_folder)
722        for _, dirs, files in os.walk(results_folder):
723            for name in dirs + files:
724                self.assertEqual(name, gslib.escape(name))
725                for c in name:
726                    self.assertFalse(c in ['[', ']', '*', '?', '#'])
727        self.assertTrue(os.path.exists(good_file))
728
729        self.assertTrue(os.path.exists(fifo1))
730        self.assertFalse(is_fifo(fifo1))
731        self.assertTrue(os.path.exists(fifo2))
732        self.assertFalse(is_fifo(fifo2))
733        corrected_folder = os.path.join(
734                results_folder, gslib.escape(invalid_folder_name))
735        corrected_fifo3 = os.path.join(
736                corrected_folder,
737                'test_fifo3')
738        self.assertFalse(os.path.exists(fifo3))
739        self.assertTrue(os.path.exists(corrected_fifo3))
740        self.assertFalse(is_fifo(corrected_fifo3))
741        corrected_fifo4 = os.path.join(
742                corrected_folder, gslib.escape(invalid_fifo4_name))
743        self.assertFalse(os.path.exists(fifo4))
744        self.assertTrue(os.path.exists(corrected_fifo4))
745        self.assertFalse(is_fifo(corrected_fifo4))
746
747        corrected_symlink = os.path.join(
748                corrected_folder,
749                'broken-link')
750        self.assertFalse(os.path.lexists(symlink))
751        self.assertTrue(os.path.exists(corrected_symlink))
752        self.assertFalse(os.path.islink(corrected_symlink))
753        shutil.rmtree(results_folder)
754
755
756    def check_limit_file_count(self, is_test_job=True):
757        """Test that folder with too many files can be compressed.
758
759        @param is_test_job: True to check the method with test job result
760                            folder. Set to False for special task folder.
761        """
762        results_folder = tempfile.mkdtemp()
763        host_folder = os.path.join(
764                results_folder,
765                'lab1-host1' if is_test_job else 'hosts/lab1-host1/1-repair')
766        debug_folder = os.path.join(host_folder, 'debug')
767        sysinfo_folder = os.path.join(host_folder, 'sysinfo')
768        for folder in [debug_folder, sysinfo_folder]:
769            os.makedirs(folder)
770            for i in range(10):
771                with open(os.path.join(folder, str(i)), 'w') as f:
772                    f.write('test')
773
774        gs_offloader._MAX_FILE_COUNT = 100
775        gs_offloader.limit_file_count(
776                results_folder if is_test_job else host_folder)
777        self.assertTrue(os.path.exists(sysinfo_folder))
778
779        gs_offloader._MAX_FILE_COUNT = 10
780        gs_offloader.limit_file_count(
781                results_folder if is_test_job else host_folder)
782        self.assertFalse(os.path.exists(sysinfo_folder))
783        self.assertTrue(os.path.exists(sysinfo_folder + '.tgz'))
784        self.assertTrue(os.path.exists(debug_folder))
785
786        shutil.rmtree(results_folder)
787
788
789    def test_limit_file_count(self):
790        """Test that folder with too many files can be compressed.
791        """
792        self.check_limit_file_count(is_test_job=True)
793        self.check_limit_file_count(is_test_job=False)
794
795
796    def test_is_valid_result(self):
797        """Test _is_valid_result."""
798        release_build = 'veyron_minnie-cheets-release/R52-8248.0.0'
799        pfq_build = 'cyan-cheets-android-pfq/R54-8623.0.0-rc1'
800        trybot_build = 'trybot-samus-release/R54-8640.0.0-b5092'
801        trybot_2_build = 'trybot-samus-pfq/R54-8640.0.0-b5092'
802        release_2_build = 'test-trybot-release/R54-8640.0.0-b5092'
803        self.assertTrue(gs_offloader._is_valid_result(
804            release_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
805        self.assertTrue(gs_offloader._is_valid_result(
806            release_build, gs_offloader.CTS_RESULT_PATTERN, 'test_that_wrapper'))
807        self.assertTrue(gs_offloader._is_valid_result(
808            release_build, gs_offloader.CTS_RESULT_PATTERN, 'cros_test_platform'))
809        self.assertTrue(gs_offloader._is_valid_result(
810            release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-arc'))
811        self.assertTrue(gs_offloader._is_valid_result(
812            release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-perbuild'))
813        self.assertFalse(gs_offloader._is_valid_result(
814            release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-cq'))
815        self.assertTrue(gs_offloader._is_valid_result(
816            release_build, gs_offloader.CTS_V2_RESULT_PATTERN, 'arc-gts'))
817        self.assertFalse(gs_offloader._is_valid_result(
818            None, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
819        self.assertFalse(gs_offloader._is_valid_result(
820            release_build, gs_offloader.CTS_RESULT_PATTERN, None))
821        self.assertFalse(gs_offloader._is_valid_result(
822            pfq_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
823        self.assertFalse(gs_offloader._is_valid_result(
824            trybot_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
825        self.assertFalse(gs_offloader._is_valid_result(
826            trybot_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
827        self.assertTrue(gs_offloader._is_valid_result(
828            release_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
829
830
831    def create_results_folder(self):
832        """Create CTS/GTS results folders."""
833        results_folder = tempfile.mkdtemp()
834        host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22')
835        debug_folder = os.path.join(host_folder, 'debug')
836        sysinfo_folder = os.path.join(host_folder, 'sysinfo')
837        cts_result_folder = os.path.join(
838                host_folder, 'cheets_CTS.android.dpi', 'results', 'cts-results')
839        cts_v2_result_folder = os.path.join(host_folder,
840                'cheets_CTS_N.CtsGraphicsTestCases', 'results', 'android-cts')
841        gts_result_folder = os.path.join(
842                host_folder, 'cheets_GTS.google.admin', 'results', 'android-gts')
843        timestamp_str = '2016.04.28_01.41.44'
844        timestamp_cts_folder = os.path.join(cts_result_folder, timestamp_str)
845        timestamp_cts_v2_folder = os.path.join(cts_v2_result_folder, timestamp_str)
846        timestamp_gts_folder = os.path.join(gts_result_folder, timestamp_str)
847
848        # Build host keyvals set to parse model info.
849        host_info_path = os.path.join(host_folder, 'host_keyvals')
850        dir_to_create = '/'
851        for tdir in host_info_path.split('/'):
852            dir_to_create = os.path.join(dir_to_create, tdir)
853            if not os.path.exists(dir_to_create):
854                os.mkdir(dir_to_create)
855        with open(os.path.join(host_info_path, 'chromeos4-row9-rack11-host22'), 'w') as store_file:
856            store_file.write('labels=board%3Acoral,hw_video_acc_vp9,cros,'+
857                             'hw_jpeg_acc_dec,bluetooth,model%3Arobo360,'+
858                             'accel%3Acros-ec,'+
859                             'sku%3Arobo360_IntelR_CeleronR_CPU_N3450_1_10GHz_4Gb')
860
861        # .autoserv_execute file is needed for the test results package to look
862        # legit.
863        autoserve_path = os.path.join(host_folder, '.autoserv_execute')
864        with open(autoserve_path, 'w') as temp_file:
865            temp_file.write(' ')
866
867        # Test results in cts_result_folder with a different time-stamp.
868        timestamp_str_2 = '2016.04.28_10.41.44'
869        timestamp_cts_folder_2 = os.path.join(cts_result_folder, timestamp_str_2)
870
871        for folder in [debug_folder, sysinfo_folder, cts_result_folder,
872                       timestamp_cts_folder, timestamp_cts_folder_2,
873                       timestamp_cts_v2_folder, timestamp_gts_folder]:
874            os.makedirs(folder)
875
876        path_pattern_pair = [(timestamp_cts_folder, gs_offloader.CTS_RESULT_PATTERN),
877                             (timestamp_cts_folder_2, gs_offloader.CTS_RESULT_PATTERN),
878                             (timestamp_cts_folder_2, gs_offloader.CTS_COMPRESSED_RESULT_PATTERN),
879                             (timestamp_cts_v2_folder, gs_offloader.CTS_V2_RESULT_PATTERN),
880                             (timestamp_cts_v2_folder, gs_offloader.CTS_V2_COMPRESSED_RESULT_PATTERN),
881                             (timestamp_gts_folder, gs_offloader.CTS_V2_RESULT_PATTERN)]
882
883        # Create timestamp.zip file_path.
884        cts_zip_file = os.path.join(cts_result_folder, timestamp_str + '.zip')
885        cts_zip_file_2 = os.path.join(cts_result_folder, timestamp_str_2 + '.zip')
886        cts_v2_zip_file = os.path.join(cts_v2_result_folder, timestamp_str + '.zip')
887        gts_zip_file = os.path.join(gts_result_folder, timestamp_str + '.zip')
888
889        # Create xml file_path.
890        cts_result_file = os.path.join(timestamp_cts_folder, 'testResult.xml')
891        cts_result_file_2 = os.path.join(timestamp_cts_folder_2,
892                                         'testResult.xml')
893        cts_result_compressed_file_2 = os.path.join(timestamp_cts_folder_2,
894                                                     'testResult.xml.tgz')
895        gts_result_file = os.path.join(timestamp_gts_folder, 'test_result.xml')
896        cts_v2_result_file = os.path.join(timestamp_cts_v2_folder,
897                                         'test_result.xml')
898        cts_v2_result_compressed_file = os.path.join(timestamp_cts_v2_folder,
899                                         'test_result.xml.tgz')
900
901        for file_path in [cts_zip_file, cts_zip_file_2, cts_v2_zip_file,
902                          gts_zip_file, cts_result_file, cts_result_file_2,
903                          cts_result_compressed_file_2, gts_result_file,
904                          cts_v2_result_file, cts_v2_result_compressed_file]:
905          if file_path.endswith('tgz'):
906              test_result_file = gs_offloader.CTS_COMPRESSED_RESULT_TYPES[
907                      os.path.basename(file_path)]
908              with open(test_result_file, 'w') as f:
909                  f.write('test')
910              with tarfile.open(file_path, 'w:gz') as tar_file:
911                  tar_file.add(test_result_file)
912              os.remove(test_result_file)
913          else:
914              with open(file_path, 'w') as f:
915                  f.write('test')
916
917        return (results_folder, host_folder, path_pattern_pair)
918
919
920    def test__upload_cts_testresult(self):
921        """Test _upload_cts_testresult."""
922        results_folder, host_folder, path_pattern_pair = self.create_results_folder()
923
924        self.mox.StubOutWithMock(gs_offloader, '_upload_files')
925        gs_offloader._upload_files(
926            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
927                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
928                ['test', '-d', host_folder])
929        gs_offloader._upload_files(
930            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
931                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
932                ['test', '-d', host_folder])
933        gs_offloader._upload_files(
934            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
935                mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
936                ['test', '-d', host_folder])
937
938        self.mox.ReplayAll()
939        gs_offloader._upload_cts_testresult(results_folder, False)
940        self.mox.VerifyAll()
941        shutil.rmtree(results_folder)
942
943
944    def test_parse_cts_job_results_file_path(self):
945        # A autotest path
946        path = ('/317739475-chromeos-test/chromeos4-row9-rack11-host22/'
947                'cheets_CTS.android.dpi/results/cts-results/'
948                '2016.04.28_01.41.44')
949        job_id, package, timestamp = \
950            gs_offloader._parse_cts_job_results_file_path(path)
951        self.assertEqual('317739475-chromeos-test', job_id)
952        self.assertEqual('cheets_CTS.android.dpi', package)
953        self.assertEqual('2016.04.28_01.41.44', timestamp)
954
955
956        # A skylab path
957        path = ('/swarming-458e3a3a7fc6f210/1/autoserv_test/'
958                'cheets_CTS.android.dpi/results/cts-results/'
959                '2016.04.28_01.41.44')
960        job_id, package, timestamp = \
961            gs_offloader._parse_cts_job_results_file_path(path)
962        self.assertEqual('swarming-458e3a3a7fc6f210-1', job_id)
963        self.assertEqual('cheets_CTS.android.dpi', package)
964        self.assertEqual('2016.04.28_01.41.44', timestamp)
965
966
967    def test_upload_files(self):
968        """Test upload_files"""
969        results_folder, host_folder, path_pattern_pair = self.create_results_folder()
970
971        for path, pattern in path_pattern_pair:
972            models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
973                'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
974                'hostname': 'chromeos4-row9-rack11-host22',
975                'parent_job_id': 'p_id',
976                'suite': 'arc-cts'
977            })
978
979            gs_offloader._get_cmd_list(
980                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
981                    ['test', '-d', path])
982            gs_offloader._get_cmd_list(
983                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
984                    ['test', '-d', path])
985
986            self.mox.ReplayAll()
987            gs_offloader._upload_files(host_folder, path, pattern, False,
988                                       'gs://a-test-bucket/',
989                                       'gs://a-test-apfe-bucket/')
990            self.mox.VerifyAll()
991            self.mox.ResetAll()
992
993        shutil.rmtree(results_folder)
994
995
996    def test_get_metrics_fields(self):
997        """Test method _get_metrics_fields."""
998        results_folder, host_folder, _ = self.create_results_folder()
999        models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
1000                'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
1001                'parent_job_id': 'p_id',
1002                'suite': 'arc-cts'
1003            })
1004        try:
1005            self.mox.ReplayAll()
1006            self.assertEqual({'board': 'veyron_minnie-cheets',
1007                              'milestone': 'R52'},
1008                             gs_offloader._get_metrics_fields(host_folder))
1009            self.mox.VerifyAll()
1010        finally:
1011            shutil.rmtree(results_folder)
1012
1013
1014class OffladerConfigTests(_TempResultsDirTestBase):
1015    """Tests for the `Offloader` to follow side_effect config."""
1016
1017    def setUp(self):
1018        super(OffladerConfigTests, self).setUp()
1019        gs_offloader.GS_OFFLOADING_ENABLED = True
1020        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
1021        self.dest_path = '/results'
1022        self.mox.StubOutWithMock(gs_offloader, '_get_metrics_fields')
1023        self.mox.StubOutWithMock(gs_offloader, '_OffloadError')
1024        self.mox.StubOutWithMock(gs_offloader, '_upload_cts_testresult')
1025        self.mox.StubOutWithMock(gs_offloader, '_emit_offload_metrics')
1026        self.mox.StubOutWithMock(gs_offloader, '_get_cmd_list')
1027        self.mox.StubOutWithMock(subprocess, 'Popen')
1028        self.mox.StubOutWithMock(gs_offloader, '_emit_gs_returncode_metric')
1029
1030
1031    def _run(self, results_dir, gs_bucket, expect_dest, cts_enabled):
1032        stdout = os.path.join(results_dir, 'std.log')
1033        stderr = os.path.join(results_dir, 'std.err')
1034        config = {
1035            'tko': {
1036                'proxy_socket': '/file-system/foo-socket',
1037                'mysql_user': 'foo-user',
1038                'mysql_password_file': '/file-system/foo-password-file'
1039            },
1040            'google_storage': {
1041                'bucket': gs_bucket,
1042                'credentials_file': '/foo-creds'
1043            },
1044            'cts': {
1045                'enabled': cts_enabled,
1046            },
1047            'this_field_is_ignored': True
1048        }
1049        path = os.path.join(results_dir, 'side_effects_config.json')
1050        with open(path, 'w') as f:
1051            f.write(json.dumps(config))
1052        gs_offloader._get_metrics_fields(results_dir)
1053        if cts_enabled:
1054            gs_offloader._upload_cts_testresult(results_dir, True)
1055        gs_offloader._get_cmd_list(
1056            True,
1057            mox.IgnoreArg(),
1058            expect_dest).AndReturn(['test', '-d', expect_dest])
1059        subprocess.Popen(mox.IgnoreArg(),
1060                         stdout=stdout,
1061                         stderr=stderr).AndReturn(_get_fake_process())
1062        gs_offloader._OffloadError(mox.IgnoreArg())
1063        gs_offloader._emit_gs_returncode_metric(mox.IgnoreArg()).AndReturn(True)
1064        gs_offloader._emit_offload_metrics(mox.IgnoreArg()).AndReturn(True)
1065        sub_offloader = gs_offloader.GSOffloader(results_dir, True, 0, None)
1066        subprocess.Popen(mox.IgnoreArg(),
1067                         stdout=stdout,
1068                         stderr=stderr).AndReturn(_get_fake_process())
1069        self.mox.ReplayAll()
1070        sub_offloader._try_offload(results_dir, self.dest_path, stdout, stderr)
1071        self.mox.VerifyAll()
1072        self.mox.ResetAll()
1073        shutil.rmtree(results_dir)
1074
1075
1076    def test_upload_files_to_dev(self):
1077        """Test upload results to dev gs bucket and skip cts uploading."""
1078        res = tempfile.mkdtemp()
1079        gs_bucket = 'dev-bucket'
1080        expect_dest = 'gs://' + gs_bucket + self.dest_path
1081        self._run(res, gs_bucket, expect_dest, False)
1082
1083
1084    def test_upload_files_prod(self):
1085        """Test upload results to the prod gs bucket and also upload to cts."""
1086        res = tempfile.mkdtemp()
1087        gs_bucket = 'prod-bucket'
1088        expect_dest = 'gs://' + gs_bucket + self.dest_path
1089        self._run(res, gs_bucket, expect_dest, True)
1090
1091
1092    def test_skip_gs_prefix(self):
1093        """Test skip the 'gs://' prefix if already presented."""
1094        res = tempfile.mkdtemp()
1095        gs_bucket = 'gs://prod-bucket'
1096        expect_dest = gs_bucket + self.dest_path
1097        self._run(res, gs_bucket, expect_dest, True)
1098
1099
1100class JobDirectoryOffloadTests(_TempResultsDirTestBase):
1101    """Tests for `_JobDirectory.enqueue_offload()`.
1102
1103    When testing with a `days_old` parameter of 0, we use
1104    `set_finished()` instead of `set_expired()`.  This causes the
1105    job's timestamp to be set in the future.  This is done so as
1106    to test that when `days_old` is 0, the job is always treated
1107    as eligible for offload, regardless of the timestamp's value.
1108
1109    Testing covers the following assertions:
1110     A. Each time `enqueue_offload()` is called, a message that
1111        includes the job's directory name will be logged using
1112        `logging.debug()`, regardless of whether the job was
1113        enqueued.  Nothing else is allowed to be logged.
1114     B. If the job is not eligible to be offloaded,
1115        `first_offload_start` and `offload_count` are 0.
1116     C. If the job is not eligible for offload, nothing is
1117        enqueued in `queue`.
1118     D. When the job is offloaded, `offload_count` increments
1119        each time.
1120     E. When the job is offloaded, the appropriate parameters are
1121        enqueued exactly once.
1122     F. The first time a job is offloaded, `first_offload_start` is
1123        set to the current time.
1124     G. `first_offload_start` only changes the first time that the
1125        job is offloaded.
1126
1127    The test cases below are designed to exercise all of the
1128    meaningful state transitions at least once.
1129
1130    """
1131
1132    def setUp(self):
1133        super(JobDirectoryOffloadTests, self).setUp()
1134        self._job = self.make_job(self.REGULAR_JOBLIST[0])
1135        self._queue = Queue.Queue()
1136
1137
1138    def _offload_unexpired_job(self, days_old):
1139        """Make calls to `enqueue_offload()` for an unexpired job.
1140
1141        This method tests assertions B and C that calling
1142        `enqueue_offload()` has no effect.
1143
1144        """
1145        self.assertEqual(self._job.offload_count, 0)
1146        self.assertEqual(self._job.first_offload_start, 0)
1147        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
1148        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
1149        self.assertTrue(self._queue.empty())
1150        self.assertEqual(self._job.offload_count, 0)
1151        self.assertEqual(self._job.first_offload_start, 0)
1152
1153
1154    def _offload_expired_once(self, days_old, count):
1155        """Make one call to `enqueue_offload()` for an expired job.
1156
1157        This method tests assertions D and E regarding side-effects
1158        expected when a job is offloaded.
1159
1160        """
1161        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
1162        self.assertEqual(self._job.offload_count, count)
1163        self.assertFalse(self._queue.empty())
1164        v = self._queue.get_nowait()
1165        self.assertTrue(self._queue.empty())
1166        self.assertEqual(v, self._job.queue_args)
1167
1168
1169    def _offload_expired_job(self, days_old):
1170        """Make calls to `enqueue_offload()` for a just-expired job.
1171
1172        This method directly tests assertions F and G regarding
1173        side-effects on `first_offload_start`.
1174
1175        """
1176        t0 = time.time()
1177        self._offload_expired_once(days_old, 1)
1178        t1 = self._job.first_offload_start
1179        self.assertLessEqual(t1, time.time())
1180        self.assertGreaterEqual(t1, t0)
1181        self._offload_expired_once(days_old, 2)
1182        self.assertEqual(self._job.first_offload_start, t1)
1183        self._offload_expired_once(days_old, 3)
1184        self.assertEqual(self._job.first_offload_start, t1)
1185
1186
1187    def test_case_1_no_expiration(self):
1188        """Test a series of `enqueue_offload()` calls with `days_old` of 0.
1189
1190        This tests that offload works as expected if calls are
1191        made both before and after the job becomes expired.
1192
1193        """
1194        self._offload_unexpired_job(0)
1195        self._job.set_finished(0)
1196        self._offload_expired_job(0)
1197
1198
1199    def test_case_2_no_expiration(self):
1200        """Test a series of `enqueue_offload()` calls with `days_old` of 0.
1201
1202        This tests that offload works as expected if calls are made
1203        only after the job becomes expired.
1204
1205        """
1206        self._job.set_finished(0)
1207        self._offload_expired_job(0)
1208
1209
1210    def test_case_1_with_expiration(self):
1211        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
1212
1213        This tests that offload works as expected if calls are made
1214        before the job finishes, before the job expires, and after
1215        the job expires.
1216
1217        """
1218        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
1219        self._job.set_finished(_TEST_EXPIRATION_AGE)
1220        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
1221        self._job.set_expired(_TEST_EXPIRATION_AGE)
1222        self._offload_expired_job(_TEST_EXPIRATION_AGE)
1223
1224
1225    def test_case_2_with_expiration(self):
1226        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
1227
1228        This tests that offload works as expected if calls are made
1229        between finishing and expiration, and after the job expires.
1230
1231        """
1232        self._job.set_finished(_TEST_EXPIRATION_AGE)
1233        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
1234        self._job.set_expired(_TEST_EXPIRATION_AGE)
1235        self._offload_expired_job(_TEST_EXPIRATION_AGE)
1236
1237
1238    def test_case_3_with_expiration(self):
1239        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
1240
1241        This tests that offload works as expected if calls are made
1242        only before finishing and after expiration.
1243
1244        """
1245        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
1246        self._job.set_expired(_TEST_EXPIRATION_AGE)
1247        self._offload_expired_job(_TEST_EXPIRATION_AGE)
1248
1249
1250    def test_case_4_with_expiration(self):
1251        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
1252
1253        This tests that offload works as expected if calls are made
1254        only after expiration.
1255
1256        """
1257        self._job.set_expired(_TEST_EXPIRATION_AGE)
1258        self._offload_expired_job(_TEST_EXPIRATION_AGE)
1259
1260
1261class GetJobDirectoriesTests(_TempResultsDirTestBase):
1262    """Tests for `_JobDirectory.get_job_directories()`."""
1263
1264    def setUp(self):
1265        super(GetJobDirectoriesTests, self).setUp()
1266        self.make_job_hierarchy()
1267        os.mkdir('not-a-job')
1268        open('not-a-dir', 'w').close()
1269
1270
1271    def _run_get_directories(self, cls, expected_list):
1272        """Test `get_job_directories()` for the given class.
1273
1274        Calls the method, and asserts that the returned list of
1275        directories matches the expected return value.
1276
1277        @param expected_list Expected return value from the call.
1278        """
1279        dirlist = cls.get_job_directories()
1280        self.assertEqual(set(dirlist), set(expected_list))
1281
1282
1283    def test_get_regular_jobs(self):
1284        """Test `RegularJobDirectory.get_job_directories()`."""
1285        self._run_get_directories(job_directories.RegularJobDirectory,
1286                                  self.REGULAR_JOBLIST)
1287
1288
1289    def test_get_special_jobs(self):
1290        """Test `SpecialJobDirectory.get_job_directories()`."""
1291        self._run_get_directories(job_directories.SpecialJobDirectory,
1292                                  self.SPECIAL_JOBLIST)
1293
1294
1295class AddJobsTests(_TempResultsDirTestBase):
1296    """Tests for `Offloader._add_new_jobs()`."""
1297
1298    MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']
1299
1300    def setUp(self):
1301        super(AddJobsTests, self).setUp()
1302        self._initial_job_names = (
1303            set(self.REGULAR_JOBLIST) | set(self.SPECIAL_JOBLIST))
1304        self.make_job_hierarchy()
1305        self._offloader = gs_offloader.Offloader(_get_options(['-a']))
1306        self.mox.StubOutWithMock(logging, 'debug')
1307
1308
1309    def _run_add_new_jobs(self, expected_key_set):
1310        """Basic test assertions for `_add_new_jobs()`.
1311
1312        Asserts the following:
1313          * The keys in the offloader's `_open_jobs` dictionary
1314            matches the expected set of keys.
1315          * For every job in `_open_jobs`, the job has the expected
1316            directory name.
1317
1318        """
1319        count = len(expected_key_set) - len(self._offloader._open_jobs)
1320        logging.debug(mox.IgnoreArg(), count)
1321        self.mox.ReplayAll()
1322        self._offloader._add_new_jobs()
1323        self.assertEqual(expected_key_set,
1324                         set(self._offloader._open_jobs.keys()))
1325        for jobkey, job in self._offloader._open_jobs.items():
1326            self.assertEqual(jobkey, job.dirname)
1327        self.mox.VerifyAll()
1328        self.mox.ResetAll()
1329
1330
1331    def test_add_jobs_empty(self):
1332        """Test adding jobs to an empty dictionary.
1333
1334        Calls the offloader's `_add_new_jobs()`, then perform
1335        the assertions of `self._check_open_jobs()`.
1336
1337        """
1338        self._run_add_new_jobs(self._initial_job_names)
1339
1340
1341    def test_add_jobs_non_empty(self):
1342        """Test adding jobs to a non-empty dictionary.
1343
1344        Calls the offloader's `_add_new_jobs()` twice; once from
1345        initial conditions, and then again after adding more
1346        directories.  After the second call, perform the assertions
1347        of `self._check_open_jobs()`.  Additionally, assert that
1348        keys added by the first call still map to their original
1349        job object after the second call.
1350
1351        """
1352        self._run_add_new_jobs(self._initial_job_names)
1353        jobs_copy = self._offloader._open_jobs.copy()
1354        for d in self.MOREJOBS:
1355            os.mkdir(d)
1356        self._run_add_new_jobs(self._initial_job_names |
1357                                 set(self.MOREJOBS))
1358        for key in jobs_copy.keys():
1359            self.assertIs(jobs_copy[key],
1360                          self._offloader._open_jobs[key])
1361
1362
1363class ReportingTests(_TempResultsDirTestBase):
1364    """Tests for `Offloader._report_failed_jobs()`."""
1365
1366    def setUp(self):
1367        super(ReportingTests, self).setUp()
1368        self._offloader = gs_offloader.Offloader(_get_options([]))
1369        self.mox.StubOutWithMock(self._offloader, '_log_failed_jobs_locally')
1370        self.mox.StubOutWithMock(logging, 'debug')
1371
1372
1373    def _add_job(self, jobdir):
1374        """Add a job to the dictionary of unfinished jobs."""
1375        j = self.make_job(jobdir)
1376        self._offloader._open_jobs[j.dirname] = j
1377        return j
1378
1379
1380    def _expect_log_message(self, new_open_jobs, with_failures):
1381        """Mock expected logging calls.
1382
1383        `_report_failed_jobs()` logs one message with the number
1384        of jobs removed from the open job set and the number of jobs
1385        still remaining.  Additionally, if there are reportable
1386        jobs, then it logs the number of jobs that haven't yet
1387        offloaded.
1388
1389        This sets up the logging calls using `new_open_jobs` to
1390        figure the job counts.  If `with_failures` is true, then
1391        the log message is set up assuming that all jobs in
1392        `new_open_jobs` have offload failures.
1393
1394        @param new_open_jobs New job set for calculating counts
1395                             in the messages.
1396        @param with_failures Whether the log message with a
1397                             failure count is expected.
1398
1399        """
1400        count = len(self._offloader._open_jobs) - len(new_open_jobs)
1401        logging.debug(mox.IgnoreArg(), count, len(new_open_jobs))
1402        if with_failures:
1403            logging.debug(mox.IgnoreArg(), len(new_open_jobs))
1404
1405
1406    def _run_update(self, new_open_jobs):
1407        """Call `_report_failed_jobs()`.
1408
1409        Initial conditions are set up by the caller.  This calls
1410        `_report_failed_jobs()` once, and then checks these
1411        assertions:
1412          * The offloader's new `_open_jobs` field contains only
1413            the entries in `new_open_jobs`.
1414
1415        @param new_open_jobs A dictionary representing the expected
1416                             new value of the offloader's
1417                             `_open_jobs` field.
1418        """
1419        self.mox.ReplayAll()
1420        self._offloader._report_failed_jobs()
1421        self._offloader._remove_offloaded_jobs()
1422        self.assertEqual(self._offloader._open_jobs, new_open_jobs)
1423        self.mox.VerifyAll()
1424        self.mox.ResetAll()
1425
1426
1427    def _expect_failed_jobs(self, failed_jobs):
1428        """Mock expected call to log the failed jobs on local disk.
1429
1430        TODO(crbug.com/686904): The fact that we have to mock an internal
1431        function for this test is evidence that we need to pull out the local
1432        file formatter in its own object in a future CL.
1433
1434        @param failed_jobs: The list of jobs being logged as failed.
1435        """
1436        self._offloader._log_failed_jobs_locally(failed_jobs)
1437
1438
1439    def test_no_jobs(self):
1440        """Test `_report_failed_jobs()` with no open jobs.
1441
1442        Initial conditions are an empty `_open_jobs` list.
1443        Expected result is an empty `_open_jobs` list.
1444
1445        """
1446        self._expect_log_message({}, False)
1447        self._expect_failed_jobs([])
1448        self._run_update({})
1449
1450
1451    def test_all_completed(self):
1452        """Test `_report_failed_jobs()` with only complete jobs.
1453
1454        Initial conditions are an `_open_jobs` list consisting of only completed
1455        jobs.
1456        Expected result is an empty `_open_jobs` list.
1457
1458        """
1459        for d in self.REGULAR_JOBLIST:
1460            self._add_job(d).set_complete()
1461        self._expect_log_message({}, False)
1462        self._expect_failed_jobs([])
1463        self._run_update({})
1464
1465
1466    def test_none_finished(self):
1467        """Test `_report_failed_jobs()` with only unfinished jobs.
1468
1469        Initial conditions are an `_open_jobs` list consisting of only
1470        unfinished jobs.
1471        Expected result is no change to the `_open_jobs` list.
1472
1473        """
1474        for d in self.REGULAR_JOBLIST:
1475            self._add_job(d)
1476        new_jobs = self._offloader._open_jobs.copy()
1477        self._expect_log_message(new_jobs, False)
1478        self._expect_failed_jobs([])
1479        self._run_update(new_jobs)
1480
1481
1482class GsOffloaderMockTests(_TempResultsDirTestCase):
1483    """Tests using mock instead of mox."""
1484
1485    def setUp(self):
1486        super(GsOffloaderMockTests, self).setUp()
1487        alarm = mock.patch('signal.alarm', return_value=0)
1488        alarm.start()
1489        self.addCleanup(alarm.stop)
1490
1491        self._saved_loglevel = logging.getLogger().getEffectiveLevel()
1492        logging.getLogger().setLevel(logging.CRITICAL + 1)
1493
1494        self._job = self.make_job(self.REGULAR_JOBLIST[0])
1495
1496
1497    def test_offload_timeout_early(self):
1498        """Test that `offload_dir()` times out correctly.
1499
1500        This test triggers timeout at the earliest possible moment,
1501        at the first call to set the timeout alarm.
1502
1503        """
1504        signal.alarm.side_effect = [0, timeout_util.TimeoutError('fubar')]
1505        with mock.patch.object(gs_offloader, '_upload_cts_testresult',
1506                               autospec=True) as upload:
1507            upload.return_value = None
1508            gs_offloader.GSOffloader(
1509                    utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
1510                            self._job.queue_args[0],
1511                            self._job.queue_args[1],
1512                            self._job.queue_args[2])
1513            self.assertTrue(os.path.isdir(self._job.queue_args[0]))
1514
1515
1516    # TODO(ayatane): This tests passes when run locally, but it fails
1517    # when run on trybot.  I have no idea why, but the assert isdir
1518    # fails.
1519    #
1520    # This test is also kind of redundant since we are using the timeout
1521    # from chromite which has its own tests.
1522    @unittest.skip('This fails on trybot')
1523    def test_offload_timeout_late(self):
1524        """Test that `offload_dir()` times out correctly.
1525
1526        This test triggers timeout at the latest possible moment, at
1527        the call to clear the timeout alarm.
1528
1529        """
1530        signal.alarm.side_effect = [0, 0, timeout_util.TimeoutError('fubar')]
1531        with mock.patch.object(gs_offloader, '_upload_cts_testresult',
1532                               autospec=True) as upload, \
1533             mock.patch.object(gs_offloader, '_get_cmd_list',
1534                               autospec=True) as get_cmd_list:
1535            upload.return_value = None
1536            get_cmd_list.return_value = ['test', '-d', self._job.queue_args[0]]
1537            gs_offloader.GSOffloader(
1538                    utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
1539                            self._job.queue_args[0],
1540                            self._job.queue_args[1],
1541                            self._job.queue_args[2])
1542            self.assertTrue(os.path.isdir(self._job.queue_args[0]))
1543
1544
1545
1546if __name__ == '__main__':
1547    unittest.main()
1548