1# -*- coding: utf-8 -*-
2# Copyright 2013 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Contains gsutil base integration test case class."""
16
17from __future__ import absolute_import
18
19from contextlib import contextmanager
20import cStringIO
21import locale
22import logging
23import os
24import subprocess
25import sys
26import tempfile
27
28import boto
29from boto.exception import StorageResponseError
30from boto.s3.deletemarker import DeleteMarker
31from boto.storage_uri import BucketStorageUri
32
33import gslib
34from gslib.gcs_json_api import GcsJsonApi
35from gslib.hashing_helper import Base64ToHexHash
36from gslib.project_id import GOOG_PROJ_ID_HDR
37from gslib.project_id import PopulateProjectId
38from gslib.tests.testcase import base
39import gslib.tests.util as util
40from gslib.tests.util import ObjectToURI as suri
41from gslib.tests.util import RUN_S3_TESTS
42from gslib.tests.util import SetBotoConfigFileForTest
43from gslib.tests.util import SetBotoConfigForTest
44from gslib.tests.util import SetEnvironmentForTest
45from gslib.tests.util import unittest
46import gslib.third_party.storage_apitools.storage_v1_messages as apitools_messages
47from gslib.util import IS_WINDOWS
48from gslib.util import Retry
49from gslib.util import UTF8
50
51
# Module-scoped logger shared by the integration test helpers below.
LOGGER = logging.getLogger('integration-test')

# Contents of boto config file that will tell gsutil not to override the real
# error message with a warning about anonymous access if no credentials are
# provided in the config file. Also, because we retry 401s, reduce the number
# of retries so we don't go through a long exponential backoff in tests.
BOTO_CONFIG_CONTENTS_IGNORE_ANON_WARNING = """
[Boto]
num_retries = 2
[Tests]
bypass_anonymous_access_warning = True
"""
64
65
def SkipForGS(reason):
  """Returns a decorator that skips a test when running against GS.

  When the suite is configured to run against S3 instead, the decorator is
  a no-op and the test runs unchanged.

  Args:
    reason: Human-readable explanation shown for the skipped test.
  """
  if RUN_S3_TESTS:
    # S3 run: leave the decorated test untouched.
    return lambda func: func
  return unittest.skip(reason)
71
72
def SkipForS3(reason):
  """Returns a decorator that skips a test when running against S3.

  When the suite is not configured for S3, the decorator is a no-op and the
  test runs unchanged.

  Args:
    reason: Human-readable explanation shown for the skipped test.
  """
  if not RUN_S3_TESTS:
    # Non-S3 run: leave the decorated test untouched.
    return lambda func: func
  return unittest.skip(reason)
78
79
# TODO: Right now, most tests use the XML API. Instead, they should respect
# prefer_api in the same way that commands do.
@unittest.skipUnless(util.RUN_INTEGRATION_TESTS,
                     'Not running integration tests.')
class GsUtilIntegrationTestCase(base.GsUtilTestCase):
  """Base class for gsutil integration tests.

  Provides helpers for creating test buckets and objects (via both the
  boto/XML path and the JSON API) that are automatically deleted in
  tearDown, and for running the gsutil binary as a subprocess while
  capturing its output.
  """
  # Stable group/user/domain identities used by ACL-related tests.
  GROUP_TEST_ADDRESS = 'gs-discussion@googlegroups.com'
  GROUP_TEST_ID = (
      '00b4903a97d097895ab58ef505d535916a712215b79c3e54932c2eb502ad97f5')
  USER_TEST_ADDRESS = 'gsutiltestuser@gmail.com'
  USER_TEST_ID = (
      '00b4903a97b201e40d2a5a3ddfe044bb1ab79c75b2e817cbe350297eccc81c84')
  DOMAIN_TEST = 'google.com'
  # No one can create this bucket without owning the gmail.com domain, and we
  # won't create this bucket, so it shouldn't exist.
  # It would be nice to use google.com here but JSON API disallows
  # 'google' in resource IDs.
  nonexistent_bucket_name = 'nonexistent-bucket-foobar.gmail.com'

  def setUp(self):
    """Creates base configuration for integration tests."""
    super(GsUtilIntegrationTestCase, self).setUp()
    # Buckets created by the Create* helpers are appended here and deleted
    # (with all contents) in tearDown.
    self.bucket_uris = []

    # Set up API version and project ID handler.
    self.api_version = boto.config.get_value(
        'GSUtil', 'default_api_version', '1')

    # Instantiate a JSON API for use by the current integration test.
    self.json_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
                               'gs')

    if util.RUN_S3_TESTS:
      # Use a dot-free name for S3 runs; still virtually certain not to exist.
      self.nonexistent_bucket_name = (
          'nonexistentbucket-asf801rj3r9as90mfnnkjxpo02')

  # Retry with an exponential backoff if a server error is received. This
  # ensures that we try *really* hard to clean up after ourselves.
  # TODO: As long as we're still using boto to do the teardown,
  # we decorate with boto exceptions.  Eventually this should be migrated
  # to CloudApi exceptions.
  @Retry(StorageResponseError, tries=7, timeout_secs=1)
  def tearDown(self):
    """Deletes all buckets (and their contents) created during the test."""
    super(GsUtilIntegrationTestCase, self).tearDown()

    while self.bucket_uris:
      bucket_uri = self.bucket_uris[-1]
      try:
        bucket_list = self._ListBucket(bucket_uri)
      except StorageResponseError, e:
        # This can happen for tests of rm -r command, which for bucket-only
        # URIs delete the bucket at the end.
        if e.status == 404:
          self.bucket_uris.pop()
          continue
        else:
          raise
      while bucket_list:
        error = None
        for k in bucket_list:
          try:
            if isinstance(k, DeleteMarker):
              # Delete markers are removed via delete_key with an explicit
              # version_id; a plain delete() does not apply to them.
              bucket_uri.get_bucket().delete_key(k.name,
                                                 version_id=k.version_id)
            else:
              k.delete()
          except StorageResponseError, e:
            # This could happen if objects that have already been deleted are
            # still showing up in the listing due to eventual consistency. In
            # that case, we continue on until we've tried to delete every
            # object in the listing before raising the error on which to retry.
            if e.status == 404:
              error = e
            else:
              raise
        if error:
          raise error  # pylint: disable=raising-bad-type
        # Re-list in case the previous listing was incomplete or stale.
        bucket_list = self._ListBucket(bucket_uri)
      bucket_uri.delete_bucket()
      self.bucket_uris.pop()

  def _ListBucket(self, bucket_uri):
    """Returns a list of all keys (all versions) in bucket_uri."""
    if bucket_uri.scheme == 's3':
      # storage_uri will omit delete markers from bucket listings, but
      # these must be deleted before we can remove an S3 bucket.
      return list(v for v in bucket_uri.get_bucket().list_versions())
    return list(bucket_uri.list_bucket(all_versions=True))

  def AssertNObjectsInBucket(self, bucket_uri, num_objects, versioned=False):
    """Checks (with retries) that 'ls bucket_uri/**' returns num_objects.

    This is a common test pattern to deal with eventual listing consistency for
    tests that rely on a set of objects to be listed.

    Args:
      bucket_uri: storage_uri for the bucket.
      num_objects: number of objects expected in the bucket.
      versioned: If True, perform a versioned listing.

    Raises:
      AssertionError if number of objects does not match expected value.

    Returns:
      Listing split across lines.
    """
    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=5, timeout_secs=1)
    def _Check1():
      """Lists the bucket and asserts the expected line count."""
      command = ['ls', '-a'] if versioned else ['ls']
      # List 'bucket/**' when objects are expected; otherwise list the bare
      # bucket URI (whose output is just the trailing newline when empty).
      b_uri = [suri(bucket_uri) + '/**'] if num_objects else [suri(bucket_uri)]
      listing = self.RunGsUtil(command + b_uri, return_stdout=True).split('\n')
      # num_objects + one trailing newline.
      self.assertEquals(len(listing), num_objects + 1)
      return listing
    return _Check1()

  def CreateBucket(self, bucket_name=None, test_objects=0, storage_class=None,
                   provider=None, prefer_json_api=False):
    """Creates a test bucket.

    The bucket and all of its contents will be deleted after the test.

    Args:
      bucket_name: Create the bucket with this name. If not provided, a
                   temporary test bucket name is constructed.
      test_objects: The number of objects that should be placed in the bucket.
                    Defaults to 0.
      storage_class: Storage class to use. If not provided, the provider's
                     default (standard) class is used.
      provider: Provider to use - either "gs" (the default) or "s3".
      prefer_json_api: If true, use the JSON creation functions where possible.

    Returns:
      StorageUri for the created bucket.
    """
    if not provider:
      provider = self.default_provider

    if prefer_json_api and provider == 'gs':
      json_bucket = self.CreateBucketJson(bucket_name=bucket_name,
                                          test_objects=test_objects,
                                          storage_class=storage_class)
      # Wrap the JSON-created bucket in a StorageUri so callers (and
      # tearDown) can treat it like any boto-created bucket.
      bucket_uri = boto.storage_uri(
          'gs://%s' % json_bucket.name.encode(UTF8).lower(),
          suppress_consec_slashes=False)
      self.bucket_uris.append(bucket_uri)
      return bucket_uri

    bucket_name = bucket_name or self.MakeTempName('bucket')

    bucket_uri = boto.storage_uri('%s://%s' % (provider, bucket_name.lower()),
                                  suppress_consec_slashes=False)

    if provider == 'gs':
      # Apply API version and project ID headers if necessary.
      headers = {'x-goog-api-version': self.api_version}
      headers[GOOG_PROJ_ID_HDR] = PopulateProjectId()
    else:
      headers = {}

    # Parallel tests can easily run into bucket creation quotas.
    # Retry with exponential backoff so that we create them as fast as we
    # reasonably can.
    @Retry(StorageResponseError, tries=7, timeout_secs=1)
    def _CreateBucketWithExponentialBackoff():
      """Creates the bucket, retrying transient server errors."""
      bucket_uri.create_bucket(storage_class=storage_class, headers=headers)

    _CreateBucketWithExponentialBackoff()
    self.bucket_uris.append(bucket_uri)
    for i in range(test_objects):
      self.CreateObject(bucket_uri=bucket_uri,
                        object_name=self.MakeTempName('obj'),
                        contents='test %d' % i)
    return bucket_uri

  def CreateVersionedBucket(self, bucket_name=None, test_objects=0):
    """Creates a versioned test bucket.

    The bucket and all of its contents will be deleted after the test.

    Args:
      bucket_name: Create the bucket with this name. If not provided, a
                   temporary test bucket name is constructed.
      test_objects: The number of objects that should be placed in the bucket.
                    Defaults to 0.

    Returns:
      StorageUri for the created bucket with versioning enabled.
    """
    bucket_uri = self.CreateBucket(bucket_name=bucket_name,
                                   test_objects=test_objects)
    bucket_uri.configure_versioning(True)
    return bucket_uri

  def CreateObject(self, bucket_uri=None, object_name=None, contents=None,
                   prefer_json_api=False):
    """Creates a test object.

    Args:
      bucket_uri: The URI of the bucket to place the object in. If not
                  specified, a new temporary bucket is created.
      object_name: The name to use for the object. If not specified, a temporary
                   test object name is constructed.
      contents: The contents to write to the object. If not specified, the key
                is not written to, which means that it isn't actually created
                yet on the server.
      prefer_json_api: If true, use the JSON creation functions where possible.

    Returns:
      A StorageUri for the created object.
    """
    bucket_uri = bucket_uri or self.CreateBucket()

    # The JSON path requires non-empty contents since it uploads immediately.
    if prefer_json_api and bucket_uri.scheme == 'gs' and contents:
      object_name = object_name or self.MakeTempName('obj')
      json_object = self.CreateObjectJson(contents=contents,
                                          bucket_name=bucket_uri.bucket_name,
                                          object_name=object_name)
      object_uri = bucket_uri.clone_replace_name(object_name)
      # pylint: disable=protected-access
      # Need to update the StorageUri with the correct values while
      # avoiding creating a versioned string.
      object_uri._update_from_values(None,
                                     json_object.generation,
                                     True,
                                     md5=(Base64ToHexHash(json_object.md5Hash),
                                          json_object.md5Hash.strip('\n"\'')))
      # pylint: enable=protected-access
      return object_uri

    # NOTE(review): bucket_uri was already defaulted above; this repeat is
    # redundant but harmless.
    bucket_uri = bucket_uri or self.CreateBucket()
    object_name = object_name or self.MakeTempName('obj')
    key_uri = bucket_uri.clone_replace_name(object_name)
    if contents is not None:
      key_uri.set_contents_from_string(contents)
    return key_uri

  def CreateBucketJson(self, bucket_name=None, test_objects=0,
                       storage_class=None):
    """Creates a test bucket using the JSON API.

    The bucket and all of its contents will be deleted after the test.

    Args:
      bucket_name: Create the bucket with this name. If not provided, a
                   temporary test bucket name is constructed.
      test_objects: The number of objects that should be placed in the bucket.
                    Defaults to 0.
      storage_class: Storage class to use. If not provided, the service's
                     default (standard) class is used.

    Returns:
      Apitools Bucket for the created bucket.
    """
    bucket_name = bucket_name or self.MakeTempName('bucket')
    bucket_metadata = None
    if storage_class:
      bucket_metadata = apitools_messages.Bucket(
          name=bucket_name.lower(),
          storageClass=storage_class)

    # TODO: Add retry and exponential backoff.
    bucket = self.json_api.CreateBucket(bucket_name.lower(),
                                        metadata=bucket_metadata)
    # Add bucket to list of buckets to be cleaned up.
    # TODO: Clean up JSON buckets using JSON API.
    self.bucket_uris.append(
        boto.storage_uri('gs://%s' % (bucket_name.lower()),
                         suppress_consec_slashes=False))
    for i in range(test_objects):
      self.CreateObjectJson(bucket_name=bucket_name,
                            object_name=self.MakeTempName('obj'),
                            contents='test %d' % i)
    return bucket

  def CreateObjectJson(self, contents, bucket_name=None, object_name=None):
    """Creates a test object (GCS provider only) using the JSON API.

    Args:
      contents: The contents to write to the object.
      bucket_name: Name of bucket to place the object in. If not
                   specified, a new temporary bucket is created.
      object_name: The name to use for the object. If not specified, a temporary
                   test object name is constructed.

    Returns:
      An apitools Object for the created object.
    """
    bucket_name = bucket_name or self.CreateBucketJson().name
    object_name = object_name or self.MakeTempName('obj')
    object_metadata = apitools_messages.Object(
        name=object_name,
        bucket=bucket_name,
        contentType='application/octet-stream')
    return self.json_api.UploadObject(cStringIO.StringIO(contents),
                                      object_metadata, provider='gs')

  def RunGsUtil(self, cmd, return_status=False, return_stdout=False,
                return_stderr=False, expected_status=0, stdin=None):
    """Runs the gsutil command.

    Args:
      cmd: The command to run, as a list, e.g. ['cp', 'foo', 'bar']
      return_status: If True, the exit status code is returned.
      return_stdout: If True, the standard output of the command is returned.
      return_stderr: If True, the standard error of the command is returned.
      expected_status: The expected return code. If not specified, defaults to
                       0. If the return code is a different value, an exception
                       is raised. Pass None to skip the status check entirely.
      stdin: A string of data to pipe to the process as standard input.

    Returns:
      A tuple containing the desired return values specified by the return_*
      arguments. (A single value is returned un-tupled; None if none of the
      return_* flags are set.)
    """
    # Prepend the gsutil path, exception-trace flag, and the test project ID.
    cmd = ([gslib.GSUTIL_PATH] + ['--testexceptiontraces'] +
           ['-o', 'GSUtil:default_project_id=' + PopulateProjectId()] +
           cmd)
    if IS_WINDOWS:
      # Windows cannot exec the gsutil script directly; run it via the
      # current Python interpreter.
      cmd = [sys.executable] + cmd
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         stdin=subprocess.PIPE)
    (stdout, stderr) = p.communicate(stdin)
    status = p.returncode

    if expected_status is not None:
      self.assertEqual(
          status, expected_status,
          msg='Expected status %d, got %d.\nCommand:\n%s\n\nstderr:\n%s' % (
              expected_status, status, ' '.join(cmd), stderr))

    toreturn = []
    if return_status:
      toreturn.append(status)
    if return_stdout:
      if IS_WINDOWS:
        # Normalize Windows line endings so tests can assert on '\n'.
        stdout = stdout.replace('\r\n', '\n')
      toreturn.append(stdout)
    if return_stderr:
      if IS_WINDOWS:
        # Normalize Windows line endings so tests can assert on '\n'.
        stderr = stderr.replace('\r\n', '\n')
      toreturn.append(stderr)

    if len(toreturn) == 1:
      return toreturn[0]
    elif toreturn:
      return tuple(toreturn)

  def RunGsUtilTabCompletion(self, cmd, expected_results=None):
    """Runs the gsutil command in tab completion mode.

    Args:
      cmd: The command to run, as a list, e.g. ['cp', 'foo', 'bar']
      expected_results: The expected tab completion results for the given input.
    """
    cmd = [gslib.GSUTIL_PATH] + ['--testexceptiontraces'] + cmd
    cmd_str = ' '.join(cmd)

    @Retry(AssertionError, tries=5, timeout_secs=1)
    def _RunTabCompletion():
      """Runs the tab completion operation with retries."""
      results_string = None
      with tempfile.NamedTemporaryFile(
          delete=False) as tab_complete_result_file:
        # argcomplete returns results via the '8' file descriptor so we
        # redirect to a file so we can capture them.
        cmd_str_with_result_redirect = '%s 8>%s' % (
            cmd_str, tab_complete_result_file.name)
        # These environment variables put argcomplete into completion mode
        # with the cursor at the end of the command line.
        env = os.environ.copy()
        env['_ARGCOMPLETE'] = '1'
        env['COMP_LINE'] = cmd_str
        env['COMP_POINT'] = str(len(cmd_str))
        subprocess.call(cmd_str_with_result_redirect, env=env, shell=True)
        results_string = tab_complete_result_file.read().decode(
            locale.getpreferredencoding())
      if results_string:
        # Individual completions are separated by '\013' (vertical tab).
        results = results_string.split('\013')
      else:
        results = []
      self.assertEqual(results, expected_results)

    # When tests are run in parallel, tab completion could take a long time,
    # so choose a long timeout value.
    with SetBotoConfigForTest([('GSUtil', 'tab_completion_timeout', '120')]):
      _RunTabCompletion()

  @contextmanager
  def SetAnonymousBotoCreds(self):
    """Context manager that runs the enclosed block with no credentials."""
    boto_config_path = self.CreateTempFile(
        contents=BOTO_CONFIG_CONTENTS_IGNORE_ANON_WARNING)
    with SetBotoConfigFileForTest(boto_config_path):
      # Make sure to reset Developer Shell credential port so that the child
      # gsutil process is really anonymous.
      with SetEnvironmentForTest({'DEVSHELL_CLIENT_PORT': None}):
        yield
473