1# -*- coding: utf-8 -*-
2# Copyright 2013 Google Inc. All Rights Reserved.
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22"""Unit tests for gsutil parallelism framework."""
23
24from __future__ import absolute_import
25
26import functools
27import os
28import signal
29import threading
30import time
31
32from boto.storage_uri import BucketStorageUri
33from gslib import cs_api_map
34from gslib.command import Command
35from gslib.command import CreateGsutilLogger
36from gslib.command import DummyArgChecker
37import gslib.tests.testcase as testcase
38from gslib.tests.testcase.base import RequiresIsolation
39from gslib.tests.util import unittest
40from gslib.util import CheckMultiprocessingAvailableAndInit
41from gslib.util import IS_WINDOWS
42
43
# Number of seconds an individual test may run before the Timeout decorator
# aborts it (enforced through SIGALRM, so not on Windows). The value needs to
# be reasonably high: if many tests are running in parallel, an individual
# test may take a while to complete.
_TEST_TIMEOUT_SECONDS = 120
48
49
def Timeout(func):
  """Decorator that aborts func if it runs longer than the test timeout.

  On platforms other than Windows, a SIGALRM is scheduled
  _TEST_TIMEOUT_SECONDS after the wrapped function is entered; if it fires,
  _HandleAlarm raises an exception. On Windows the function simply runs
  without a timeout.

  Decorated functions may call each other: when the inner call finishes, the
  previous SIGALRM handler and any alarm that was already pending (for
  example, the one armed by an enclosing decorated function) are put back
  instead of being silently cancelled.

  Args:
    func: The function to wrap.

  Returns:
    A wrapper that calls func with the same arguments and returns its result.
  """
  @functools.wraps(func)
  def Wrapper(*args, **kwargs):
    if IS_WINDOWS:
      return func(*args, **kwargs)

    previous_handler = signal.signal(signal.SIGALRM, _HandleAlarm)
    # signal.alarm returns the number of seconds that were left on a
    # previously scheduled alarm (0 if there was none).
    previous_remaining = signal.alarm(_TEST_TIMEOUT_SECONDS)
    start_time = time.time()
    try:
      return func(*args, **kwargs)
    finally:
      signal.alarm(0)  # Cancel the alarm armed above.
      # signal.signal returns None when the previous handler was not
      # installed from Python; such a handler cannot be re-installed here.
      if previous_handler is not None:
        signal.signal(signal.SIGALRM, previous_handler)
      if previous_remaining:
        elapsed = int(time.time() - start_time)
        # Re-arm the outer alarm with the time it has left. alarm(0) would
        # cancel it, so never schedule less than one second.
        signal.alarm(max(1, previous_remaining - elapsed))
  return Wrapper
63
64
65# pylint: disable=unused-argument
def _HandleAlarm(signal_num, cur_stack_frame):
  """Signal handler that fails the running test once its alarm fires.

  Args:
    signal_num: Number of the delivered signal (unused).
    cur_stack_frame: Stack frame that was interrupted (unused).

  Raises:
    Exception: Always, with the message 'Test timed out.'.
  """
  timeout_message = 'Test timed out.'
  raise Exception(timeout_message)
68
69
class CustomException(Exception):
  """Exception type raised on purpose by the helpers in this module."""

  def __init__(self, exception_str):
    """Creates the exception.

    Args:
      exception_str: Message describing the intentional failure.
    """
    Exception.__init__(self, exception_str)
74
75
def _ReturnOneValue(cls, args, thread_state=None):
  """Task function that ignores its inputs and returns 1.

  Args:
    cls: Unused, required by function signature.
    args: Unused, required by function signature.
    thread_state: Unused, required by function signature.

  Returns:
    The integer 1.
  """
  return 1
78
79
def _ReturnProcAndThreadId(cls, args, thread_state=None):
  """Identifies the process and thread in which this call is executing.

  Args:
    cls: Unused, required by function signature.
    args: Unused, required by function signature.
    thread_state: Unused, required by function signature.

  Returns:
    Tuple of (process id, thread identifier) for the caller.
  """
  # current_thread() is the supported spelling; currentThread() is only a
  # deprecated alias for it.
  return (os.getpid(), threading.current_thread().ident)
82
83
def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
  """Waits five seconds, then reports the executing process and thread.

  Args:
    cls: Forwarded to _ReturnProcAndThreadId.
    args: Forwarded to _ReturnProcAndThreadId.
    thread_state: Forwarded to _ReturnProcAndThreadId.

  Returns:
    The value returned by _ReturnProcAndThreadId.
  """
  # Tests relying on this delay can fail if spawning all of the new processes
  # and threads takes longer than 5 seconds in total; if that ever happens
  # there is a performance problem that needs to be addressed.
  sleep_seconds = 5
  time.sleep(sleep_seconds)
  proc_and_thread_id = _ReturnProcAndThreadId(
      cls, args, thread_state=thread_state)
  return proc_and_thread_id
90
91
def _FailureFunc(cls, args, thread_state=None):
  """Task function that always fails.

  Args:
    cls: Unused, required by function signature.
    args: Unused, required by function signature.
    thread_state: Unused, required by function signature.

  Raises:
    CustomException: Always.
  """
  intentional_failure = CustomException('Failing on purpose.')
  raise intentional_failure
94
95
def _FailingExceptionHandler(cls, e):
  """Exception handler that records the failure and then fails itself.

  Args:
    cls: Object whose failure_count attribute is incremented.
    e: The exception being handled (unused).

  Raises:
    CustomException: Always, after the count has been updated.
  """
  cls.failure_count = cls.failure_count + 1
  raise CustomException('Exception handler failing on purpose.')
99
100
def _ExceptionHandler(cls, e):
  """Exception handler that logs the exception and counts the failure.

  Args:
    cls: Object providing a logger attribute and a failure_count attribute.
    e: The exception being handled.
  """
  logger = cls.logger
  logger.exception(e)
  cls.failure_count = cls.failure_count + 1
104
105
def _IncrementByLength(cls, args, thread_state=None):
  """Adds the length of args to cls.arg_length_sum.

  Args:
    cls: Object whose arg_length_sum attribute is increased.
    args: Sized value whose length is added.
    thread_state: Unused, required by function signature.
  """
  added_length = len(args)
  cls.arg_length_sum += added_length
108
109
def _AdjustProcessCountIfWindows(process_count):
  """Returns the process count to use on the current platform.

  Args:
    process_count: The desired number of processes.

  Returns:
    1 if IS_WINDOWS is set, otherwise process_count unchanged.
  """
  return 1 if IS_WINDOWS else process_count
115
116
def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
  """Runs two nested Apply calls over seven copies of args.

  The first two elements of args should be the process and thread counts,
  respectively, to be used for the nested calls. The first nested call applies
  _PerformNRecursiveCalls to every copy of args; the second applies
  _ReturnOneValue to every copy.

  Args:
    cls: The Command to call Apply on.
    args: Arguments to replicate and pass to Apply.
    thread_state: Unused, required by function signature.

  Returns:
    The sum of the values returned by the first Apply call plus the number of
    values returned by the second Apply call.
  """
  replicated_args = [args] * 7
  apply_kwargs = dict(
      arg_checker=DummyArgChecker,
      process_count=_AdjustProcessCountIfWindows(args[0]),
      thread_count=args[1],
      should_return_results=True)

  recursive_total = sum(cls.Apply(
      _PerformNRecursiveCalls, replicated_args, _ExceptionHandler,
      **apply_kwargs))

  single_value_results = cls.Apply(
      _ReturnOneValue, replicated_args, _ExceptionHandler, **apply_kwargs)

  return len(single_value_results) + recursive_total
148
149
def _PerformNRecursiveCalls(cls, args, thread_state=None):
  """Makes a nested Apply call that runs _ReturnOneValue N times.

  The first two elements of args should be the process and thread counts,
  respectively, to be used for the nested call, while the third element is N
  (the number of tasks handed to the nested call).

  Args:
    cls: The Command to call Apply on.
    args: Sequence of (process count, thread count, N).
    thread_state: Unused, required by function signature.

  Returns:
    Number of values returned by the call to Apply.
  """
  num_calls = args[2]
  nested_results = cls.Apply(
      _ReturnOneValue, [()] * num_calls, _ExceptionHandler,
      arg_checker=DummyArgChecker,
      process_count=_AdjustProcessCountIfWindows(args[0]),
      thread_count=args[1],
      should_return_results=True)
  return len(nested_results)
173
174
def _SkipEvenNumbersArgChecker(cls, arg):
  """Argument checker that accepts only odd numbers.

  Args:
    cls: Unused, required by function signature.
    arg: The number to check.

  Returns:
    True if arg is not divisible by 2, False otherwise.
  """
  remainder = arg % 2
  return remainder != 0
177
178
class FailingIterator(object):
  """Iterator over 0..size-1 that raises at selected positions.

  The integers 0 through size - 1 are produced in order, except that at every
  index contained in failure_indices a CustomException is raised in place of
  the value. The iterator stays usable after such a failure: the following
  call moves on to the next index.
  """

  def __init__(self, size, failure_indices):
    """Creates the iterator.

    Args:
      size: Total number of positions, including the failing ones.
      failure_indices: Container of zero-based positions at which to raise.
    """
    self.size = size
    self.failure_indices = failure_indices
    self.current_index = 0

  def __iter__(self):
    return self

  def next(self):
    """Returns the next index, or raises if that index is set to fail.

    Returns:
      The index of the position that was just consumed.

    Raises:
      StopIteration: If all positions have been consumed.
      CustomException: If the consumed position is in failure_indices.
    """
    if self.current_index == self.size:
      raise StopIteration('')
    index = self.current_index
    # Advance before raising so that iteration can resume after a failure.
    self.current_index += 1
    if index in self.failure_indices:
      # Report the index that actually failed, not the one after it.
      raise CustomException(
          'Iterator failing on purpose at index %d.' % index)
    return index

  # Iterator protocol name used by Python 3; next() above is the Python 2 one.
  __next__ = next
199
200
class FakeCommand(Command):
  """Fake command class for overriding command instance state."""
  command_spec = Command.CreateCommandSpec(
      'fake',
      command_name_aliases=[],
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='fake',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Something to take up space.',
      help_text='Something else to take up space.',
      subcommand_help_text={},
  )

  def __init__(self, do_parallel):
    """Sets the instance attributes directly.

    Command.__init__ is not called; only the attributes assigned below are
    initialized.

    Args:
      do_parallel: Value stored as parallel_operations.
    """
    supported_apis = {'gs': ['JSON'], 's3': ['XML']}
    default_apis = {'gs': 'JSON', 's3': 'XML'}

    self.bucket_storage_uri_class = BucketStorageUri
    self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
        cs_api_map.GsutilApiClassMapFactory, supported_apis, default_apis)
    self.logger = CreateGsutilLogger('FakeCommand')
    self.parallel_operations = do_parallel
    self.failure_count = 0
    multiprocessing_check = CheckMultiprocessingAvailableAndInit()
    self.multiprocessing_is_available = multiprocessing_check.is_available
    self.debug = 0
235
236
class FakeCommandWithoutMultiprocessingModule(FakeCommand):
  """FakeCommand that always reports multiprocessing as unavailable."""

  def __init__(self, do_parallel):
    """Initializes as FakeCommand does, then clears the availability flag.

    Args:
      do_parallel: Forwarded to FakeCommand.__init__.
    """
    FakeCommand.__init__(self, do_parallel)
    # Override whatever the availability check stored during initialization.
    self.multiprocessing_is_available = False
242
243
244# TODO: Figure out a good way to test that ctrl+C really stops execution,
245#       and also that ctrl+C works when there are still tasks enqueued.
class TestParallelismFramework(testcase.GsUtilUnitTestCase):
  """gsutil parallelism framework test suite.

  Most scenarios are implemented once in a _Test* helper taking
  (process_count, thread_count) and exposed through thin test* methods, one
  per process/thread combination. Multi-process variants are skipped on
  Windows.
  """

  # Command class instantiated for Apply calls; overridden by subclasses.
  command_class = FakeCommand

  def _RunApply(self, func, args_iterator, process_count, thread_count,
                command_inst=None, shared_attrs=None, fail_on_error=False,
                thr_exc_handler=None, arg_checker=DummyArgChecker):
    """Calls Apply on a command instance and returns its results.

    Args:
      func: Function to apply to each argument.
      args_iterator: Iterable of arguments, one per task.
      process_count: Number of processes to request.
      thread_count: Number of threads to request.
      command_inst: Command to call Apply on; a new self.command_class(True)
          is created when omitted.
      shared_attrs: Passed through to Apply as shared_attrs.
      fail_on_error: Passed through to Apply as fail_on_error.
      thr_exc_handler: Exception handler passed to Apply; _ExceptionHandler
          when omitted.
      arg_checker: Passed through to Apply as arg_checker.

    Returns:
      The value returned by Apply (called with should_return_results=True).
    """
    if command_inst is None:
      command_inst = self.command_class(True)
    if thr_exc_handler is None:
      thr_exc_handler = _ExceptionHandler

    return command_inst.Apply(func, args_iterator, thr_exc_handler,
                              thread_count=thread_count,
                              process_count=process_count,
                              arg_checker=arg_checker,
                              should_return_results=True,
                              shared_attrs=shared_attrs,
                              fail_on_error=fail_on_error)

  @RequiresIsolation
  def testBasicApplySingleProcessSingleThread(self):
    self._TestBasicApply(1, 1)

  @RequiresIsolation
  def testBasicApplySingleProcessMultiThread(self):
    self._TestBasicApply(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testBasicApplyMultiProcessSingleThread(self):
    self._TestBasicApply(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testBasicApplyMultiProcessMultiThread(self):
    self._TestBasicApply(3, 3)

  @Timeout
  def _TestBasicApply(self, process_count, thread_count):
    """Tests that Apply returns one result per argument."""
    args = [()] * (17 * process_count * thread_count + 1)

    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(len(args), len(results))

  @RequiresIsolation
  def testNoTasksSingleProcessSingleThread(self):
    self._TestApplyWithNoTasks(1, 1)

  @RequiresIsolation
  def testNoTasksSingleProcessMultiThread(self):
    self._TestApplyWithNoTasks(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testNoTasksMultiProcessSingleThread(self):
    self._TestApplyWithNoTasks(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testNoTasksMultiProcessMultiThread(self):
    self._TestApplyWithNoTasks(3, 3)

  @Timeout
  def _TestApplyWithNoTasks(self, process_count, thread_count):
    """Tests that calling Apply with no tasks releases locks/semaphores."""
    # A genuinely empty argument list; [()] would enqueue one task that just
    # happens to take no arguments.
    empty_args = []

    # Repeat more often than there are workers, so that a resource that is
    # not released by an empty Apply would eventually run out.
    for _ in range(process_count * thread_count + 1):
      self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)

    # Ensure that work can still be performed.
    self._TestBasicApply(process_count, thread_count)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testApplySaturatesMultiProcessSingleThread(self):
    self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)

  @RequiresIsolation
  def testApplySaturatesSingleProcessMultiThread(self):
    self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testApplySaturatesMultiProcessMultiThread(self):
    self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)

  @RequiresIsolation
  @Timeout
  def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
                                                      thread_count):
    """Tests that created processes and threads evenly share tasks."""
    calls_per_thread = 2
    args = [()] * (process_count * thread_count * calls_per_thread)
    expected_calls_per_thread = calls_per_thread

    if not self.command_class(True).multiprocessing_is_available:
      # When multiprocessing is unavailable, only a single process is used.
      # Calls should be evenly distributed across threads.
      expected_calls_per_thread = calls_per_thread * process_count

    results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
                             process_count, thread_count)
    usage_dict = {}  # (process_id, thread_id): number of tasks performed
    for (process_id, thread_id) in results:
      usage_dict[(process_id, thread_id)] = (
          usage_dict.get((process_id, thread_id), 0) + 1)

    for (id_tuple, num_tasks_completed) in usage_dict.items():
      self.assertEqual(num_tasks_completed, expected_calls_per_thread,
                       'Process %s thread %s completed %s tasks. Expected: %s' %
                       (id_tuple[0], id_tuple[1], num_tasks_completed,
                        expected_calls_per_thread))

  @RequiresIsolation
  def testIteratorFailureSingleProcessSingleThread(self):
    self._TestIteratorFailure(1, 1)

  @RequiresIsolation
  def testIteratorFailureSingleProcessMultiThread(self):
    self._TestIteratorFailure(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testIteratorFailureMultiProcessSingleThread(self):
    self._TestIteratorFailure(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testIteratorFailureMultiProcessMultiThread(self):
    self._TestIteratorFailure(3, 3)

  @Timeout
  def _TestIteratorFailure(self, process_count, thread_count):
    """Tests apply with a failing iterator."""
    # Tests for fail_on_error == False: a single failure at the beginning,
    # middle or end of the iterator costs exactly one result.
    for failure_index in (0, 5, 9):
      args = FailingIterator(10, [failure_index])
      results = self._RunApply(_ReturnOneValue, args, process_count,
                               thread_count)
      self.assertEqual(9, len(results))

    if process_count * thread_count > 1:
      # In this case, we should ignore the fail_on_error flag.
      args = FailingIterator(10, [9])
      results = self._RunApply(_ReturnOneValue, args, process_count,
                               thread_count, fail_on_error=True)
      self.assertEqual(9, len(results))

    # Every position fails.
    args = FailingIterator(10, range(10))
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(0, len(results))

    # Nothing to iterate over at all.
    args = FailingIterator(0, [])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(0, len(results))

  @RequiresIsolation
  def testTestSharedAttrsWorkSingleProcessSingleThread(self):
    self._TestSharedAttrsWork(1, 1)

  @RequiresIsolation
  def testTestSharedAttrsWorkSingleProcessMultiThread(self):
    self._TestSharedAttrsWork(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testTestSharedAttrsWorkMultiProcessSingleThread(self):
    self._TestSharedAttrsWork(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testTestSharedAttrsWorkMultiProcessMultiThread(self):
    self._TestSharedAttrsWork(3, 3)

  @Timeout
  def _TestSharedAttrsWork(self, process_count, thread_count):
    """Tests that Apply successfully uses shared_attrs."""
    initial_sum = 19
    command_inst = self.command_class(True)
    command_inst.arg_length_sum = initial_sum
    args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
    self._RunApply(_IncrementByLength, args, process_count,
                   thread_count, command_inst=command_inst,
                   shared_attrs=['arg_length_sum'])
    expected_sum = initial_sum + sum(len(arg) for arg in args)
    self.assertEqual(expected_sum, command_inst.arg_length_sum)

    # Test that shared variables work when the iterator fails at the beginning,
    # middle, and end.
    for (failing_iterator, expected_failure_count) in (
        (FailingIterator(5, [0]), 1),
        (FailingIterator(10, [1, 3, 5]), 3),
        (FailingIterator(5, [4]), 1)):
      command_inst = self.command_class(True)
      args = failing_iterator
      self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                     command_inst=command_inst, shared_attrs=['failure_count'])
      self.assertEqual(
          expected_failure_count, command_inst.failure_count,
          msg='Failure count did not match. Expected: %s, actual: %s '
          'for failing iterator of size %s, failing indices %s' %
          (expected_failure_count, command_inst.failure_count,
           failing_iterator.size, failing_iterator.failure_indices))

  @RequiresIsolation
  def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInFunc(1, 1)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInFunc(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInFunc(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInFunc(3, 3)

  @Timeout
  def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
    """Tests that every task is handled when the applied function raises."""
    command_inst = self.command_class(True)
    args = ([()] * 5)
    # Only the applied function fails here; the well-behaved handler is used
    # so that this scenario differs from the failing-handler one below.
    self._RunApply(_FailureFunc, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'],
                   thr_exc_handler=_ExceptionHandler)
    self.assertEqual(len(args), command_inst.failure_count)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInHandler(1, 1)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInHandler(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInHandler(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInHandler(3, 3)

  @Timeout
  def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
    """Tests that every task is handled when the exception handler raises."""
    command_inst = self.command_class(True)
    args = ([()] * 5)
    self._RunApply(_FailureFunc, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'],
                   thr_exc_handler=_FailingExceptionHandler)
    self.assertEqual(len(args), command_inst.failure_count)

  @RequiresIsolation
  @Timeout
  def testFailOnErrorFlag(self):
    """Tests that fail_on_error produces the correct exception on failure."""
    def _ExpectCustomException(test_func):
      """Fails the test unless test_func raises CustomException."""
      try:
        test_func()
      except CustomException:
        pass
      except Exception as e:  # pylint: disable=broad-except
        self.fail('Got unexpected error: ' + str(e))
      else:
        # Kept outside the try block so that this failure is not caught by
        # the broad except clause above.
        self.fail(
            'Setting fail_on_error should raise any exception encountered.')

    def _RunFailureFunc():
      command_inst = self.command_class(True)
      args = ([()] * 5)
      self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
                     shared_attrs=['failure_count'], fail_on_error=True)
    _ExpectCustomException(_RunFailureFunc)

    def _RunFailingIteratorFirstPosition():
      args = FailingIterator(10, [0])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(0, len(results))
    _ExpectCustomException(_RunFailingIteratorFirstPosition)

    def _RunFailingIteratorPositionMiddlePosition():
      args = FailingIterator(10, [5])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(5, len(results))
    _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)

    def _RunFailingIteratorLastPosition():
      args = FailingIterator(10, [9])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(9, len(results))
    _ExpectCustomException(_RunFailingIteratorLastPosition)

    def _RunFailingIteratorMultiplePositions():
      args = FailingIterator(10, [1, 3, 5])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(1, len(results))
    _ExpectCustomException(_RunFailingIteratorMultiplePositions)

  @RequiresIsolation
  def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(1, 1)

  @RequiresIsolation
  def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(3, 3)

  @Timeout
  def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
                                                 thread_count):
    """Tests recursive application of Apply.

    Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
    Apply(C).

    Args:
      process_count: Number of processes to use.
      thread_count: Number of threads to use.
    """
    base_args = [3, 1, 4, 1, 5]
    args = [[process_count, thread_count, count] for count in base_args]

    results = self._RunApply(_ReApplyWithReplicatedArguments, args,
                             process_count, thread_count)
    # Per top-level task with count c, A contributes 7 * c from the Apply(B)
    # calls plus 7 from its own Apply(C) call.
    self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))

  @RequiresIsolation
  def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(1, 1)

  @RequiresIsolation
  def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(3, 3)

  @Timeout
  def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
                                                  thread_count):
    """Tests that Apply raises TypeError for a non-iterable argument source."""
    args = self  # The ProducerThread will try and fail to iterate over this.
    try:
      self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    except TypeError:
      pass
    else:
      self.fail('Did not raise expected exception.')

  @RequiresIsolation
  def testSkippedArgumentsSingleThreadSingleProcess(self):
    self._TestSkippedArguments(1, 1)

  @RequiresIsolation
  def testSkippedArgumentsMultiThreadSingleProcess(self):
    self._TestSkippedArguments(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSkippedArgumentsSingleThreadMultiProcess(self):
    self._TestSkippedArguments(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSkippedArgumentsMultiThreadMultiProcess(self):
    self._TestSkippedArguments(3, 3)

  @Timeout
  def _TestSkippedArguments(self, process_count, thread_count):
    """Tests that arguments rejected by the arg checker yield no results."""

    # Skip a proper subset of the arguments.
    n = 2 * process_count * thread_count
    args = range(1, n + 1)
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                             arg_checker=_SkipEvenNumbersArgChecker)
    # We know n is even, so exactly half of 1..n is odd. Floor division keeps
    # the expected value an integer regardless of Python version.
    self.assertEqual(n // 2, len(results))
    self.assertEqual(n // 2, sum(results))

    # Skip all arguments.
    args = [2 * x for x in args]
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                             arg_checker=_SkipEvenNumbersArgChecker)
    self.assertEqual(0, len(results))
656
657
class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
  """Re-runs the parallelism tests with multiprocessing reported unavailable.

  Every inherited test runs against a command whose
  multiprocessing_is_available flag is False.

  Note that this class cannot undo earlier calls to
  gslib.util.CheckMultiprocessingAvailableAndInit, so the global variables in
  command.py are still initialized. The behavior therefore differs slightly
  from that of a machine on which the multiprocessing functionality is really
  unavailable; in particular, these tests will not catch a global variable
  that is unavailable on the sequential path being referenced before
  initialization.
  """
  command_class = FakeCommandWithoutMultiprocessingModule
670