2# Copyright (C) 2018 The Android Open Source Project
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
8#      http://www.apache.org/licenses/LICENSE-2.0
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
17import datetime
18import random
19import string
20import unittest
23    from unittest import mock
24except ImportError:
25    import mock
27from google.appengine.ext import ndb
28from google.appengine.ext import testbed
30from webapp.src import vtslab_status as Status
31from webapp.src.proto import model
34class UnitTestBase(unittest.TestCase):
35    """Base class for unittest.
37    Attributes:
38        testbed: A Testbed instance which provides local unit testing.
39        random_strs: a list of strings generated by GetRandomString() method
40                     in order to avoid duplicates.
41    """
42    random_strs = []
    def setUp(self):
        """Initializes unittest.

        Activates a new Testbed with datastore, memcache and mail service
        stubs, sets the application ID used by the tests, and clears the ndb
        context cache.
        """
        # Create the Testbed class instance and initialize service stubs.
        self.testbed = testbed.Testbed()
        self.testbed.activate()
        self.testbed.init_datastore_v3_stub()
        self.testbed.init_memcache_stub()
        self.testbed.init_mail_stub()
        self.testbed.setup_env(app_id="vtslab-schedule-unittest")
        # Clear cache between tests.
        ndb.get_context().clear_cache()
    def tearDown(self):
        """Deactivates the testbed that setUp() activated."""
        self.testbed.deactivate()
59    def GetRandomString(self, length=7):
60        """Generates and returns a random string.
62        Args:
63            length: an integer, string length.
65        Returns:
66            a random string.
67        """
68        new_str = ""
69        while new_str == "" or new_str in self.random_strs:
70            new_str = "".join(
71                random.choice(string.ascii_letters + string.digits)
72                for _ in range(length))
73        return new_str
75    def GenerateLabModel(self, lab_name=None, host_name=None):
76        """Builds model.LabModel with given information.
78        Args:
79            lab_name: a string, lab name.
80            host_name: a string, host name.
82        Returns:
83            model.LabModel instance.
84        """
85        lab = model.LabModel()
86        lab.name = lab_name if lab_name else self.GetRandomString()
87        lab.hostname = host_name if host_name else self.GetRandomString()
88        lab.owner = "test@abc.com"
89        lab.ip = ""
90        return lab
92    def GenerateDeviceModel(
93            self,
94            status=Status.DEVICE_STATUS_DICT["fastboot"],
95            scheduling_status=Status.DEVICE_SCHEDULING_STATUS_DICT["free"],
96            **kwargs):
97        """Builds model.DeviceModel with given information.
99        Args:
100            status: an integer, device's initial status.
101            scheduling_status: an integer, device's initial scheduling status.
102            **kwargs: the optional arguments.
104        Returns:
105            model.DeviceModel instance.
106        """
107        device = model.DeviceModel()
108        device.status = status
109        device.scheduling_status = scheduling_status
110        device.timestamp = datetime.datetime.now()
112        skip_list = ["status", "scheduling_status", "timestamp"]
113        set_or_empty = []
114        for arg in device._properties:
115            if arg in skip_list or (arg in set_or_empty and arg not in kwargs):
116                continue
117            if arg in kwargs:
118                value = kwargs[arg]
119            elif isinstance(device._properties[arg], ndb.StringProperty):
120                value = self.GetRandomString()
121            elif isinstance(device._properties[arg], ndb.IntegerProperty):
122                value = 0
123            elif isinstance(device._properties[arg], ndb.BooleanProperty):
124                value = False
125            else:
126                print("A type of property '{}' is not supported.".format(arg))
127                continue
128            if device._properties[arg]._repeated and type(value) is not list:
129                value = [value]
130            setattr(device, arg, value)
131        return device
133    def GenerateScheduleModel(
134            self,
135            device_model=None,
136            lab_model=None,
137            priority="medium",
138            period=360,
139            retry_count=1,
140            shards=1,
141            lab_name=None,
142            device_storage_type=Status.STORAGE_TYPE_DICT["PAB"],
143            device_branch=None,
144            device_target=None,
145            gsi_storage_type=Status.STORAGE_TYPE_DICT["PAB"],
146            gsi_build_target=None,
147            test_storage_type=Status.STORAGE_TYPE_DICT["PAB"],
148            test_build_target=None,
149            required_signed_device_build=False,
150            **kwargs):
151        """Builds model.ScheduleModel with given information.
153        Args:
154            device_model: a model.DeviceModel instance to refer device product.
155            lab_model: a model.LabModel instance to refer host name.
156            priority: a string, scheduling priority
157            period: an integer, scheduling period.
158            retry_count: an integer, scheduling retry count.
159            shards: an integer, # ways of device shards.
160            lab_name: a string, target lab name.
161            device_storage_type: an integer, device storage type
162            device_branch: a string, device build branch.
163            device_target: a string, device build target.
164            gsi_storage_type: an integer, GSI storage type
165            gsi_build_target: a string, GSI build target.
166            test_storage_type: an integer, test storage type
167            test_build_target: a string, test build target.
168            required_signed_device_build: a boolean, True to schedule for signed
169                                          device build, False if not.
170            **kwargs: the optional arguments.
172        Returns:
173            model.ScheduleModel instance.
174        """
176        if device_model:
177            device_product = device_model.product
178            device_target = self.GetRandomString(4)
179        elif device_target:
180            device_product, device_target = device_target.split("-")
181        else:
182            device_product = self.GetRandomString(7)
183            device_target = self.GetRandomString(4)
185        if lab_model:
186            lab = lab_model.name
187        elif lab_name:
188            lab = lab_name
189        else:
190            lab = self.GetRandomString()
192        schedule = model.ScheduleModel()
193        schedule.priority = priority
194        schedule.priority_value = Status.GetPriorityValue(schedule.priority)
195        schedule.period = period
196        schedule.shards = shards
197        schedule.retry_count = retry_count
198        schedule.required_signed_device_build = required_signed_device_build
199        schedule.build_storage_type = device_storage_type
200        schedule.manifest_branch = (device_branch if device_branch else
201                                    self.GetRandomString())
202        schedule.build_target = "-".join([device_product, device_target])
204        schedule.gsi_storage_type = gsi_storage_type
205        schedule.gsi_build_target = (gsi_build_target
206                                     if gsi_build_target else "-".join([
207                                         self.GetRandomString(),
208                                         self.GetRandomString(4)
209                                     ]))
210        schedule.test_storage_type = test_storage_type
211        schedule.test_build_target = (test_build_target
212                                      if test_build_target else "-".join([
213                                          self.GetRandomString(),
214                                          self.GetRandomString(4)
215                                      ]))
216        schedule.device = []
217        schedule.device.append("/".join([lab, device_product]))
219        schedule.timestamp = datetime.datetime.now()
221        skip_list = [
222            "priority", "priority_value", "period", "shards",
223            "retry_count", "required_signed_device_build",
224            "build_storage_type", "manifest_branch", "build_target",
225            "gsi_storage_type", "gsi_build_target",
226            "test_storage_type", "test_build_target", "device",
227            "children_jobs"]
228        set_or_empty = ["required_host_equipment", "required_device_equipment"]
229        for arg in schedule._properties:
230            if arg in skip_list or (arg in set_or_empty and arg not in kwargs):
231                continue
232            if arg in kwargs:
233                value = kwargs[arg]
234            elif isinstance(schedule._properties[arg], ndb.StringProperty):
235                value = self.GetRandomString()
236            elif isinstance(schedule._properties[arg], ndb.IntegerProperty):
237                value = 0
238            elif isinstance(schedule._properties[arg], ndb.BooleanProperty):
239                value = False
240            else:
241                print("A type of property '{}' is not supported.".format(arg))
242                continue
243            if schedule._properties[arg]._repeated and type(value) is not list:
244                value = [value]
245            setattr(schedule, arg, value)
247        return schedule
249    def GenerateBuildModel(self, schedule, targets=None):
250        """Builds model.BuildModel with given information.
252        Args:
253            schedule: a model.ScheduleModel instance to look up build info.
254            targets: a list of strings which indicates artifact type.
256        Returns:
257            model.BuildModel instance.
258        """
259        build_dict = {}
260        if targets is None:
261            targets = ["device", "gsi", "test"]
262        for target in targets:
263            build = model.BuildModel()
264            build.artifact_type = target
265            build.timestamp = datetime.datetime.now()
266            if target == "device":
267                build.signed = schedule.required_signed_device_build
268                build.manifest_branch = schedule.manifest_branch
269                build.build_target, build.build_type = (
270                    schedule.build_target.split("-"))
271            elif target == "gsi":
272                build.manifest_branch = schedule.gsi_branch
273                build.build_target, build.build_type = (
274                    schedule.gsi_build_target.split("-"))
275            elif target == "test":
276                build.manifest_branch = schedule.test_branch
277                build.build_target, build.build_type = (
278                    schedule.test_build_target.split("-"))
279            build.build_id = self.GetNewBuildId(build)
280            build_dict[target] = build
281        return build_dict
283    def GetNewBuildId(self, build):
284        """Generates build ID.
286        This method always generates newest (higher number) build ID than other
287        builds stored in testbed datastore.
289        Args:
290            build: a model.BuildModel instance to look up build information
291                   from testbed datastore.
293        Returns:
294            a string, build ID.
295        """
296        format_string = "{0:07d}"
297        build_query = model.BuildModel.query(
298            model.BuildModel.artifact_type == build.artifact_type,
299            model.BuildModel.build_target == build.build_target,
300            model.BuildModel.signed == build.signed,
301            model.BuildModel.manifest_branch == build.manifest_branch)
302        exiting_builds = build_query.fetch()
303        if exiting_builds:
304            exiting_builds.sort(key=lambda x: x.build_id, reverse=True)
305            latest_build_id = int(exiting_builds[0].build_id)
306            return format_string.format(latest_build_id + 1)
307        else:
308            return format_string.format(1)
310    def PassTime(self, hours=0, minutes=0, seconds=0):
311        """Assumes that a certain amount of time has passed.
313        This method changes does not change actual system time but changes all
314        jobs timestamp to assume time has passed.
316        Args:
317            hours: an integer, number of hours to pass time.
318            minutes: an integer, number of minutes to pass time.
319            seconds: an integer, number of seconds to pass time.
320        """
321        if not hours and not minutes and not seconds:
322            return
324        jobs = model.JobModel.query().fetch()
325        to_put = []
326        for job in jobs:
327            if job.timestamp:
328                job.timestamp -= datetime.timedelta(
329                    hours=hours, minutes=minutes, seconds=seconds)
330            if job.heartbeat_stamp:
331                job.heartbeat_stamp -= datetime.timedelta(
332                    hours=hours, minutes=minutes, seconds=seconds)
333            to_put.append(job)
334        if to_put:
335            ndb.put_multi(to_put)
337    def ResetDevices(self):
338        """Resets all devices to ready status."""
339        devices = model.DeviceModel.query().fetch()
340        to_put = []
341        for device in devices:
342            device.status = Status.DEVICE_STATUS_DICT["fastboot"]
343            device.scheduling_status = Status.DEVICE_SCHEDULING_STATUS_DICT[
344                "free"]
345            to_put.append(device)
346        if to_put:
347            ndb.put_multi(to_put)