1# Copyright (C) 2018 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module to update packages from GitHub archive."""
15
16import json
17import re
18import time
19import urllib.request
20import urllib.error
21from typing import List, Optional, Tuple
22
23import archive_utils
24from base_updater import Updater
25import git_utils
26# pylint: disable=import-error
27import metadata_pb2  # type: ignore
28import updater_utils
29
# Matches GitHub release-asset download urls and source-archive urls,
# capturing the repository owner (group 1) and repository name (group 2).
GITHUB_URL_PATTERN: str = (r'^https:\/\/github.com\/([-\w]+)\/([-\w]+)\/' +
                           r'(releases\/download\/|archive\/)')
GITHUB_URL_RE: re.Pattern = re.compile(GITHUB_URL_PATTERN)
33
34
35def _edit_distance(str1: str, str2: str) -> int:
36    prev = list(range(0, len(str2) + 1))
37    for i, chr1 in enumerate(str1):
38        cur = [i + 1]
39        for j, chr2 in enumerate(str2):
40            if chr1 == chr2:
41                cur.append(prev[j])
42            else:
43                cur.append(min(prev[j + 1], prev[j], cur[j]) + 1)
44        prev = cur
45    return prev[len(str2)]
46
47
def choose_best_url(urls: List[str], previous_url: str) -> str:
    """Returns the best url to download from a list of candidate urls.

    Each candidate is compared against the previously used url and the most
    similar one is returned. Similarity is measured by edit distance; ties
    are broken in favor of the earlier candidate.

    Args:
        urls: Array of candidate urls.
        previous_url: String of the url used previously.

    Returns:
        One url from `urls`, or an empty string if `urls` is empty.
    """
    best_url = ''
    best_distance = None
    for candidate in urls:
        distance = _edit_distance(candidate, previous_url)
        if best_distance is None or distance < best_distance:
            best_url = candidate
            best_distance = distance
    return best_url
66
67
class GithubArchiveUpdater(Updater):
    """Updater for archives from GitHub.

    This updater supports release archives in GitHub. Version is determined by
    release name in GitHub.
    """

    # JSON field of the GitHub release API response that holds the version.
    VERSION_FIELD: str = 'tag_name'
    owner: str
    repo: str

    def is_supported_url(self) -> bool:
        """Returns True if the tracked url is a GitHub archive url.

        As a side effect, captures the repository owner and name into
        `self.owner` and `self.repo` for later API calls.
        """
        if self._old_url.type != metadata_pb2.URL.ARCHIVE:
            return False
        match = GITHUB_URL_RE.match(self._old_url.value)
        if match is None:
            return False
        # The pattern always defines groups 1 and 2, so this cannot raise.
        self.owner, self.repo = match.group(1, 2)
        return True

    def _fetch_latest_release(self) -> Optional[Tuple[str, List[str]]]:
        """Fetches the latest release via the GitHub releases API.

        Returns:
            A (version, asset_urls) tuple, or None if the repository has no
            releases (the API responds 404 in that case).

        Raises:
            urllib.error.HTTPError: For any HTTP failure other than 404.
        """
        # pylint: disable=line-too-long
        url = f'https://api.github.com/repos/{self.owner}/{self.repo}/releases/latest'
        try:
            with urllib.request.urlopen(url) as request:
                data = json.loads(request.read().decode())
        except urllib.error.HTTPError as err:
            # 404 means the repository publishes no releases; the caller
            # falls back to tags.
            if err.code == 404:
                return None
            raise
        supported_assets = [
            a['browser_download_url'] for a in data['assets']
            if archive_utils.is_supported_archive(a['browser_download_url'])
        ]
        return (data[self.VERSION_FIELD], supported_assets)

    def _fetch_latest_tag(self) -> Tuple[str, List[str]]:
        """Fetches the latest tag by paging through the GitHub tags API.

        Returns:
            A (version, urls) tuple. The url list is always empty because
            plain tags carry no release assets.
        """
        tags: List[str] = []
        # fetches at most 20 pages.
        for page in range(1, 21):
            # Sleeps 10s to avoid rate limit.
            time.sleep(10)
            # pylint: disable=line-too-long
            url = f'https://api.github.com/repos/{self.owner}/{self.repo}/tags?page={page}'
            with urllib.request.urlopen(url) as request:
                data = json.loads(request.read().decode())
            if not data:
                break
            tags.extend(d['name'] for d in data)
        return (updater_utils.get_latest_version(self._old_ver, tags), [])

    def _fetch_latest_version(self) -> None:
        """Checks upstream and gets the latest release tag."""
        self._new_ver, urls = (self._fetch_latest_release()
                               or self._fetch_latest_tag())

        # Adds source code urls so there is always at least one candidate
        # even when the release publishes no supported assets.
        urls.append(f'https://github.com/{self.owner}/{self.repo}/archive/'
                    f'{self._new_ver}.tar.gz')
        urls.append(f'https://github.com/{self.owner}/{self.repo}/archive/'
                    f'{self._new_ver}.zip')

        self._new_url.value = choose_best_url(urls, self._old_url.value)

    def _fetch_latest_commit(self) -> None:
        """Checks upstream and gets the latest commit to master."""
        # NOTE(review): assumes the default branch is named 'master';
        # repositories whose default branch is 'main' will 404 here — confirm.
        # pylint: disable=line-too-long
        url = f'https://api.github.com/repos/{self.owner}/{self.repo}/commits/master'
        with urllib.request.urlopen(url) as request:
            data = json.loads(request.read().decode())
        self._new_ver = data['sha']
        self._new_url.value = (
            # pylint: disable=line-too-long
            f'https://github.com/{self.owner}/{self.repo}/archive/{self._new_ver}.zip'
        )

    def check(self) -> None:
        """Checks update for package.

        Stores the latest version and url into self._new_ver and
        self._new_url as a side effect; it does not return a value.
        """
        if git_utils.is_commit(self._old_ver):
            self._fetch_latest_commit()
        else:
            self._fetch_latest_version()

    def update(self) -> None:
        """Updates the package.

        Has to call check() before this function.
        """
        try:
            temporary_dir = archive_utils.download_and_extract(
                self._new_url.value)
            package_dir = archive_utils.find_archive_root(temporary_dir)
            updater_utils.replace_package(package_dir, self._proj_path)
        finally:
            # Don't remove the temporary directory, or it'll be impossible
            # to debug the failure...
            # shutil.rmtree(temporary_dir, ignore_errors=True)
            urllib.request.urlcleanup()
175