1# Copyright 2019 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Class to manage a git repository via python.
15
16This class is to be used to implement git commands over
17a python API and manage the current state of the git repo.
18
19  Typical usage example:
20
    r_man = clone_repo_and_get_manager(
        'https://github.com/google/oss-fuzz.git', '/tmp/repos')
    r_man.checkout_commit('5668cc422c2c92d38a370545d3591039fb5bb8d4')
23"""
24import datetime
25import logging
26import os
27import shutil
28
29import utils
30
31
class RepoManager:
  """Wraps git operations on a single checkout located at repo_dir.

  Every git command is run through utils.execute with repo_dir as the working
  location.
  """

  def __init__(self, repo_dir):
    """Stores the checkout location.

    Args:
      repo_dir: Path of the directory containing the git checkout.
    """
    self.repo_dir = repo_dir

  def _is_git_repo(self):
    """Test if the current repo dir is a git repo or not.

    Returns:
      True if repo_dir contains a '.git' directory.
    """
    git_path = os.path.join(self.repo_dir, '.git')
    return os.path.isdir(git_path)

  def git(self, cmd, check_result=False):
    """Run a git command inside repo_dir.

    Args:
      cmd: The git arguments as a list (without the leading 'git').
      check_result: Should an exception be thrown on failed command.

    Returns:
      stdout, stderr, error code.
    """
    return utils.execute(['git'] + cmd,
                         location=self.repo_dir,
                         check_result=check_result)

  def commit_exists(self, commit):
    """Checks to see if a commit exists in the project repo.

    Args:
      commit: The commit SHA you are checking. None, empty and
        whitespace-only values are treated as non-existent.

    Returns:
      True if the commit exists in the project.
    """
    # Guard against None as well as blank strings so callers get a clean
    # False (and in turn their documented ValueError) instead of an
    # AttributeError.
    if not commit or not commit.strip():
      return False

    # 'cat-file -e' exits with zero status only when the object exists.
    _, _, err_code = self.git(['cat-file', '-e', commit])
    return not err_code

  def commit_date(self, commit):
    """Get the date of a commit.

    Args:
      commit: The commit hash.

    Returns:
      A timezone-aware (UTC) datetime built from the commit's '%ct'
      timestamp.
    """
    out, _, _ = self.git(['show', '-s', '--format=%ct', commit],
                         check_result=True)
    return datetime.datetime.fromtimestamp(int(out), tz=datetime.timezone.utc)

  def get_git_diff(self, base='origin...'):
    """Gets a list of files that have changed from the repo head.

    Args:
      base: The revision (or revision range) handed to 'git diff'.

    Returns:
      A list of changed file paths or None on Error. An empty diff is logged
      as an error and also yields None.
    """
    self.fetch_unshallow()
    # Add '--' so that git knows we aren't talking about files.
    command = ['diff', '--name-only', base, '--']
    out, err_msg, err_code = self.git(command)
    if err_code:
      logging.error('Git diff failed with error message %s.', err_msg)
      return None
    if not out:
      logging.error('No diff was found.')
      return None
    return [line for line in out.splitlines() if line]

  def get_current_commit(self):
    """Gets the current commit SHA of the repo.

    Returns:
      The current active commit SHA.
    """
    out, _, _ = self.git(['rev-parse', 'HEAD'], check_result=True)
    return out.strip()

  def get_parent(self, commit, count):
    """Gets the count'th parent of the given commit.

    Args:
      commit: The commit to start from.
      count: How many generations to walk back ('commit~count').

    Returns:
      The parent commit SHA, or None if git could not resolve it.
    """
    self.fetch_unshallow()
    out, _, err_code = self.git(['rev-parse', commit + '~' + str(count)],
                                check_result=False)
    if err_code:
      return None

    return out.strip()

  def fetch_all_remotes(self):
    """Fetch all remotes for checkouts that track a single branch."""
    # Widen the fetch refspec to every branch on origin before updating.
    self.git([
        'config', 'remote.origin.fetch', '+refs/heads/*:refs/remotes/origin/*'
    ],
             check_result=True)
    self.git(['remote', 'update'], check_result=True)

  def get_commit_list(self, newest_commit, oldest_commit=None):
    """Gets the list of commits(inclusive) between the old and new commits.

    Args:
      newest_commit: The newest commit to be in the list.
      oldest_commit: The (optional) oldest commit to be in the list.

    Returns:
      The list of commit SHAs from newest to oldest.

    Raises:
      ValueError: When either the oldest or newest commit does not exist.
      RuntimeError: When there is an error getting the commit list.
    """
    self.fetch_unshallow()
    if oldest_commit and not self.commit_exists(oldest_commit):
      raise ValueError('The oldest commit %s does not exist' % oldest_commit)
    if not self.commit_exists(newest_commit):
      raise ValueError('The newest commit %s does not exist' % newest_commit)
    if oldest_commit == newest_commit:
      return [oldest_commit]

    if oldest_commit:
      commit_range = oldest_commit + '..' + newest_commit
    else:
      commit_range = newest_commit

    out, _, err_code = self.git(['rev-list', commit_range])
    # Look at the exit code before touching the output so that a failed
    # command always surfaces as the documented RuntimeError.
    commits = []
    if not err_code:
      commits = [line for line in out.splitlines() if line]
    if not commits:
      raise RuntimeError('Error getting commit list between %s and %s ' %
                         (oldest_commit, newest_commit))

    # Make sure result is inclusive: 'old..new' leaves out 'old' itself.
    if oldest_commit:
      commits.append(oldest_commit)
    return commits

  def fetch_branch(self, branch):
    """Fetches a remote branch from origin.

    Args:
      branch: The branch name, used as both source and destination ref.

    Returns:
      stdout, stderr, error code of the fetch command.
    """
    return self.git(
        ['fetch', 'origin', '{branch}:{branch}'.format(branch=branch)])

  def fetch_unshallow(self):
    """Gets the current git repository history.

    Does nothing unless '.git/shallow' exists under repo_dir. A failing
    fetch is logged and otherwise ignored.
    """
    shallow_file = os.path.join(self.repo_dir, '.git', 'shallow')
    if os.path.exists(shallow_file):
      _, err, err_code = self.git(['fetch', '--unshallow'], check_result=False)
      if err_code:
        logging.error('Unshallow returned non-zero code: %s', err)

  def checkout_pr(self, pr_ref):
    """Checks out a remote pull request.

    Args:
      pr_ref: The pull request reference to be checked out.
    """
    self.fetch_unshallow()
    self.git(['fetch', 'origin', pr_ref], check_result=True)
    self.git(['checkout', '-f', 'FETCH_HEAD'], check_result=True)

  def checkout_commit(self, commit, clean=True):
    """Checks out a specific commit from the repo.

    Args:
      commit: The commit SHA to be checked out.
      clean: Whether to run 'git clean -fxd' after the checkout.

    Raises:
      RuntimeError: when checkout is not successful.
      ValueError: when commit does not exist.
    """
    self.fetch_unshallow()
    if not self.commit_exists(commit):
      raise ValueError('Commit %s does not exist in current branch' % commit)
    self.git(['checkout', '-f', commit], check_result=True)
    if clean:
      self.git(['clean', '-fxd'], check_result=True)

    # HEAD is always reported as a full SHA, so resolve the requested commit
    # the same way before comparing. A raw string comparison would reject a
    # successful checkout requested by abbreviated SHA, tag or branch name.
    expected, _, err_code = self.git(
        ['rev-parse', '--verify', commit + '^{commit}'])
    if err_code or self.get_current_commit() != expected.strip():
      raise RuntimeError('Error checking out commit %s' % commit)

  def remove_repo(self):
    """Removes the git repo from disk if repo_dir is an existing directory."""
    if os.path.isdir(self.repo_dir):
      shutil.rmtree(self.repo_dir)
223
224
def clone_repo_and_get_manager(repo_url, base_dir, repo_name=None):
  """Clones a repo and constructs a repo manager class.

  The clone is skipped when the target directory already exists.

  Args:
    repo_url: The github url needed to clone.
    base_dir: The directory in which the repo directory lives or is created.
    repo_name: The name of the directory the repo is cloned to. When omitted
      it is derived from the last path component of repo_url, without a
      trailing '.git' suffix.

  Returns:
    A RepoManager for base_dir/repo_name.
  """
  if repo_name is None:
    # Ignore trailing slashes so the basename is not empty, and strip only a
    # '.git' suffix: a plain replace() would also mangle names that merely
    # contain '.git', such as 'foo.github.io'.
    repo_name = os.path.basename(repo_url.rstrip('/'))
    if repo_name.endswith('.git'):
      repo_name = repo_name[:-len('.git')]
  repo_dir = os.path.join(base_dir, repo_name)
  manager = RepoManager(repo_dir)

  if not os.path.exists(repo_dir):
    _clone(repo_url, base_dir, repo_name)

  return manager
242
243
def _clone(repo_url, base_dir, repo_name):
  """Runs 'git clone' for repo_url from within base_dir.

  Args:
    repo_url: The url of the repository to clone.
    base_dir: The directory the clone command is executed in.
    repo_name: The directory name handed to 'git clone' as the destination.

  Raises:
    Whatever utils.execute raises for a failed command when check_result is
    set. NOTE(review): this was previously documented as ValueError; confirm
    against utils.execute.
  """
  clone_command = ['git', 'clone', repo_url, repo_name]
  utils.execute(clone_command, location=base_dir, check_result=True)
253