1#!/usr/bin/python2 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import __builtin__ 7import Queue 8import json 9import logging 10import os 11import shutil 12import signal 13import stat 14import subprocess 15import sys 16import tarfile 17import tempfile 18import time 19import unittest 20 21import mock 22import mox 23 24import common 25from autotest_lib.client.common_lib import global_config 26from autotest_lib.client.common_lib import utils 27#For unittest without cloud_client.proto compiled. 28try: 29 from autotest_lib.site_utils import cloud_console_client 30except ImportError: 31 cloud_console_client = None 32from autotest_lib.site_utils import gs_offloader 33from autotest_lib.site_utils import job_directories 34from autotest_lib.site_utils import job_directories_unittest as jd_test 35from autotest_lib.tko import models 36from autotest_lib.utils import gslib 37from autotest_lib.site_utils import pubsub_utils 38from chromite.lib import timeout_util 39 40# Test value to use for `days_old`, if nothing else is required. 41_TEST_EXPIRATION_AGE = 7 42 43 44def _get_options(argv): 45 """Helper function to exercise command line parsing. 46 47 @param argv Value of sys.argv to be parsed. 48 49 """ 50 sys.argv = ['bogus.py'] + argv 51 return gs_offloader.parse_options() 52 53 54def is_fifo(path): 55 """Determines whether a path is a fifo. 56 57 @param path: fifo path string. 58 """ 59 return stat.S_ISFIFO(os.lstat(path).st_mode) 60 61 62def _get_fake_process(): 63 return FakeProcess() 64 65 66class FakeProcess(object): 67 """Fake process object.""" 68 69 def __init__(self): 70 self.returncode = 0 71 72 73 def wait(self): 74 return True 75 76 77class OffloaderOptionsTests(mox.MoxTestBase): 78 """Tests for the `Offloader` constructor. 79 80 Tests that offloader instance fields are set as expected 81 for given command line options. 
82 83 """ 84 85 _REGULAR_ONLY = {job_directories.SwarmingJobDirectory, 86 job_directories.RegularJobDirectory} 87 _SPECIAL_ONLY = {job_directories.SwarmingJobDirectory, 88 job_directories.SpecialJobDirectory} 89 _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY 90 91 92 def setUp(self): 93 super(OffloaderOptionsTests, self).setUp() 94 self.mox.StubOutWithMock(utils, 'get_offload_gsuri') 95 gs_offloader.GS_OFFLOADING_ENABLED = True 96 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False 97 98 99 def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False, 100 console_client=None, delete_age=0): 101 """Mock the process of getting the offload_dir function.""" 102 if is_moblab: 103 expected_gsuri = '%sresults/%s/%s/' % ( 104 global_config.global_config.get_config_value( 105 'CROS', 'image_storage_server'), 106 'Fa:ke:ma:c0:12:34', 'rand0m-uu1d') 107 else: 108 expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI 109 utils.get_offload_gsuri().AndReturn(expected_gsuri) 110 sub_offloader = gs_offloader.GSOffloader(expected_gsuri, 111 multiprocessing, delete_age, console_client) 112 self.mox.StubOutWithMock(gs_offloader, 'GSOffloader') 113 if cloud_console_client: 114 self.mox.StubOutWithMock(cloud_console_client, 115 'is_cloud_notification_enabled') 116 if console_client: 117 cloud_console_client.is_cloud_notification_enabled().AndReturn(True) 118 gs_offloader.GSOffloader( 119 expected_gsuri, multiprocessing, delete_age, 120 mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn( 121 sub_offloader) 122 else: 123 if cloud_console_client: 124 cloud_console_client.is_cloud_notification_enabled().AndReturn( 125 False) 126 gs_offloader.GSOffloader( 127 expected_gsuri, multiprocessing, delete_age, None).AndReturn( 128 sub_offloader) 129 self.mox.ReplayAll() 130 return sub_offloader 131 132 133 def test_process_no_options(self): 134 """Test default offloader options.""" 135 sub_offloader = self._mock_get_sub_offloader(False) 136 offloader = gs_offloader.Offloader(_get_options([])) 137 
self.assertEqual(set(offloader._jobdir_classes), 138 self._REGULAR_ONLY) 139 self.assertEqual(offloader._processes, 1) 140 self.assertEqual(offloader._gs_offloader, 141 sub_offloader) 142 self.assertEqual(offloader._upload_age_limit, 0) 143 self.assertEqual(offloader._delete_age_limit, 0) 144 145 146 def test_process_all_option(self): 147 """Test offloader handling for the --all option.""" 148 sub_offloader = self._mock_get_sub_offloader(False) 149 offloader = gs_offloader.Offloader(_get_options(['--all'])) 150 self.assertEqual(set(offloader._jobdir_classes), self._BOTH) 151 self.assertEqual(offloader._processes, 1) 152 self.assertEqual(offloader._gs_offloader, 153 sub_offloader) 154 self.assertEqual(offloader._upload_age_limit, 0) 155 self.assertEqual(offloader._delete_age_limit, 0) 156 157 158 def test_process_hosts_option(self): 159 """Test offloader handling for the --hosts option.""" 160 sub_offloader = self._mock_get_sub_offloader(False) 161 offloader = gs_offloader.Offloader( 162 _get_options(['--hosts'])) 163 self.assertEqual(set(offloader._jobdir_classes), 164 self._SPECIAL_ONLY) 165 self.assertEqual(offloader._processes, 1) 166 self.assertEqual(offloader._gs_offloader, 167 sub_offloader) 168 self.assertEqual(offloader._upload_age_limit, 0) 169 self.assertEqual(offloader._delete_age_limit, 0) 170 171 172 def test_parallelism_option(self): 173 """Test offloader handling for the --parallelism option.""" 174 sub_offloader = self._mock_get_sub_offloader(False) 175 offloader = gs_offloader.Offloader( 176 _get_options(['--parallelism', '2'])) 177 self.assertEqual(set(offloader._jobdir_classes), 178 self._REGULAR_ONLY) 179 self.assertEqual(offloader._processes, 2) 180 self.assertEqual(offloader._gs_offloader, 181 sub_offloader) 182 self.assertEqual(offloader._upload_age_limit, 0) 183 self.assertEqual(offloader._delete_age_limit, 0) 184 185 186 def test_delete_only_option(self): 187 """Test offloader handling for the --delete_only option.""" 188 offloader = 
gs_offloader.Offloader( 189 _get_options(['--delete_only'])) 190 self.assertEqual(set(offloader._jobdir_classes), 191 self._REGULAR_ONLY) 192 self.assertEqual(offloader._processes, 1) 193 self.assertIsInstance(offloader._gs_offloader, 194 gs_offloader.FakeGSOffloader) 195 self.assertEqual(offloader._upload_age_limit, 0) 196 self.assertEqual(offloader._delete_age_limit, 0) 197 198 199 def test_days_old_option(self): 200 """Test offloader handling for the --days_old option.""" 201 sub_offloader = self._mock_get_sub_offloader(False, delete_age=7) 202 offloader = gs_offloader.Offloader( 203 _get_options(['--days_old', '7'])) 204 self.assertEqual(set(offloader._jobdir_classes), 205 self._REGULAR_ONLY) 206 self.assertEqual(offloader._processes, 1) 207 self.assertEqual(offloader._gs_offloader, 208 sub_offloader) 209 self.assertEqual(offloader._upload_age_limit, 7) 210 self.assertEqual(offloader._delete_age_limit, 7) 211 212 213 def test_moblab_gsuri_generation(self): 214 """Test offloader construction for Moblab.""" 215 sub_offloader = self._mock_get_sub_offloader(True) 216 offloader = gs_offloader.Offloader(_get_options([])) 217 self.assertEqual(set(offloader._jobdir_classes), 218 self._REGULAR_ONLY) 219 self.assertEqual(offloader._processes, 1) 220 self.assertEqual(offloader._gs_offloader, 221 sub_offloader) 222 self.assertEqual(offloader._upload_age_limit, 0) 223 self.assertEqual(offloader._delete_age_limit, 0) 224 225 226 def test_globalconfig_offloading_flag(self): 227 """Test enabling of --delete_only via global_config.""" 228 gs_offloader.GS_OFFLOADING_ENABLED = False 229 offloader = gs_offloader.Offloader( 230 _get_options([])) 231 self.assertIsInstance(offloader._gs_offloader, 232 gs_offloader.FakeGSOffloader) 233 234 def test_offloader_multiprocessing_flag_set(self): 235 """Test multiprocessing is set.""" 236 sub_offloader = self._mock_get_sub_offloader(True, True) 237 offloader = gs_offloader.Offloader(_get_options(['-m'])) 238 
self.assertEqual(offloader._gs_offloader, 239 sub_offloader) 240 self.mox.VerifyAll() 241 242 def test_offloader_multiprocessing_flag_not_set_default_false(self): 243 """Test multiprocessing is set.""" 244 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False 245 sub_offloader = self._mock_get_sub_offloader(True, False) 246 offloader = gs_offloader.Offloader(_get_options([])) 247 self.assertEqual(offloader._gs_offloader, 248 sub_offloader) 249 self.mox.VerifyAll() 250 251 def test_offloader_multiprocessing_flag_not_set_default_true(self): 252 """Test multiprocessing is set.""" 253 gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True 254 sub_offloader = self._mock_get_sub_offloader(True, True) 255 offloader = gs_offloader.Offloader(_get_options([])) 256 self.assertEqual(offloader._gs_offloader, 257 sub_offloader) 258 self.mox.VerifyAll() 259 260 261 def test_offloader_pubsub_enabled(self): 262 """Test multiprocessing is set.""" 263 if not cloud_console_client: 264 return 265 self.mox.StubOutWithMock(pubsub_utils, "PubSubClient") 266 sub_offloader = self._mock_get_sub_offloader(True, False, 267 cloud_console_client.PubSubBasedClient()) 268 offloader = gs_offloader.Offloader(_get_options([])) 269 self.assertEqual(offloader._gs_offloader, 270 sub_offloader) 271 self.mox.VerifyAll() 272 273 274class _MockJobDirectory(job_directories._JobDirectory): 275 """Subclass of `_JobDirectory` used as a helper for tests.""" 276 277 GLOB_PATTERN = '[0-9]*-*' 278 279 280 def __init__(self, resultsdir): 281 """Create new job in initial state.""" 282 super(_MockJobDirectory, self).__init__(resultsdir) 283 self._timestamp = None 284 self.queue_args = [resultsdir, os.path.dirname(resultsdir), self._timestamp] 285 286 287 def get_timestamp_if_finished(self): 288 return self._timestamp 289 290 291 def set_finished(self, days_old): 292 """Make this job appear to be finished. 
293 294 After calling this function, calls to `enqueue_offload()` 295 will find this job as finished, but not expired and ready 296 for offload. Note that when `days_old` is 0, 297 `enqueue_offload()` will treat a finished job as eligible 298 for offload. 299 300 @param days_old The value of the `days_old` parameter that 301 will be passed to `enqueue_offload()` for 302 testing. 303 304 """ 305 self._timestamp = jd_test.make_timestamp(days_old, False) 306 self.queue_args[2] = self._timestamp 307 308 309 def set_expired(self, days_old): 310 """Make this job eligible to be offloaded. 311 312 After calling this function, calls to `offload` will attempt 313 to offload this job. 314 315 @param days_old The value of the `days_old` parameter that 316 will be passed to `enqueue_offload()` for 317 testing. 318 319 """ 320 self._timestamp = jd_test.make_timestamp(days_old, True) 321 self.queue_args[2] = self._timestamp 322 323 324 def set_incomplete(self): 325 """Make this job appear to have failed offload just once.""" 326 self.offload_count += 1 327 self.first_offload_start = time.time() 328 if not os.path.isdir(self.dirname): 329 os.mkdir(self.dirname) 330 331 332 def set_reportable(self): 333 """Make this job be reportable.""" 334 self.set_incomplete() 335 self.offload_count += 1 336 337 338 def set_complete(self): 339 """Make this job be completed.""" 340 self.offload_count += 1 341 if os.path.isdir(self.dirname): 342 os.rmdir(self.dirname) 343 344 345 def process_gs_instructions(self): 346 """Always still offload the job directory.""" 347 return True 348 349 350class CommandListTests(unittest.TestCase): 351 """Tests for `_get_cmd_list()`.""" 352 353 def _command_list_assertions(self, job, use_rsync=True, multi=False): 354 """Call `_get_cmd_list()` and check the return value. 355 356 Check the following assertions: 357 * The command name (argv[0]) is 'gsutil'. 358 * '-m' option (argv[1]) is on when the argument, multi, is True. 
359 * The arguments contain the 'cp' subcommand. 360 * The next-to-last argument (the source directory) is the 361 job's `queue_args[0]`. 362 * The last argument (the destination URL) is the job's 363 'queue_args[1]'. 364 365 @param job A job with properly calculated arguments to 366 `_get_cmd_list()` 367 @param use_rsync True when using 'rsync'. False when using 'cp'. 368 @param multi True when using '-m' option for gsutil. 369 370 """ 371 test_bucket_uri = 'gs://a-test-bucket' 372 373 gs_offloader.USE_RSYNC_ENABLED = use_rsync 374 375 gs_path = os.path.join(test_bucket_uri, job.queue_args[1]) 376 377 command = gs_offloader._get_cmd_list( 378 multi, job.queue_args[0], gs_path) 379 380 self.assertEqual(command[0], 'gsutil') 381 if multi: 382 self.assertEqual(command[1], '-m') 383 self.assertEqual(command[-2], job.queue_args[0]) 384 385 if use_rsync: 386 self.assertTrue('rsync' in command) 387 self.assertEqual(command[-1], 388 os.path.join(test_bucket_uri, job.queue_args[0])) 389 else: 390 self.assertTrue('cp' in command) 391 self.assertEqual(command[-1], 392 os.path.join(test_bucket_uri, job.queue_args[1])) 393 394 finish_command = gs_offloader._get_finish_cmd_list(gs_path) 395 self.assertEqual(finish_command[0], 'gsutil') 396 self.assertEqual(finish_command[1], 'cp') 397 self.assertEqual(finish_command[2], '/dev/null') 398 self.assertEqual(finish_command[3], 399 os.path.join(gs_path, '.finished_offload')) 400 401 402 def test__get_cmd_list_regular(self): 403 """Test `_get_cmd_list()` as for a regular job.""" 404 job = _MockJobDirectory('118-debug') 405 self._command_list_assertions(job) 406 407 408 def test__get_cmd_list_special(self): 409 """Test `_get_cmd_list()` as for a special job.""" 410 job = _MockJobDirectory('hosts/host1/118-reset') 411 self._command_list_assertions(job) 412 413 414 def test_get_cmd_list_regular_no_rsync(self): 415 """Test `_get_cmd_list()` as for a regular job.""" 416 job = _MockJobDirectory('118-debug') 417 
self._command_list_assertions(job, use_rsync=False) 418 419 420 def test_get_cmd_list_special_no_rsync(self): 421 """Test `_get_cmd_list()` as for a special job.""" 422 job = _MockJobDirectory('hosts/host1/118-reset') 423 self._command_list_assertions(job, use_rsync=False) 424 425 426 def test_get_cmd_list_regular_multi(self): 427 """Test `_get_cmd_list()` as for a regular job with True multi.""" 428 job = _MockJobDirectory('118-debug') 429 self._command_list_assertions(job, multi=True) 430 431 432 def test__get_cmd_list_special_multi(self): 433 """Test `_get_cmd_list()` as for a special job with True multi.""" 434 job = _MockJobDirectory('hosts/host1/118-reset') 435 self._command_list_assertions(job, multi=True) 436 437 438class _TempResultsDirTestCase(unittest.TestCase): 439 """Mixin class for tests using a temporary results directory.""" 440 441 REGULAR_JOBLIST = [ 442 '111-fubar', '112-fubar', '113-fubar', '114-snafu'] 443 HOST_LIST = ['host1', 'host2', 'host3'] 444 SPECIAL_JOBLIST = [ 445 'hosts/host1/333-reset', 'hosts/host1/334-reset', 446 'hosts/host2/444-reset', 'hosts/host3/555-reset'] 447 448 449 def setUp(self): 450 super(_TempResultsDirTestCase, self).setUp() 451 self._resultsroot = tempfile.mkdtemp() 452 self._cwd = os.getcwd() 453 os.chdir(self._resultsroot) 454 455 456 def tearDown(self): 457 os.chdir(self._cwd) 458 shutil.rmtree(self._resultsroot) 459 super(_TempResultsDirTestCase, self).tearDown() 460 461 462 def make_job(self, jobdir): 463 """Create a job with results in `self._resultsroot`. 464 465 @param jobdir Name of the subdirectory to be created in 466 `self._resultsroot`. 467 468 """ 469 os.makedirs(jobdir) 470 return _MockJobDirectory(jobdir) 471 472 473 def make_job_hierarchy(self): 474 """Create a sample hierarchy of job directories. 475 476 `self.REGULAR_JOBLIST` is a list of directories for regular 477 jobs to be created; `self.SPECIAL_JOBLIST` is a list of 478 directories for special jobs to be created. 
479 480 """ 481 for d in self.REGULAR_JOBLIST: 482 os.mkdir(d) 483 hostsdir = 'hosts' 484 os.mkdir(hostsdir) 485 for host in self.HOST_LIST: 486 os.mkdir(os.path.join(hostsdir, host)) 487 for d in self.SPECIAL_JOBLIST: 488 os.mkdir(d) 489 490 491class _TempResultsDirTestBase(_TempResultsDirTestCase, mox.MoxTestBase): 492 """Base Mox test class for tests using a temporary results directory.""" 493 494 495class FailedOffloadsLogTest(_TempResultsDirTestBase): 496 """Test the formatting of failed offloads log file.""" 497 # Below is partial sample of a failed offload log file. This text is 498 # deliberately hard-coded and then parsed to create the test data; the idea 499 # is to make sure the actual text format will be reviewed by a human being. 500 # 501 # first offload count directory 502 # --+----1----+---- ----+ ----+----1----+----2----+----3 503 _SAMPLE_DIRECTORIES_REPORT = '''\ 504 =================== ====== ============================== 505 2014-03-14 15:09:26 1 118-fubar 506 2014-03-14 15:19:23 2 117-fubar 507 2014-03-14 15:29:20 6 116-fubar 508 2014-03-14 15:39:17 24 115-fubar 509 2014-03-14 15:49:14 120 114-fubar 510 2014-03-14 15:59:11 720 113-fubar 511 2014-03-14 16:09:08 5040 112-fubar 512 2014-03-14 16:19:05 40320 111-fubar 513 ''' 514 515 def setUp(self): 516 super(FailedOffloadsLogTest, self).setUp() 517 self._offloader = gs_offloader.Offloader(_get_options([])) 518 self._joblist = [] 519 for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]: 520 date_, time_, count, dir_ = line.split() 521 job = _MockJobDirectory(dir_) 522 job.offload_count = int(count) 523 timestruct = time.strptime("%s %s" % (date_, time_), 524 gs_offloader.FAILED_OFFLOADS_TIME_FORMAT) 525 job.first_offload_start = time.mktime(timestruct) 526 # enter the jobs in reverse order, to make sure we 527 # test that the output will be sorted. 
528 self._joblist.insert(0, job) 529 530 531 def assert_report_well_formatted(self, report_file): 532 """Assert that report file is well formatted. 533 534 @param report_file: Path to report file 535 """ 536 with open(report_file, 'r') as f: 537 report_lines = f.read().split() 538 539 for end_of_header_index in range(len(report_lines)): 540 if report_lines[end_of_header_index].startswith('=='): 541 break 542 self.assertLess(end_of_header_index, len(report_lines), 543 'Failed to find end-of-header marker in the report') 544 545 relevant_lines = report_lines[end_of_header_index:] 546 expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split() 547 self.assertListEqual(relevant_lines, expected_lines) 548 549 550 def test_failed_offload_log_format(self): 551 """Trigger an e-mail report and check its contents.""" 552 log_file = os.path.join(self._resultsroot, 'failed_log') 553 report = self._offloader._log_failed_jobs_locally(self._joblist, 554 log_file=log_file) 555 self.assert_report_well_formatted(log_file) 556 557 558 def test_failed_offload_file_overwrite(self): 559 """Verify that we can saefly overwrite the log file.""" 560 log_file = os.path.join(self._resultsroot, 'failed_log') 561 with open(log_file, 'w') as f: 562 f.write('boohoohoo') 563 report = self._offloader._log_failed_jobs_locally(self._joblist, 564 log_file=log_file) 565 self.assert_report_well_formatted(log_file) 566 567 568class OffloadDirectoryTests(_TempResultsDirTestBase): 569 """Tests for `offload_dir()`.""" 570 571 def setUp(self): 572 super(OffloadDirectoryTests, self).setUp() 573 # offload_dir() logs messages; silence them. 
574 self._saved_loglevel = logging.getLogger().getEffectiveLevel() 575 logging.getLogger().setLevel(logging.CRITICAL+1) 576 self._job = self.make_job(self.REGULAR_JOBLIST[0]) 577 self.mox.StubOutWithMock(gs_offloader, '_get_cmd_list') 578 alarm = mock.patch('signal.alarm', return_value=0) 579 alarm.start() 580 self.addCleanup(alarm.stop) 581 self.mox.StubOutWithMock(models.test, 'parse_job_keyval') 582 self.should_remove_sarming_req_dir = False 583 584 585 def tearDown(self): 586 logging.getLogger().setLevel(self._saved_loglevel) 587 super(OffloadDirectoryTests, self).tearDown() 588 589 def _mock__upload_cts_testresult(self): 590 self.mox.StubOutWithMock(gs_offloader, '_upload_cts_testresult') 591 gs_offloader._upload_cts_testresult( 592 mox.IgnoreArg(),mox.IgnoreArg()).AndReturn(None) 593 594 def _mock_create_marker_file(self): 595 self.mox.StubOutWithMock(__builtin__, 'open') 596 open(mox.IgnoreArg(), 'a').AndReturn(mock.MagicMock()) 597 598 599 def _mock_offload_dir_calls(self, command, queue_args, 600 marker_initially_exists=False): 601 """Mock out the calls needed by `offload_dir()`. 602 603 This covers only the calls made when there is no timeout. 604 605 @param command Command list to be returned by the mocked 606 call to `_get_cmd_list()`. 607 608 """ 609 self.mox.StubOutWithMock(os.path, 'isfile') 610 os.path.isfile(mox.IgnoreArg()).AndReturn(marker_initially_exists) 611 command.append(queue_args[0]) 612 gs_offloader._get_cmd_list( 613 False, queue_args[0], 614 '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI, 615 queue_args[1])).AndReturn(command) 616 self._mock__upload_cts_testresult() 617 618 619 def _run_offload_dir(self, should_succeed, delete_age): 620 """Make one call to `offload_dir()`. 621 622 The caller ensures all mocks are set up already. 623 624 @param should_succeed True iff the call to `offload_dir()` 625 is expected to succeed and remove the 626 offloaded job directory. 
627 628 """ 629 self.mox.ReplayAll() 630 gs_offloader.GSOffloader( 631 utils.DEFAULT_OFFLOAD_GSURI, False, delete_age).offload( 632 self._job.queue_args[0], 633 self._job.queue_args[1], 634 self._job.queue_args[2]) 635 self.mox.VerifyAll() 636 self.assertEqual(not should_succeed, 637 os.path.isdir(self._job.queue_args[0])) 638 swarming_req_dir = gs_offloader._get_swarming_req_dir( 639 self._job.queue_args[0]) 640 if swarming_req_dir: 641 self.assertEqual(not self.should_remove_sarming_req_dir, 642 os.path.exists(swarming_req_dir)) 643 644 645 def test_offload_success(self): 646 """Test that `offload_dir()` can succeed correctly.""" 647 self._mock_offload_dir_calls(['test', '-d'], 648 self._job.queue_args) 649 os.path.isfile(mox.IgnoreArg()).AndReturn(True) 650 self._mock_create_marker_file() 651 self._run_offload_dir(True, 0) 652 653 654 def test_offload_failure(self): 655 """Test that `offload_dir()` can fail correctly.""" 656 self._mock_offload_dir_calls(['test', '!', '-d'], 657 self._job.queue_args) 658 self._run_offload_dir(False, 0) 659 660 661 def test_offload_swarming_req_dir_remove(self): 662 """Test that `offload_dir()` can prune the empty swarming task dir.""" 663 should_remove = os.path.join('results', 'swarming-123abc0') 664 self._job = self.make_job(os.path.join(should_remove, '1')) 665 self._mock_offload_dir_calls(['test', '-d'], 666 self._job.queue_args) 667 668 os.path.isfile(mox.IgnoreArg()).AndReturn(True) 669 self.should_remove_sarming_req_dir = True 670 self._mock_create_marker_file() 671 self._run_offload_dir(True, 0) 672 673 674 def test_offload_swarming_req_dir_exist(self): 675 """Test that `offload_dir()` keeps the non-empty swarming task dir.""" 676 should_not_remove = os.path.join('results', 'swarming-456edf0') 677 self._job = self.make_job(os.path.join(should_not_remove, '1')) 678 self.make_job(os.path.join(should_not_remove, '2')) 679 self._mock_offload_dir_calls(['test', '-d'], 680 self._job.queue_args) 681 682 
os.path.isfile(mox.IgnoreArg()).AndReturn(True) 683 self.should_remove_sarming_req_dir = False 684 self._mock_create_marker_file() 685 self._run_offload_dir(True, 0) 686 687 688 def test_sanitize_dir(self): 689 """Test that folder/file name with invalid character can be corrected. 690 """ 691 results_folder = tempfile.mkdtemp() 692 invalid_chars = '_'.join(['[', ']', '*', '?', '#']) 693 invalid_files = [] 694 invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars 695 invalid_folder = os.path.join( 696 results_folder, 697 invalid_folder_name) 698 invalid_files.append(os.path.join( 699 invalid_folder, 700 'invalid_name_file_%s' % invalid_chars)) 701 good_folder = os.path.join(results_folder, 'valid_name_folder') 702 good_file = os.path.join(good_folder, 'valid_name_file') 703 for folder in [invalid_folder, good_folder]: 704 os.makedirs(folder) 705 for f in invalid_files + [good_file]: 706 with open(f, 'w'): 707 pass 708 # check that broken symlinks don't break sanitization 709 symlink = os.path.join(invalid_folder, 'broken-link') 710 os.symlink(os.path.join(results_folder, 'no-such-file'), 711 symlink) 712 fifo1 = os.path.join(results_folder, 'test_fifo1') 713 fifo2 = os.path.join(good_folder, 'test_fifo2') 714 fifo3 = os.path.join(invalid_folder, 'test_fifo3') 715 invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars 716 fifo4 = os.path.join(invalid_folder, invalid_fifo4_name) 717 os.mkfifo(fifo1) 718 os.mkfifo(fifo2) 719 os.mkfifo(fifo3) 720 os.mkfifo(fifo4) 721 gs_offloader.sanitize_dir(results_folder) 722 for _, dirs, files in os.walk(results_folder): 723 for name in dirs + files: 724 self.assertEqual(name, gslib.escape(name)) 725 for c in name: 726 self.assertFalse(c in ['[', ']', '*', '?', '#']) 727 self.assertTrue(os.path.exists(good_file)) 728 729 self.assertTrue(os.path.exists(fifo1)) 730 self.assertFalse(is_fifo(fifo1)) 731 self.assertTrue(os.path.exists(fifo2)) 732 self.assertFalse(is_fifo(fifo2)) 733 corrected_folder = os.path.join( 734 
results_folder, gslib.escape(invalid_folder_name)) 735 corrected_fifo3 = os.path.join( 736 corrected_folder, 737 'test_fifo3') 738 self.assertFalse(os.path.exists(fifo3)) 739 self.assertTrue(os.path.exists(corrected_fifo3)) 740 self.assertFalse(is_fifo(corrected_fifo3)) 741 corrected_fifo4 = os.path.join( 742 corrected_folder, gslib.escape(invalid_fifo4_name)) 743 self.assertFalse(os.path.exists(fifo4)) 744 self.assertTrue(os.path.exists(corrected_fifo4)) 745 self.assertFalse(is_fifo(corrected_fifo4)) 746 747 corrected_symlink = os.path.join( 748 corrected_folder, 749 'broken-link') 750 self.assertFalse(os.path.lexists(symlink)) 751 self.assertTrue(os.path.exists(corrected_symlink)) 752 self.assertFalse(os.path.islink(corrected_symlink)) 753 shutil.rmtree(results_folder) 754 755 756 def check_limit_file_count(self, is_test_job=True): 757 """Test that folder with too many files can be compressed. 758 759 @param is_test_job: True to check the method with test job result 760 folder. Set to False for special task folder. 
761 """ 762 results_folder = tempfile.mkdtemp() 763 host_folder = os.path.join( 764 results_folder, 765 'lab1-host1' if is_test_job else 'hosts/lab1-host1/1-repair') 766 debug_folder = os.path.join(host_folder, 'debug') 767 sysinfo_folder = os.path.join(host_folder, 'sysinfo') 768 for folder in [debug_folder, sysinfo_folder]: 769 os.makedirs(folder) 770 for i in range(10): 771 with open(os.path.join(folder, str(i)), 'w') as f: 772 f.write('test') 773 774 gs_offloader._MAX_FILE_COUNT = 100 775 gs_offloader.limit_file_count( 776 results_folder if is_test_job else host_folder) 777 self.assertTrue(os.path.exists(sysinfo_folder)) 778 779 gs_offloader._MAX_FILE_COUNT = 10 780 gs_offloader.limit_file_count( 781 results_folder if is_test_job else host_folder) 782 self.assertFalse(os.path.exists(sysinfo_folder)) 783 self.assertTrue(os.path.exists(sysinfo_folder + '.tgz')) 784 self.assertTrue(os.path.exists(debug_folder)) 785 786 shutil.rmtree(results_folder) 787 788 789 def test_limit_file_count(self): 790 """Test that folder with too many files can be compressed. 
791 """ 792 self.check_limit_file_count(is_test_job=True) 793 self.check_limit_file_count(is_test_job=False) 794 795 796 def test_is_valid_result(self): 797 """Test _is_valid_result.""" 798 release_build = 'veyron_minnie-cheets-release/R52-8248.0.0' 799 pfq_build = 'cyan-cheets-android-pfq/R54-8623.0.0-rc1' 800 trybot_build = 'trybot-samus-release/R54-8640.0.0-b5092' 801 trybot_2_build = 'trybot-samus-pfq/R54-8640.0.0-b5092' 802 release_2_build = 'test-trybot-release/R54-8640.0.0-b5092' 803 self.assertTrue(gs_offloader._is_valid_result( 804 release_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts')) 805 self.assertTrue(gs_offloader._is_valid_result( 806 release_build, gs_offloader.CTS_RESULT_PATTERN, 'test_that_wrapper')) 807 self.assertTrue(gs_offloader._is_valid_result( 808 release_build, gs_offloader.CTS_RESULT_PATTERN, 'cros_test_platform')) 809 self.assertTrue(gs_offloader._is_valid_result( 810 release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-arc')) 811 self.assertTrue(gs_offloader._is_valid_result( 812 release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-perbuild')) 813 self.assertFalse(gs_offloader._is_valid_result( 814 release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-cq')) 815 self.assertTrue(gs_offloader._is_valid_result( 816 release_build, gs_offloader.CTS_V2_RESULT_PATTERN, 'arc-gts')) 817 self.assertFalse(gs_offloader._is_valid_result( 818 None, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts')) 819 self.assertFalse(gs_offloader._is_valid_result( 820 release_build, gs_offloader.CTS_RESULT_PATTERN, None)) 821 self.assertFalse(gs_offloader._is_valid_result( 822 pfq_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts')) 823 self.assertFalse(gs_offloader._is_valid_result( 824 trybot_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts')) 825 self.assertFalse(gs_offloader._is_valid_result( 826 trybot_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts')) 827 self.assertTrue(gs_offloader._is_valid_result( 828 release_2_build, gs_offloader.CTS_RESULT_PATTERN, 
                'arc-cts'))


    def create_results_folder(self):
        """Create CTS/GTS results folders.

        Builds a throw-away directory tree that mimics a real results
        directory for one host, including CTS / CTS-v2 / GTS result
        folders (two CTS timestamps), zipped results, compressed and
        plain XML result files, host keyvals, and the
        `.autoserv_execute` marker.

        @return tuple of (results folder, host folder,
                list of (result folder, file pattern) pairs).
        """
        results_folder = tempfile.mkdtemp()
        host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22')
        debug_folder = os.path.join(host_folder, 'debug')
        sysinfo_folder = os.path.join(host_folder, 'sysinfo')
        cts_result_folder = os.path.join(
                host_folder, 'cheets_CTS.android.dpi', 'results', 'cts-results')
        cts_v2_result_folder = os.path.join(host_folder,
                'cheets_CTS_N.CtsGraphicsTestCases', 'results', 'android-cts')
        gts_result_folder = os.path.join(
                host_folder, 'cheets_GTS.google.admin', 'results', 'android-gts')
        timestamp_str = '2016.04.28_01.41.44'
        timestamp_cts_folder = os.path.join(cts_result_folder, timestamp_str)
        timestamp_cts_v2_folder = os.path.join(cts_v2_result_folder, timestamp_str)
        timestamp_gts_folder = os.path.join(gts_result_folder, timestamp_str)

        # Build host keyvals set to parse model info.
        host_info_path = os.path.join(host_folder, 'host_keyvals')
        dir_to_create = '/'
        # Create each path component in turn, tolerating components
        # that already exist (tempdir parents).
        for tdir in host_info_path.split('/'):
            dir_to_create = os.path.join(dir_to_create, tdir)
            if not os.path.exists(dir_to_create):
                os.mkdir(dir_to_create)
        with open(os.path.join(host_info_path, 'chromeos4-row9-rack11-host22'), 'w') as store_file:
            store_file.write('labels=board%3Acoral,hw_video_acc_vp9,cros,'+
                             'hw_jpeg_acc_dec,bluetooth,model%3Arobo360,'+
                             'accel%3Acros-ec,'+
                             'sku%3Arobo360_IntelR_CeleronR_CPU_N3450_1_10GHz_4Gb')

        # .autoserv_execute file is needed for the test results package to look
        # legit.
        autoserve_path = os.path.join(host_folder, '.autoserv_execute')
        with open(autoserve_path, 'w') as temp_file:
            temp_file.write(' ')

        # Test results in cts_result_folder with a different time-stamp.
        timestamp_str_2 = '2016.04.28_10.41.44'
        timestamp_cts_folder_2 = os.path.join(cts_result_folder, timestamp_str_2)

        for folder in [debug_folder, sysinfo_folder, cts_result_folder,
                       timestamp_cts_folder, timestamp_cts_folder_2,
                       timestamp_cts_v2_folder, timestamp_gts_folder]:
            os.makedirs(folder)

        path_pattern_pair = [(timestamp_cts_folder, gs_offloader.CTS_RESULT_PATTERN),
                             (timestamp_cts_folder_2, gs_offloader.CTS_RESULT_PATTERN),
                             (timestamp_cts_folder_2, gs_offloader.CTS_COMPRESSED_RESULT_PATTERN),
                             (timestamp_cts_v2_folder, gs_offloader.CTS_V2_RESULT_PATTERN),
                             (timestamp_cts_v2_folder, gs_offloader.CTS_V2_COMPRESSED_RESULT_PATTERN),
                             (timestamp_gts_folder, gs_offloader.CTS_V2_RESULT_PATTERN)]

        # Create timestamp.zip file_path.
        cts_zip_file = os.path.join(cts_result_folder, timestamp_str + '.zip')
        cts_zip_file_2 = os.path.join(cts_result_folder, timestamp_str_2 + '.zip')
        cts_v2_zip_file = os.path.join(cts_v2_result_folder, timestamp_str + '.zip')
        gts_zip_file = os.path.join(gts_result_folder, timestamp_str + '.zip')

        # Create xml file_path.
        cts_result_file = os.path.join(timestamp_cts_folder, 'testResult.xml')
        cts_result_file_2 = os.path.join(timestamp_cts_folder_2,
                                         'testResult.xml')
        cts_result_compressed_file_2 = os.path.join(timestamp_cts_folder_2,
                                                    'testResult.xml.tgz')
        gts_result_file = os.path.join(timestamp_gts_folder, 'test_result.xml')
        cts_v2_result_file = os.path.join(timestamp_cts_v2_folder,
                                          'test_result.xml')
        cts_v2_result_compressed_file = os.path.join(timestamp_cts_v2_folder,
                                                     'test_result.xml.tgz')

        for file_path in [cts_zip_file, cts_zip_file_2, cts_v2_zip_file,
                          gts_zip_file, cts_result_file, cts_result_file_2,
                          cts_result_compressed_file_2, gts_result_file,
                          cts_v2_result_file, cts_v2_result_compressed_file]:
            if file_path.endswith('tgz'):
                # Compressed results: write the inner payload file, tar
                # it into the .tgz, then delete the loose payload.
                test_result_file = gs_offloader.CTS_COMPRESSED_RESULT_TYPES[
                        os.path.basename(file_path)]
                with open(test_result_file, 'w') as f:
                    f.write('test')
                with tarfile.open(file_path, 'w:gz') as tar_file:
                    tar_file.add(test_result_file)
                os.remove(test_result_file)
            else:
                with open(file_path, 'w') as f:
                    f.write('test')

        return (results_folder, host_folder, path_pattern_pair)


    def test__upload_cts_testresult(self):
        """Test _upload_cts_testresult."""
        results_folder, host_folder, path_pattern_pair = self.create_results_folder()

        self.mox.StubOutWithMock(gs_offloader, '_upload_files')
        # Exactly three `_upload_files` calls are expected; each
        # returns a fake command list.
        gs_offloader._upload_files(
            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
            mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                ['test', '-d', host_folder])
        gs_offloader._upload_files(
            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
            mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                ['test', '-d', host_folder])
        gs_offloader._upload_files(
            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
            mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                ['test', '-d', host_folder])

        self.mox.ReplayAll()
        gs_offloader._upload_cts_testresult(results_folder, False)
        self.mox.VerifyAll()
        shutil.rmtree(results_folder)


    def test_parse_cts_job_results_file_path(self):
        """Test `_parse_cts_job_results_file_path` on autotest and skylab paths."""
        # A autotest path
        path = ('/317739475-chromeos-test/chromeos4-row9-rack11-host22/'
                'cheets_CTS.android.dpi/results/cts-results/'
                '2016.04.28_01.41.44')
        job_id, package, timestamp = \
            gs_offloader._parse_cts_job_results_file_path(path)
        self.assertEqual('317739475-chromeos-test', job_id)
        self.assertEqual('cheets_CTS.android.dpi', package)
        self.assertEqual('2016.04.28_01.41.44', timestamp)


        # A skylab path
        path = ('/swarming-458e3a3a7fc6f210/1/autoserv_test/'
                'cheets_CTS.android.dpi/results/cts-results/'
                '2016.04.28_01.41.44')
        job_id, package, timestamp = \
            gs_offloader._parse_cts_job_results_file_path(path)
        self.assertEqual('swarming-458e3a3a7fc6f210-1', job_id)
        self.assertEqual('cheets_CTS.android.dpi', package)
        self.assertEqual('2016.04.28_01.41.44', timestamp)


    def test_upload_files(self):
        """Test upload_files"""
        results_folder, host_folder, path_pattern_pair = self.create_results_folder()

        # One replay/verify/reset cycle per (folder, pattern) pair.
        for path, pattern in path_pattern_pair:
            models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
                'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
                'hostname': 'chromeos4-row9-rack11-host22',
                'parent_job_id': 'p_id',
                'suite': 'arc-cts'
            })

            # Two `_get_cmd_list` calls are expected per pair.
            gs_offloader._get_cmd_list(
                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                    ['test', '-d', path])
            gs_offloader._get_cmd_list(
                False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
                    ['test', '-d', path])

            self.mox.ReplayAll()
            gs_offloader._upload_files(host_folder, path, pattern, False,
                                       'gs://a-test-bucket/',
                                       'gs://a-test-apfe-bucket/')
            self.mox.VerifyAll()
            self.mox.ResetAll()

        shutil.rmtree(results_folder)


    def test_get_metrics_fields(self):
        """Test method
_get_metrics_fields."""
        results_folder, host_folder, _ = self.create_results_folder()
        models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
            'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
            'parent_job_id': 'p_id',
            'suite': 'arc-cts'
        })
        try:
            self.mox.ReplayAll()
            # board/milestone are derived from the 'build' keyval above.
            self.assertEqual({'board': 'veyron_minnie-cheets',
                              'milestone': 'R52'},
                             gs_offloader._get_metrics_fields(host_folder))
            self.mox.VerifyAll()
        finally:
            shutil.rmtree(results_folder)


class OffladerConfigTests(_TempResultsDirTestBase):
    """Tests for the `Offloader` to follow side_effect config."""

    def setUp(self):
        super(OffladerConfigTests, self).setUp()
        gs_offloader.GS_OFFLOADING_ENABLED = True
        gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
        self.dest_path = '/results'
        self.mox.StubOutWithMock(gs_offloader, '_get_metrics_fields')
        self.mox.StubOutWithMock(gs_offloader, '_OffloadError')
        self.mox.StubOutWithMock(gs_offloader, '_upload_cts_testresult')
        self.mox.StubOutWithMock(gs_offloader, '_emit_offload_metrics')
        self.mox.StubOutWithMock(gs_offloader, '_get_cmd_list')
        self.mox.StubOutWithMock(subprocess, 'Popen')
        self.mox.StubOutWithMock(gs_offloader, '_emit_gs_returncode_metric')


    def _run(self, results_dir, gs_bucket, expect_dest, cts_enabled):
        """Record expectations and drive one `GSOffloader._try_offload`.

        Writes a side_effects_config.json into `results_dir`, records
        the mox expectations implied by it, runs `_try_offload`, then
        verifies the recorded calls and removes `results_dir`.

        @param results_dir Temporary results directory (deleted on exit).
        @param gs_bucket Bucket name written into the side-effects config.
        @param expect_dest Destination GS URI expected in the command list.
        @param cts_enabled Whether the config enables CTS uploading.
        """
        stdout = os.path.join(results_dir, 'std.log')
        stderr = os.path.join(results_dir, 'std.err')
        config = {
            'tko': {
                'proxy_socket': '/file-system/foo-socket',
                'mysql_user': 'foo-user',
                'mysql_password_file': '/file-system/foo-password-file'
            },
            'google_storage': {
                'bucket': gs_bucket,
                'credentials_file': '/foo-creds'
            },
            'cts': {
                'enabled': cts_enabled,
            },
            'this_field_is_ignored': True
        }
        path = os.path.join(results_dir, 'side_effects_config.json')
        with open(path, 'w') as f:
            f.write(json.dumps(config))

        gs_offloader._get_metrics_fields(results_dir)
        if cts_enabled:
            gs_offloader._upload_cts_testresult(results_dir, True)
        gs_offloader._get_cmd_list(
            True,
            mox.IgnoreArg(),
            expect_dest).AndReturn(['test', '-d', expect_dest])
        # `subprocess.Popen` is stubbed out in setUp, so the stdout/
        # stderr paths are never actually opened; two launches are
        # expected in total.
        subprocess.Popen(mox.IgnoreArg(),
                         stdout=stdout,
                         stderr=stderr).AndReturn(_get_fake_process())
        gs_offloader._OffloadError(mox.IgnoreArg())
        gs_offloader._emit_gs_returncode_metric(mox.IgnoreArg()).AndReturn(True)
        gs_offloader._emit_offload_metrics(mox.IgnoreArg()).AndReturn(True)
        sub_offloader = gs_offloader.GSOffloader(results_dir, True, 0, None)
        subprocess.Popen(mox.IgnoreArg(),
                         stdout=stdout,
                         stderr=stderr).AndReturn(_get_fake_process())
        self.mox.ReplayAll()
        sub_offloader._try_offload(results_dir, self.dest_path, stdout, stderr)
        self.mox.VerifyAll()
        self.mox.ResetAll()
        shutil.rmtree(results_dir)


    def test_upload_files_to_dev(self):
        """Test upload results to dev gs bucket and skip cts uploading."""
        res = tempfile.mkdtemp()
        gs_bucket = 'dev-bucket'
        expect_dest = 'gs://' + gs_bucket + self.dest_path
        self._run(res, gs_bucket, expect_dest, False)


    def test_upload_files_prod(self):
        """Test upload results to the prod gs bucket and also upload to cts."""
        res = tempfile.mkdtemp()
        gs_bucket = 'prod-bucket'
        expect_dest = 'gs://' + gs_bucket + self.dest_path
        self._run(res, gs_bucket, expect_dest, True)


    def test_skip_gs_prefix(self):
        """Test skip the 'gs://' prefix if already presented."""
        res = tempfile.mkdtemp()
        gs_bucket = 'gs://prod-bucket'
        expect_dest = gs_bucket + self.dest_path
        self._run(res, gs_bucket, expect_dest, True)


class JobDirectoryOffloadTests(_TempResultsDirTestBase):
    """Tests for `_JobDirectory.enqueue_offload()`.

    When testing with a `days_old` parameter of 0, we use
    `set_finished()` instead of `set_expired()`.  This causes the
    job's timestamp to be set in the future.  This is done so as
    to test that when `days_old` is 0, the job is always treated
    as eligible for offload, regardless of the timestamp's value.

    Testing covers the following assertions:
      A. Each time `enqueue_offload()` is called, a message that
         includes the job's directory name will be logged using
         `logging.debug()`, regardless of whether the job was
         enqueued.  Nothing else is allowed to be logged.
      B. If the job is not eligible to be offloaded,
         `first_offload_start` and `offload_count` are 0.
      C. If the job is not eligible for offload, nothing is
         enqueued in `queue`.
      D. When the job is offloaded, `offload_count` increments
         each time.
      E. When the job is offloaded, the appropriate parameters are
         enqueued exactly once.
      F. The first time a job is offloaded, `first_offload_start` is
         set to the current time.
      G. `first_offload_start` only changes the first time that the
         job is offloaded.

    The test cases below are designed to exercise all of the
    meaningful state transitions at least once.

    """

    def setUp(self):
        super(JobDirectoryOffloadTests, self).setUp()
        self._job = self.make_job(self.REGULAR_JOBLIST[0])
        self._queue = Queue.Queue()


    def _offload_unexpired_job(self, days_old):
        """Make calls to `enqueue_offload()` for an unexpired job.

        This method tests assertions B and C that calling
        `enqueue_offload()` has no effect.

        """
        self.assertEqual(self._job.offload_count, 0)
        self.assertEqual(self._job.first_offload_start, 0)
        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
        self.assertTrue(self._queue.empty())
        self.assertEqual(self._job.offload_count, 0)
        self.assertEqual(self._job.first_offload_start, 0)


    def _offload_expired_once(self, days_old, count):
        """Make one call to `enqueue_offload()` for an expired job.

        This method tests assertions D and E regarding side-effects
        expected when a job is offloaded.

        """
        gs_offloader._enqueue_offload(self._job, self._queue, days_old)
        self.assertEqual(self._job.offload_count, count)
        self.assertFalse(self._queue.empty())
        v = self._queue.get_nowait()
        self.assertTrue(self._queue.empty())
        self.assertEqual(v, self._job.queue_args)


    def _offload_expired_job(self, days_old):
        """Make calls to `enqueue_offload()` for a just-expired job.

        This method directly tests assertions F and G regarding
        side-effects on `first_offload_start`.

        """
        t0 = time.time()
        self._offload_expired_once(days_old, 1)
        t1 = self._job.first_offload_start
        # First offload start must fall inside the [t0, now] window.
        self.assertLessEqual(t1, time.time())
        self.assertGreaterEqual(t1, t0)
        self._offload_expired_once(days_old, 2)
        self.assertEqual(self._job.first_offload_start, t1)
        self._offload_expired_once(days_old, 3)
        self.assertEqual(self._job.first_offload_start, t1)


    def test_case_1_no_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` of 0.

        This tests that offload works as expected if calls are
        made both before and after the job becomes expired.

        """
        self._offload_unexpired_job(0)
        self._job.set_finished(0)
        self._offload_expired_job(0)


    def test_case_2_no_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` of 0.

        This tests that offload works as expected if calls are made
        only after the job becomes expired.

        """
        self._job.set_finished(0)
        self._offload_expired_job(0)


    def test_case_1_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        before the job finishes, before the job expires, and after
        the job expires.

        """
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_finished(_TEST_EXPIRATION_AGE)
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_2_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        between finishing and expiration, and after the job expires.

        """
        self._job.set_finished(_TEST_EXPIRATION_AGE)
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_3_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        only before finishing and after expiration.

        """
        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


    def test_case_4_with_expiration(self):
        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.

        This tests that offload works as expected if calls are made
        only after expiration.

        """
        self._job.set_expired(_TEST_EXPIRATION_AGE)
        self._offload_expired_job(_TEST_EXPIRATION_AGE)


class GetJobDirectoriesTests(_TempResultsDirTestBase):
    """Tests for `_JobDirectory.get_job_directories()`."""

    def setUp(self):
        super(GetJobDirectoriesTests, self).setUp()
        self.make_job_hierarchy()
        # Entries that should NOT appear in any returned job list: a
        # directory that is not a job, and a file that is not a dir.
        os.mkdir('not-a-job')
        open('not-a-dir', 'w').close()


    def _run_get_directories(self, cls, expected_list):
        """Test `get_job_directories()` for the given class.

        Calls the method, and asserts that the returned list of
        directories matches the expected return value.

        @param expected_list Expected return value from the call.
        """
        dirlist = cls.get_job_directories()
        # Compare as sets: directory-listing order is not significant.
        self.assertEqual(set(dirlist), set(expected_list))


    def test_get_regular_jobs(self):
        """Test `RegularJobDirectory.get_job_directories()`."""
        self._run_get_directories(job_directories.RegularJobDirectory,
                                  self.REGULAR_JOBLIST)


    def test_get_special_jobs(self):
        """Test `SpecialJobDirectory.get_job_directories()`."""
        self._run_get_directories(job_directories.SpecialJobDirectory,
                                  self.SPECIAL_JOBLIST)


class AddJobsTests(_TempResultsDirTestBase):
    """Tests for `Offloader._add_new_jobs()`."""

    # Extra job directories created after the initial hierarchy.
    MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']

    def setUp(self):
        super(AddJobsTests, self).setUp()
        self._initial_job_names = (
            set(self.REGULAR_JOBLIST) | set(self.SPECIAL_JOBLIST))
        self.make_job_hierarchy()
        # '-a' -> offload both regular and special job directories.
        self._offloader = gs_offloader.Offloader(_get_options(['-a']))
        self.mox.StubOutWithMock(logging, 'debug')


    def _run_add_new_jobs(self, expected_key_set):
        """Basic test assertions for `_add_new_jobs()`.

        Asserts the following:
          * The keys in the offloader's `_open_jobs` dictionary
            matches the expected set of keys.
          * For every job in `_open_jobs`, the job has the expected
            directory name.

        """
        count = len(expected_key_set) - len(self._offloader._open_jobs)
        # Exactly one debug log with the number of newly added jobs.
        logging.debug(mox.IgnoreArg(), count)
        self.mox.ReplayAll()
        self._offloader._add_new_jobs()
        self.assertEqual(expected_key_set,
                         set(self._offloader._open_jobs.keys()))
        for jobkey, job in self._offloader._open_jobs.items():
            self.assertEqual(jobkey, job.dirname)
        self.mox.VerifyAll()
        self.mox.ResetAll()


    def test_add_jobs_empty(self):
        """Test adding jobs to an empty dictionary.

        Calls the offloader's `_add_new_jobs()`, then perform
        the assertions of `self._check_open_jobs()`.

        """
        self._run_add_new_jobs(self._initial_job_names)


    def test_add_jobs_non_empty(self):
        """Test adding jobs to a non-empty dictionary.

        Calls the offloader's `_add_new_jobs()` twice; once from
        initial conditions, and then again after adding more
        directories.  After the second call, perform the assertions
        of `self._check_open_jobs()`.  Additionally, assert that
        keys added by the first call still map to their original
        job object after the second call.

        """
        self._run_add_new_jobs(self._initial_job_names)
        jobs_copy = self._offloader._open_jobs.copy()
        for d in self.MOREJOBS:
            os.mkdir(d)
        self._run_add_new_jobs(self._initial_job_names |
                               set(self.MOREJOBS))
        for key in jobs_copy.keys():
            self.assertIs(jobs_copy[key],
                          self._offloader._open_jobs[key])


class ReportingTests(_TempResultsDirTestBase):
    """Tests for `Offloader._report_failed_jobs()`."""

    def setUp(self):
        super(ReportingTests, self).setUp()
        self._offloader = gs_offloader.Offloader(_get_options([]))
        self.mox.StubOutWithMock(self._offloader, '_log_failed_jobs_locally')
        self.mox.StubOutWithMock(logging, 'debug')


    def _add_job(self, jobdir):
        """Add a job to the dictionary of unfinished jobs."""
        j = self.make_job(jobdir)
        self._offloader._open_jobs[j.dirname] = j
        return j


    def _expect_log_message(self, new_open_jobs, with_failures):
        """Mock expected logging calls.

        `_report_failed_jobs()` logs one message with the number
        of jobs removed from the open job set and the number of jobs
        still remaining.  Additionally, if there are reportable
        jobs, then it logs the number of jobs that haven't yet
        offloaded.

        This sets up the logging calls using `new_open_jobs` to
        figure the job counts.
        If `with_failures` is true, then
        the log message is set up assuming that all jobs in
        `new_open_jobs` have offload failures.

        @param new_open_jobs New job set for calculating counts
                             in the messages.
        @param with_failures Whether the log message with a
                             failure count is expected.

        """
        count = len(self._offloader._open_jobs) - len(new_open_jobs)
        logging.debug(mox.IgnoreArg(), count, len(new_open_jobs))
        if with_failures:
            logging.debug(mox.IgnoreArg(), len(new_open_jobs))


    def _run_update(self, new_open_jobs):
        """Call `_report_failed_jobs()`.

        Initial conditions are set up by the caller.  This calls
        `_report_failed_jobs()` once, and then checks these
        assertions:
          * The offloader's new `_open_jobs` field contains only
            the entries in `new_open_jobs`.

        @param new_open_jobs A dictionary representing the expected
                             new value of the offloader's
                             `_open_jobs` field.
        """
        self.mox.ReplayAll()
        self._offloader._report_failed_jobs()
        self._offloader._remove_offloaded_jobs()
        self.assertEqual(self._offloader._open_jobs, new_open_jobs)
        self.mox.VerifyAll()
        self.mox.ResetAll()


    def _expect_failed_jobs(self, failed_jobs):
        """Mock expected call to log the failed jobs on local disk.

        TODO(crbug.com/686904): The fact that we have to mock an internal
        function for this test is evidence that we need to pull out the local
        file formatter in its own object in a future CL.

        @param failed_jobs: The list of jobs being logged as failed.
        """
        self._offloader._log_failed_jobs_locally(failed_jobs)


    def test_no_jobs(self):
        """Test `_report_failed_jobs()` with no open jobs.

        Initial conditions are an empty `_open_jobs` list.
        Expected result is an empty `_open_jobs` list.

        """
        self._expect_log_message({}, False)
        self._expect_failed_jobs([])
        self._run_update({})


    def test_all_completed(self):
        """Test `_report_failed_jobs()` with only complete jobs.

        Initial conditions are an `_open_jobs` list consisting of only completed
        jobs.
        Expected result is an empty `_open_jobs` list.

        """
        for d in self.REGULAR_JOBLIST:
            self._add_job(d).set_complete()
        self._expect_log_message({}, False)
        self._expect_failed_jobs([])
        self._run_update({})


    def test_none_finished(self):
        """Test `_report_failed_jobs()` with only unfinished jobs.

        Initial conditions are an `_open_jobs` list consisting of only
        unfinished jobs.
        Expected result is no change to the `_open_jobs` list.

        """
        for d in self.REGULAR_JOBLIST:
            self._add_job(d)
        new_jobs = self._offloader._open_jobs.copy()
        self._expect_log_message(new_jobs, False)
        self._expect_failed_jobs([])
        self._run_update(new_jobs)


class GsOffloaderMockTests(_TempResultsDirTestCase):
    """Tests using mock instead of mox."""

    def setUp(self):
        super(GsOffloaderMockTests, self).setUp()
        # Patch signal.alarm so each test can script timeout behavior
        # through side_effect without arming a real alarm.
        alarm = mock.patch('signal.alarm', return_value=0)
        alarm.start()
        self.addCleanup(alarm.stop)

        # Silence all log output for the duration of the test.
        # NOTE(review): the saved level does not appear to be restored
        # in this class -- presumably handled elsewhere; confirm.
        self._saved_loglevel = logging.getLogger().getEffectiveLevel()
        logging.getLogger().setLevel(logging.CRITICAL + 1)

        self._job = self.make_job(self.REGULAR_JOBLIST[0])


    def test_offload_timeout_early(self):
        """Test that `offload_dir()` times out correctly.

        This test triggers timeout at the earliest possible moment,
        at the first call to set the timeout alarm.

        """
        # Second alarm() call raises, simulating an immediate timeout.
        signal.alarm.side_effect = [0, timeout_util.TimeoutError('fubar')]
        with mock.patch.object(gs_offloader, '_upload_cts_testresult',
                               autospec=True) as upload:
            upload.return_value = None
            gs_offloader.GSOffloader(
                    utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
                            self._job.queue_args[0],
                            self._job.queue_args[1],
                            self._job.queue_args[2])
            # Timed-out offload must leave the results directory behind.
            self.assertTrue(os.path.isdir(self._job.queue_args[0]))


    # TODO(ayatane): This tests passes when run locally, but it fails
    # when run on trybot.  I have no idea why, but the assert isdir
    # fails.
    #
    # This test is also kind of redundant since we are using the timeout
    # from chromite which has its own tests.
    @unittest.skip('This fails on trybot')
    def test_offload_timeout_late(self):
        """Test that `offload_dir()` times out correctly.

        This test triggers timeout at the latest possible moment, at
        the call to clear the timeout alarm.

        """
        signal.alarm.side_effect = [0, 0, timeout_util.TimeoutError('fubar')]
        with mock.patch.object(gs_offloader, '_upload_cts_testresult',
                               autospec=True) as upload, \
             mock.patch.object(gs_offloader, '_get_cmd_list',
                               autospec=True) as get_cmd_list:
            upload.return_value = None
            get_cmd_list.return_value = ['test', '-d', self._job.queue_args[0]]
            gs_offloader.GSOffloader(
                    utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
                            self._job.queue_args[0],
                            self._job.queue_args[1],
                            self._job.queue_args[2])
            self.assertTrue(os.path.isdir(self._job.queue_args[0]))



if __name__ == '__main__':
    unittest.main()