1# -*- coding: utf-8 -*-
2# Copyright 2011 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Unit and integration tests for gsutil command_runner module."""
16
17from __future__ import absolute_import
18
19import logging
20import os
21import time
22
23import gslib
24from gslib import command_runner
25from gslib.command import Command
26from gslib.command_argument import CommandArgument
27from gslib.command_runner import CommandRunner
28from gslib.command_runner import HandleArgCoding
29from gslib.exception import CommandException
30from gslib.tab_complete import CloudObjectCompleter
31from gslib.tab_complete import CloudOrLocalObjectCompleter
32from gslib.tab_complete import LocalObjectCompleter
33from gslib.tab_complete import LocalObjectOrCannedACLCompleter
34from gslib.tab_complete import NoOpCompleter
35import gslib.tests.testcase as testcase
36import gslib.tests.util as util
37from gslib.tests.util import ARGCOMPLETE_AVAILABLE
38from gslib.tests.util import SetBotoConfigFileForTest
39from gslib.tests.util import SetBotoConfigForTest
40from gslib.tests.util import unittest
41from gslib.util import GSUTIL_PUB_TARBALL
42from gslib.util import SECONDS_PER_DAY
43
44
45class FakeArgparseArgument(object):
46  """Fake for argparse parser argument."""
47  pass
48
49
50class FakeArgparseParser(object):
51  """Fake for argparse parser."""
52
53  def __init__(self):
54    self.arguments = []
55
56  def add_argument(self, *unused_args, **unused_kwargs):
57    argument = FakeArgparseArgument()
58    self.arguments.append(argument)
59    return argument
60
61
62class FakeArgparseSubparsers(object):
63  """Container for nested parsers."""
64
65  def __init__(self):
66    self.parsers = []
67
68  def add_parser(self, unused_name, **unused_kwargs):
69    parser = FakeArgparseParser()
70    self.parsers.append(parser)
71    return parser
72
73
74class FakeCommandWithInvalidCompleter(Command):
75  """Command with an invalid completer on an argument."""
76
77  command_spec = Command.CreateCommandSpec(
78      'fake1',
79      argparse_arguments=[
80          CommandArgument('arg', completer='BAD')
81      ]
82  )
83
84  help_spec = Command.HelpSpec(
85      help_name='fake1',
86      help_name_aliases=[],
87      help_type='command_help',
88      help_one_line_summary='fake command for tests',
89      help_text='fake command for tests',
90      subcommand_help_text={}
91  )
92
93  def __init__(self):
94    pass
95
96
97class FakeCommandWithCompleters(Command):
98  """Command with various completer types."""
99
100  command_spec = Command.CreateCommandSpec(
101      'fake2',
102      argparse_arguments=[
103          CommandArgument.MakeZeroOrMoreCloudURLsArgument(),
104          CommandArgument.MakeZeroOrMoreFileURLsArgument(),
105          CommandArgument.MakeZeroOrMoreCloudOrFileURLsArgument(),
106          CommandArgument.MakeFreeTextArgument(),
107          CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(),
108          CommandArgument.MakeFileURLOrCannedACLArgument(),
109      ]
110  )
111
112  help_spec = Command.HelpSpec(
113      help_name='fake2',
114      help_name_aliases=[],
115      help_type='command_help',
116      help_one_line_summary='fake command for tests',
117      help_text='fake command for tests',
118      subcommand_help_text={}
119  )
120
121  def __init__(self):
122    pass
123
124
125class TestCommandRunnerUnitTests(
126    testcase.unit_testcase.GsUtilUnitTestCase):
127  """Unit tests for gsutil update check in command_runner module."""
128
129  def setUp(self):
130    """Sets up the command runner mock objects."""
131    super(TestCommandRunnerUnitTests, self).setUp()
132
133    # Mock out the timestamp file so we can manipulate it.
134    self.previous_update_file = (
135        command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE)
136    self.timestamp_file = self.CreateTempFile()
137    command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
138        self.timestamp_file)
139
140    # Mock out the gsutil version checker.
141    base_version = unicode(gslib.VERSION)
142    while not base_version.isnumeric():
143      if not base_version:
144        raise CommandException(
145            'Version number (%s) is not numeric.' % gslib.VERSION)
146      base_version = base_version[:-1]
147    command_runner.LookUpGsutilVersion = lambda u, v: float(base_version) + 1
148
149    # Mock out raw_input to trigger yes prompt.
150    command_runner.raw_input = lambda p: 'y'
151
152    # Mock out TTY check to pretend we're on a TTY even if we're not.
153    self.running_interactively = True
154    command_runner.IsRunningInteractively = lambda: self.running_interactively
155
156    # Mock out the modified time of the VERSION file.
157    self.version_mod_time = 0
158    self.previous_version_mod_time = command_runner.GetGsutilVersionModifiedTime
159    command_runner.GetGsutilVersionModifiedTime = lambda: self.version_mod_time
160
161    # Create a fake pub tarball that will be used to check for gsutil version.
162    self.pub_bucket_uri = self.CreateBucket('pub')
163    self.gsutil_tarball_uri = self.CreateObject(
164        bucket_uri=self.pub_bucket_uri, object_name='gsutil.tar.gz',
165        contents='foo')
166
167  def tearDown(self):
168    """Tears down the command runner mock objects."""
169    super(TestCommandRunnerUnitTests, self).tearDown()
170
171    command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
172        self.previous_update_file)
173    command_runner.LookUpGsutilVersion = gslib.util.LookUpGsutilVersion
174    command_runner.raw_input = raw_input
175
176    command_runner.GetGsutilVersionModifiedTime = self.previous_version_mod_time
177
178    command_runner.IsRunningInteractively = gslib.util.IsRunningInteractively
179
180    self.gsutil_tarball_uri.delete_key()
181    self.pub_bucket_uri.delete_bucket()
182
183  def _IsPackageOrCloudSDKInstall(self):
184    # Update should not trigger for package installs or Cloud SDK installs.
185    return (gslib.IS_PACKAGE_INSTALL or
186            os.environ.get('CLOUDSDK_WRAPPER') == '1')
187
188  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
189  def test_not_interactive(self):
190    """Tests that update is not triggered if not running interactively."""
191    with SetBotoConfigForTest([
192        ('GSUtil', 'software_update_check_period', '1')]):
193      with open(self.timestamp_file, 'w') as f:
194        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
195      self.running_interactively = False
196      self.assertEqual(
197          False,
198          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
199
200  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
201  def test_no_tracker_file_version_recent(self):
202    """Tests when no timestamp file exists and VERSION file is recent."""
203    if os.path.exists(self.timestamp_file):
204      os.remove(self.timestamp_file)
205    self.assertFalse(os.path.exists(self.timestamp_file))
206    self.version_mod_time = time.time()
207    self.assertEqual(
208        False,
209        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
210
211  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
212  def test_no_tracker_file_version_old(self):
213    """Tests when no timestamp file exists and VERSION file is old."""
214    if os.path.exists(self.timestamp_file):
215      os.remove(self.timestamp_file)
216    self.assertFalse(os.path.exists(self.timestamp_file))
217    self.version_mod_time = 0
218    expected = not self._IsPackageOrCloudSDKInstall()
219    self.assertEqual(
220        expected,
221        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
222
223  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
224  def test_invalid_commands(self):
225    """Tests that update is not triggered for certain commands."""
226    self.assertEqual(
227        False,
228        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0))
229
230  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
231  def test_invalid_file_contents(self):
232    """Tests no update if timestamp file has invalid value."""
233    with open(self.timestamp_file, 'w') as f:
234      f.write('NaN')
235    self.assertEqual(
236        False,
237        self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
238
239  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
240  def test_update_should_trigger(self):
241    """Tests update should be triggered if time is up."""
242    with SetBotoConfigForTest([
243        ('GSUtil', 'software_update_check_period', '1')]):
244      with open(self.timestamp_file, 'w') as f:
245        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
246      expected = not self._IsPackageOrCloudSDKInstall()
247      self.assertEqual(
248          expected,
249          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
250
251  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
252  def test_not_time_for_update_yet(self):
253    """Tests update not triggered if not time yet."""
254    with SetBotoConfigForTest([
255        ('GSUtil', 'software_update_check_period', '3')]):
256      with open(self.timestamp_file, 'w') as f:
257        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
258      self.assertEqual(
259          False,
260          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
261
262  def test_user_says_no_to_update(self):
263    """Tests no update triggered if user says no at the prompt."""
264    with SetBotoConfigForTest([
265        ('GSUtil', 'software_update_check_period', '1')]):
266      with open(self.timestamp_file, 'w') as f:
267        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
268      command_runner.raw_input = lambda p: 'n'
269      self.assertEqual(
270          False,
271          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
272
273  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
274  def test_update_check_skipped_with_quiet_mode(self):
275    """Tests that update isn't triggered when loglevel is in quiet mode."""
276    with SetBotoConfigForTest([
277        ('GSUtil', 'software_update_check_period', '1')]):
278      with open(self.timestamp_file, 'w') as f:
279        f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
280
281      expected = not self._IsPackageOrCloudSDKInstall()
282      self.assertEqual(
283          expected,
284          self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
285
286      prev_loglevel = logging.getLogger().getEffectiveLevel()
287      try:
288        logging.getLogger().setLevel(logging.ERROR)
289        # With reduced loglevel, should return False.
290        self.assertEqual(
291            False,
292            self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
293      finally:
294        logging.getLogger().setLevel(prev_loglevel)
295
296  def test_command_argument_parser_setup_invalid_completer(self):
297
298    command_map = {
299        FakeCommandWithInvalidCompleter.command_spec.command_name:
300            FakeCommandWithInvalidCompleter()
301    }
302
303    runner = CommandRunner(
304        bucket_storage_uri_class=self.mock_bucket_storage_uri,
305        gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
306        command_map=command_map)
307
308    subparsers = FakeArgparseSubparsers()
309    try:
310      runner.ConfigureCommandArgumentParsers(subparsers)
311    except RuntimeError as e:
312      self.assertIn('Unknown completer', e.message)
313
314  @unittest.skipUnless(ARGCOMPLETE_AVAILABLE,
315                       'Tab completion requires argcomplete')
316  def test_command_argument_parser_setup_completers(self):
317
318    command_map = {
319        FakeCommandWithCompleters.command_spec.command_name:
320            FakeCommandWithCompleters()
321    }
322
323    runner = CommandRunner(
324        bucket_storage_uri_class=self.mock_bucket_storage_uri,
325        gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
326        command_map=command_map)
327
328    subparsers = FakeArgparseSubparsers()
329    runner.ConfigureCommandArgumentParsers(subparsers)
330
331    self.assertEqual(1, len(subparsers.parsers))
332    parser = subparsers.parsers[0]
333    self.assertEqual(6, len(parser.arguments))
334    self.assertEqual(CloudObjectCompleter, type(parser.arguments[0].completer))
335    self.assertEqual(LocalObjectCompleter, type(parser.arguments[1].completer))
336    self.assertEqual(
337        CloudOrLocalObjectCompleter, type(parser.arguments[2].completer))
338    self.assertEqual(
339        NoOpCompleter, type(parser.arguments[3].completer))
340    self.assertEqual(CloudObjectCompleter, type(parser.arguments[4].completer))
341    self.assertEqual(
342        LocalObjectOrCannedACLCompleter, type(parser.arguments[5].completer))
343
344  # pylint: disable=invalid-encoded-data
345  def test_valid_arg_coding(self):
346    """Tests that gsutil encodes valid args correctly."""
347    # Args other than -h and -p should be utf-8 decoded.
348    args = HandleArgCoding(['ls', '-l'])
349    self.assertIs(type(args[0]), unicode)
350    self.assertIs(type(args[1]), unicode)
351
352    # -p and -h args other than x-goog-meta should not be decoded.
353    args = HandleArgCoding(['ls', '-p', 'abc:def', 'gs://bucket'])
354    self.assertIs(type(args[0]), unicode)
355    self.assertIs(type(args[1]), unicode)
356    self.assertIsNot(type(args[2]), unicode)
357    self.assertIs(type(args[3]), unicode)
358
359    args = HandleArgCoding(['gsutil', '-h', 'content-type:text/plain', 'cp',
360                            'a', 'gs://bucket'])
361    self.assertIs(type(args[0]), unicode)
362    self.assertIs(type(args[1]), unicode)
363    self.assertIsNot(type(args[2]), unicode)
364    self.assertIs(type(args[3]), unicode)
365    self.assertIs(type(args[4]), unicode)
366    self.assertIs(type(args[5]), unicode)
367
368    # -h x-goog-meta args should be decoded.
369    args = HandleArgCoding(['gsutil', '-h', 'x-goog-meta-abc', '1234'])
370    self.assertIs(type(args[0]), unicode)
371    self.assertIs(type(args[1]), unicode)
372    self.assertIs(type(args[2]), unicode)
373    self.assertIs(type(args[3]), unicode)
374
375    # -p and -h args with non-ASCII content should raise CommandException.
376    try:
377      HandleArgCoding(['ls', '-p', '碼'])
378      # Ensure exception is raised.
379      self.assertTrue(False)
380    except CommandException as e:
381      self.assertIn('Invalid non-ASCII header', e.reason)
382    try:
383      HandleArgCoding(['-h', '碼', 'ls'])
384      # Ensure exception is raised.
385      self.assertTrue(False)
386    except CommandException as e:
387      self.assertIn('Invalid non-ASCII header', e.reason)
388
389
390class TestCommandRunnerIntegrationTests(
391    testcase.GsUtilIntegrationTestCase):
392  """Integration tests for gsutil update check in command_runner module."""
393
394  def setUp(self):
395    """Sets up the command runner mock objects."""
396    super(TestCommandRunnerIntegrationTests, self).setUp()
397
398    # Mock out the timestamp file so we can manipulate it.
399    self.previous_update_file = (
400        command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE)
401    self.timestamp_file = self.CreateTempFile(contents='0')
402    command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
403        self.timestamp_file)
404
405    # Mock out raw_input to trigger yes prompt.
406    command_runner.raw_input = lambda p: 'y'
407
408  def tearDown(self):
409    """Tears down the command runner mock objects."""
410    super(TestCommandRunnerIntegrationTests, self).tearDown()
411    command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
412        self.previous_update_file)
413    command_runner.raw_input = raw_input
414
415  @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
416  def test_lookup_version_without_credentials(self):
417    """Tests that gsutil tarball version lookup works without credentials."""
418    with SetBotoConfigFileForTest(self.CreateTempFile(
419        contents='[GSUtil]\nsoftware_update_check_period=1')):
420      self.command_runner = command_runner.CommandRunner()
421      # Looking up software version shouldn't get auth failure exception.
422      self.command_runner.RunNamedCommand('ls', [GSUTIL_PUB_TARBALL])
423