1# Copyright (C) 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module to check updates from crates.io."""
15
16import json
17import os
18# pylint: disable=g-importing-member
19from pathlib import Path
20import re
21import urllib.request
22
23import archive_utils
24from base_updater import Updater
25# pylint: disable=import-error
26import metadata_pb2  # type: ignore
27import updater_utils
28
# Matches a crates.io crate page URL; group 1 captures the crate name.
# The dot in the host name is escaped so that look-alike hosts such as
# "cratesXio" are not mistaken for crates.io.
CRATES_IO_URL_PATTERN: str = (r"^https:\/\/crates\.io\/crates\/([-\w]+)")

CRATES_IO_URL_RE: re.Pattern = re.compile(CRATES_IO_URL_PATTERN)

# Matches version strings that contain d.d.d-alpha* or d.d.d-beta*.
ALPHA_BETA_PATTERN: str = (r"^.*[0-9]+\.[0-9]+\.[0-9]+-(alpha|beta).*")

ALPHA_BETA_RE: re.Pattern = re.compile(ALPHA_BETA_PATTERN)

# Captures the major, minor and patch numbers as groups 1, 2 and 3.
VERSION_PATTERN: str = (r"([0-9]+)\.([0-9]+)\.([0-9]+)")

VERSION_MATCHER: re.Pattern = re.compile(VERSION_PATTERN)

# Matches a single-line, double-quoted `description = "..."` entry of a
# Cargo.toml file; group 1 is the value including its surrounding quotes.
DESCRIPTION_PATTERN: str = (r"^description *= *(\".+\")")

DESCRIPTION_MATCHER: re.Pattern = re.compile(DESCRIPTION_PATTERN)
44
45
class CratesUpdater(Updater):
    """Updater for crates.io packages.

    Attributes:
        download_url: URL of the .crate archive to download; assigned by
            check(), _find_latest_non_test_version() or
            use_current_as_latest().
        package: Crate name captured from the homepage URL by
            is_supported_url().
    """

    download_url: str
    package: str

    @staticmethod
    def _fetch_json(url: str) -> dict:
        """Download url and return its body parsed as JSON."""
        with urllib.request.urlopen(url) as request:
            return json.loads(request.read().decode())

    @staticmethod
    def _static_crate_url(name: str, version: str) -> str:
        """Return the static.crates.io archive URL of crate name at version."""
        return "https://static.crates.io/crates/{}/{}-{}.crate".format(
            name, name, version)

    def is_supported_url(self) -> bool:
        """Return True if the old URL is a crates.io HOMEPAGE URL.

        On success the crate name found in the URL is stored in
        self.package.
        """
        # NOTE(review): _old_url is presumably set up by Updater.
        if self._old_url.type != metadata_pb2.URL.HOMEPAGE:
            return False
        match = CRATES_IO_URL_RE.match(self._old_url.value)
        if match is None:
            return False
        self.package = match.group(1)
        return True

    def _get_version_numbers(self, version: str) -> "tuple[int, int, int]":
        """Return the leading major.minor.patch numbers of version as ints.

        Returns (0, 0, 0) when version does not start with three
        dot-separated numbers, e.g. for the empty string.
        """
        match = VERSION_MATCHER.match(version)
        if match is None:
            return (0, 0, 0)
        return (int(match.group(1)), int(match.group(2)), int(match.group(3)))

    def _is_newer_version(self, prev_version: str, prev_id: int,
                          check_version: str, check_id: int) -> bool:
        """Return true if check_version+id is newer than prev_version+id.

        Version numbers are compared first; the id only breaks ties
        between equal version numbers.
        """
        return ((self._get_version_numbers(check_version), check_id) >
                (self._get_version_numbers(prev_version), prev_id))

    def _find_latest_non_test_version(self) -> None:
        """Select the newest version that is neither yanked nor alpha/beta.

        Reads the version list of self.package from the crates.io API and
        stores the selected version in self._new_ver and its archive URL in
        self.download_url. When no version qualifies, self._new_ver is left
        as "" and self.download_url is not assigned.
        """
        url = "https://crates.io/api/v1/crates/{}/versions".format(self.package)
        data = self._fetch_json(url)
        last_id = 0
        self._new_ver = ""
        for v in data["versions"]:
            version = v["num"]
            if v["yanked"] or ALPHA_BETA_RE.match(version):
                continue
            version_id = int(v["id"])
            if self._is_newer_version(
                    self._new_ver, last_id, version, version_id):
                last_id = version_id
                self._new_ver = version
                self.download_url = "https://crates.io" + v["dl_path"]

    def check(self) -> None:
        """Checks crates.io for the latest version of the package.

        Stores the version in self._new_ver and the matching archive URL in
        self.download_url. If the crate's max_version is an alpha or beta
        release, falls back to _find_latest_non_test_version().
        """
        url = "https://crates.io/api/v1/crates/" + self.package
        self._new_ver = self._fetch_json(url)["crate"]["max_version"]
        # Skip d.d.d-{alpha,beta}* versions
        if ALPHA_BETA_RE.match(self._new_ver):
            print("Ignore alpha or beta release: {}-{}."
                  .format(self.package, self._new_ver))
            self._find_latest_non_test_version()
        else:
            data = self._fetch_json(url + "/" + self._new_ver)
            self.download_url = "https://crates.io" + data["version"]["dl_path"]

    def use_current_as_latest(self):
        """Treat the current version as latest and point at its archive."""
        super().use_current_as_latest()
        # A shortcut to use the static download path.
        self.download_url = self._static_crate_url(self.package, self._new_ver)

    def update(self) -> None:
        """Updates the package.

        Has to call check() before this function.
        """
        try:
            temporary_dir = archive_utils.download_and_extract(self.download_url)
            package_dir = archive_utils.find_archive_root(temporary_dir)
            updater_utils.replace_package(package_dir, self._proj_path)
            self.check_for_errors()
        finally:
            # Always clean up urllib's temporary download state, even when
            # one of the steps above raised.
            urllib.request.urlcleanup()

    # pylint: disable=no-self-use
    def update_metadata(self, metadata: metadata_pb2.MetaData,
                        full_path: Path) -> None:
        """Updates METADATA content.

        Keeps only the HOMEPAGE urls and appends an ARCHIVE url built from
        metadata.name and metadata.third_party.version. The description is
        replaced by the one found in full_path/Cargo.toml when that one is
        non-empty and different from the current description.
        """
        # copy only HOMEPAGE url, and then add new ARCHIVE url.
        new_url_list = [url for url in metadata.third_party.url
                        if url.type == metadata_pb2.URL.HOMEPAGE]
        new_url = metadata_pb2.URL()
        new_url.type = metadata_pb2.URL.ARCHIVE
        new_url.value = self._static_crate_url(
            metadata.name, metadata.third_party.version)
        new_url_list.append(new_url)
        del metadata.third_party.url[:]
        metadata.third_party.url.extend(new_url_list)
        # copy description from Cargo.toml to METADATA
        cargo_toml = os.path.join(full_path, "Cargo.toml")
        description = self._get_cargo_description(cargo_toml)
        if description and description != metadata.description:
            print("New METADATA description:", description)
            metadata.description = description

    def check_for_errors(self) -> None:
        """Set self._has_errors if the updated project shows known problems.

        Two conditions are detected: leftover *.rej files anywhere under
        the project directory, and the Cargo error marker line inside
        Android.bp.
        """
        # Check for .rej patches from failing to apply patches.
        # If this has too many false positives, we could either
        # check if the files are modified by patches or somehow
        # track which files existed before the patching.
        rejects = list(self._proj_path.glob('**/*.rej'))
        if rejects:
            print("Error: Found patch reject files: %s" % str(rejects))
            self._has_errors = True
        # Check for Cargo errors embedded in Android.bp.
        # Note that this should stay in sync with cargo2android.py.
        android_bp = os.path.join(self._proj_path, 'Android.bp')
        # Read as UTF-8 explicitly so the result does not depend on the
        # locale of the machine running the updater.
        with open(android_bp, 'r', encoding='utf-8') as bp_file:
            for line in bp_file:
                if line.strip() == "Errors in cargo.out:":
                    print("Error: Found Cargo errors in Android.bp")
                    self._has_errors = True
                    return

    def _toml2str(self, line: str) -> str:
        """Convert a quoted toml string to a Python str without quotes.

        Returns "" for a value that starts with three double quotes, since
        multi-line strings are not supported.
        """
        if line.startswith("\"\"\""):
            return ""  # cannot handle broken multi-line description
        # TOML string escapes: \b \t \n \f \r \" \\ (no unicode escape)
        # An escaped backslash is first parked as a real newline, which a
        # single-line value cannot contain, so that it is not mistaken for
        # the start of another escape; it is turned into "\" at the end.
        line = line[1:-1].replace("\\\\", "\n").replace("\\b", "")
        line = line.replace("\\t", " ").replace("\\n", " ").replace("\\f", " ")
        line = line.replace("\\r", "").replace("\\\"", "\"").replace("\n", "\\")
        # replace a unicode quotation mark, used in the libloading crate
        return line.replace("’", "'").strip()

    def _get_cargo_description(self, cargo_toml: str) -> str:
        """Return the description in Cargo.toml or empty string.

        Only the first line matching DESCRIPTION_MATCHER is used. A missing
        or unreadable file yields "".
        """
        if os.path.isfile(cargo_toml) and os.access(cargo_toml, os.R_OK):
            # TOML files are UTF-8; do not rely on the locale's encoding.
            with open(cargo_toml, "r", encoding="utf-8") as toml_file:
                for line in toml_file:
                    match = DESCRIPTION_MATCHER.match(line)
                    if match:
                        return self._toml2str(match.group(1))
        return ""
185