1#!/usr/bin/env python
2#
3# Copyright 2016 Google Inc.
4#
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8
9"""Tests for asset_utils."""
10
11
12import asset_utils
13import os
14import shutil
15import subprocess
16import sys
17import tempfile
18import unittest
19import uuid
20
21
22FILE_DIR = os.path.dirname(os.path.abspath(__file__))
23INFRA_BOTS_DIR = os.path.realpath(os.path.join(FILE_DIR, os.pardir))
24sys.path.insert(0, INFRA_BOTS_DIR)
25import test_utils
26import utils
27
28
29CIPD_DEV_SERVICE_URL = 'https://chrome-infra-packages-dev.appspot.com'
30GS_BUCKET = 'skia-infra-testdata'
31
32
33def _fake_prompt(result):
34  """Make a function that pretends to prompt for input and returns a result."""
35  return lambda s: result
36
37
38def _write_stuff(target_dir):
39  """Write some files and directories into target_dir."""
40  fw = test_utils.FileWriter(target_dir)
41  fw.mkdir('mydir')
42  fw.mkdir('anotherdir', 0666)
43  fw.mkdir('dir3', 0600)
44  fw.mkdir('subdir')
45  fw.write('a.txt', 0777)
46  fw.write('b.txt', 0751)
47  fw.write('c.txt', 0640)
48  fw.write(os.path.join('subdir', 'd.txt'), 0640)
49
50
51class _LocalStore(object):
52  """Local store used for testing."""
53  def __init__(self):
54    self.dir = tempfile.mkdtemp()
55
56  def get_available_versions(self, name):
57    target = os.path.join(self.dir, name)
58    if not os.path.isdir(target):
59      return []
60    contents = os.listdir(os.path.join(self.dir, name))
61    return sorted([int(d) for d in contents])
62
63  def upload(self, name, version, target_dir, extra_tags=None):
64    shutil.copytree(target_dir, os.path.join(self.dir, name, str(version)))
65
66  def download(self, name, version, target_dir):
67    shutil.copytree(os.path.join(self.dir, name, str(version)), target_dir)
68
69  def delete_contents(self, name):
70    try:
71      shutil.rmtree(self.dir)
72    except OSError:
73      if os.path.exists(self.dir):
74        raise
75
76
77class StoreTest(unittest.TestCase):
78  """Superclass used for testing one of the stores."""
79  def setUp(self):
80    self.asset_name = str(uuid.uuid4())
81
82  def tearDown(self):
83    pass
84
85  def _test_upload_download(self, store):
86    with utils.tmp_dir():
87      # Create input files and directories.
88      input_dir = os.path.join(os.getcwd(), 'input')
89      _write_stuff(input_dir)
90
91      # Upload a version, download it again.
92      store.upload(self.asset_name, 0, input_dir)
93      output_dir = os.path.join(os.getcwd(), 'output')
94      store.download(self.asset_name, 0, output_dir)
95
96      # Compare.
97      test_utils.compare_trees(self, input_dir, output_dir)
98
99  def _test_versions(self, store):
100    with utils.tmp_dir():
101      # Create input files and directories.
102      input_dir = os.path.join(os.getcwd(), 'input')
103      _write_stuff(input_dir)
104      self.assertEqual(store.get_available_versions(self.asset_name), [])
105      store.upload(self.asset_name, 0, input_dir)
106      self.assertEqual(store.get_available_versions(self.asset_name), [0])
107      store.upload(self.asset_name, 1, input_dir)
108      self.assertEqual(store.get_available_versions(self.asset_name), [0, 1])
109      store.delete_contents(self.asset_name)
110      self.assertEqual(store.get_available_versions(self.asset_name), [])
111
112
113class LocalStoreTest(StoreTest):
114  """Test the local store."""
115  def setUp(self):
116    super(LocalStoreTest, self).setUp()
117    self._store = _LocalStore()
118
119  def tearDown(self):
120    self._store.delete_contents(self.asset_name)
121    super(LocalStoreTest, self).tearDown()
122
123  def test_upload_download(self):
124    self._test_upload_download(self._store)
125
126  def test_versions(self):
127    self._test_versions(self._store)
128
129
130# This test is disabled due to permissions issues with CIPD.
131#class CIPDStoreTest(StoreTest):
132#  """Test the CIPD store."""
133#  def setUp(self):
134#    super(CIPDStoreTest, self).setUp()
135#    self._store = asset_utils.CIPDStore(cipd_url=CIPD_DEV_SERVICE_URL)
136#
137#  def tearDown(self):
138#    self._store.delete_contents(self.asset_name)
139#    super(CIPDStoreTest, self).tearDown()
140#
141#  def test_upload_download(self):
142#    self._test_upload_download(self._store)
143#
144#  def test_versions(self):
145#    self._test_versions(self._store)
146
147
148# This test is disabled because the interactions with GS can be flaky.
149#class GSStoreTest(StoreTest):
150#  """Test the GS store."""
151#  def setUp(self):
152#    super(GSStoreTest, self).setUp()
153#    self._store = asset_utils.GSStore(gsutil=None, bucket=GS_BUCKET)
154#
155#  def tearDown(self):
156#    self._store.delete_contents(self.asset_name)
157#    super(GSStoreTest, self).tearDown()
158#
159#  def test_upload_download(self):
160#    self._test_upload_download(self._store)
161#
162#  def test_versions(self):
163#    self._test_versions(self._store)
164
165
166class AssetTest(unittest.TestCase):
167  """Test Asset operations using a local store."""
168  def setUp(self):
169    self.asset_name = str(uuid.uuid4())
170    self.old_prompt = asset_utils._prompt
171    asset_utils._prompt = _fake_prompt('y')
172    self._store = _LocalStore()
173    self.a = asset_utils.Asset.add(self.asset_name, self._store)
174
175  def tearDown(self):
176    if self.a:
177      self.a.remove(remove_in_store=True)
178    asset_utils._prompt = self.old_prompt
179
180    gs_path = 'gs://%s/assets/%s' % (GS_BUCKET, self.asset_name)
181    attempt_delete = True
182    try:
183      subprocess.check_call(['gsutil', 'ls', gs_path])
184    except subprocess.CalledProcessError:
185      attempt_delete = False
186    if attempt_delete:
187      subprocess.check_call(['gsutil', 'rm', '-rf', gs_path])
188
189  def test_add_remove(self):
190    # Ensure that we can't create an asset twice.
191    with self.assertRaises(Exception):
192      asset_utils.Asset.add(self.asset_name, self._store)
193
194    # Ensure that the asset dir exists.
195    asset_dir = os.path.join(FILE_DIR, self.asset_name)
196    self.assertTrue(os.path.isdir(asset_dir))
197
198    # Remove the asset, ensure that it's gone.
199    self.a.remove()
200    self.a = None
201    self.assertFalse(os.path.exists(asset_dir))
202
203  def test_upload_download(self):
204    with utils.tmp_dir():
205      # Create input files and directories.
206      input_dir = os.path.join(os.getcwd(), 'input')
207      _write_stuff(input_dir)
208
209      # Upload a version, download it again.
210      self.a.upload_new_version(input_dir)
211      output_dir = os.path.join(os.getcwd(), 'output')
212      self.a.download_current_version(output_dir)
213
214      # Compare.
215      test_utils.compare_trees(self, input_dir, output_dir)
216
217  def test_versions(self):
218    with utils.tmp_dir():
219      # Create input files and directories.
220      input_dir = os.path.join(os.getcwd(), 'input')
221      _write_stuff(input_dir)
222
223      self.assertEqual(self.a.get_current_version(), -1)
224      self.assertEqual(self.a.get_available_versions(), [])
225      self.assertEqual(self.a.get_next_version(), 0)
226
227      self.a.upload_new_version(input_dir)
228
229      self.assertEqual(self.a.get_current_version(), 0)
230      self.assertEqual(self.a.get_available_versions(), [0])
231      self.assertEqual(self.a.get_next_version(), 1)
232
233      self.a.upload_new_version(input_dir)
234
235      self.assertEqual(self.a.get_current_version(), 1)
236      self.assertEqual(self.a.get_available_versions(), [0, 1])
237      self.assertEqual(self.a.get_next_version(), 2)
238
239
240if __name__ == '__main__':
241  unittest.main()
242