diff --git a/src/macaron/errors.py b/src/macaron/errors.py index 1353de360..fc66d30d0 100644 --- a/src/macaron/errors.py +++ b/src/macaron/errors.py @@ -30,3 +30,7 @@ class ConfigurationError(MacaronError): class CloneError(MacaronError): """Happens when cannot clone a git repository.""" + + +class RepoCheckOutError(MacaronError): + """Happens when there is an error when checking out the correct revision of a git repository.""" diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 400829dc4..086285ac3 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -27,7 +27,7 @@ NoneDependencyAnalyzer, ) from macaron.dependency_analyzer.cyclonedx import get_deps_from_sbom -from macaron.errors import CloneError +from macaron.errors import CloneError, RepoCheckOutError from macaron.output_reporter.reporter import FileReporter from macaron.output_reporter.results import Record, Report, SCMStatus from macaron.slsa_analyzer import git_url @@ -477,7 +477,7 @@ def _prepare_repo( repo_path: str, branch_name: str = "", digest: str = "", - ) -> Git: + ) -> Git | None: """Prepare the target repository for analysis. If ``repo_path`` is a remote path, the target repo is cloned to ``{target_dir}/{unique_path}``. @@ -501,9 +501,10 @@ def _prepare_repo( Returns ------- - Git + Git | None The pydriller.Git object of the repository or None if error. """ + # TODO: separate the logic for handling remote and local repos instead of putting them into this method. # Cannot specify a commit hash without specifying the branch. if not branch_name and digest: logger.error( @@ -560,8 +561,31 @@ def _prepare_repo( logger.error("Cannot reset the target repository.") return None - if not git_url.check_out_repo_target(git_obj, branch_name, digest, (not is_remote)): - logger.error("Cannot checkout the specific branch or commit of the target repo.") + # Checking out the specific branch or commit. This operation varies depends on the git service that the + # repository uses. + if not is_remote: + # If the repo path provided by the user is a local path, we need to get the actual origin remote URL of + # the repo to decide on the suitable git service. + origin_remote_url = git_url.get_remote_origin_of_local_repo(git_obj) + if git_url.is_remote_repo(origin_remote_url): + # The local repo's origin remote url is a remote URL (e.g https://host.com/a/b): In this case, we obtain + # the corresponding git service using ``self.get_git_service``. + git_service = self.get_git_service(origin_remote_url) + else: + # The local repo's origin remote url is a local path (e.g /path/to/local/...). This happens when the + # target repository is a clone from another local repo or is a clone from a git archive - + # https://git-scm.com/docs/git-archive: In this case, we fall-back to the generic function + # ``git_url.check_out_repo_target``. + if not git_url.check_out_repo_target(git_obj, branch_name, digest, not is_remote): + logger.error("Cannot checkout the specific branch or commit of the target repo.") + return None + + return git_obj + + try: + git_service.check_out_repo(git_obj, branch_name, digest, not is_remote) + except RepoCheckOutError as error: + logger.error(error) return None return git_obj diff --git a/src/macaron/slsa_analyzer/git_service/base_git_service.py b/src/macaron/slsa_analyzer/git_service/base_git_service.py index abc283586..07ce940b3 100644 --- a/src/macaron/slsa_analyzer/git_service/base_git_service.py +++ b/src/macaron/slsa_analyzer/git_service/base_git_service.py @@ -5,8 +5,10 @@ from abc import abstractmethod +from pydriller.git import Git + from macaron.config.defaults import defaults -from macaron.errors import CloneError, ConfigurationError +from macaron.errors import CloneError, ConfigurationError, RepoCheckOutError from macaron.slsa_analyzer import git_url @@ -27,7 +29,6 @@ def __init__(self, name: str) -> None: @abstractmethod def load_defaults(self) -> None: """Load the values for this git service from the ini configuration.""" - raise NotImplementedError def load_domain(self, section_name: str) -> str | None: """Load the domain of the git service from the ini configuration section ``section_name``. @@ -110,7 +111,32 @@ def clone_repo(self, clone_dir: str, url: str) -> None: CloneError If there is an error cloning the repo. """ - raise NotImplementedError() + + @abstractmethod + def check_out_repo(self, git_obj: Git, branch: str, digest: str, offline_mode: bool) -> Git: + """Checkout the branch and commit specified by the user of a repository. + + Parameters + ---------- + git_obj : Git + The Git object for the repository to check out. + branch : str + The branch to check out. + digest : str + The sha of the commit to check out. + offline_mode: bool + If true, no fetching is performed. + + Returns + ------- + Git + The same Git object from the input. + + Raises + ------ + RepoError + If there is an error while checking out the specific branch or commit. + """ class NoneGitService(BaseGitService): @@ -154,3 +180,19 @@ def clone_repo(self, _clone_dir: str, url: str) -> None: Always raise, since this method should not be used to clone any repository. """ raise CloneError(f"Internal error encountered when cloning the repo '{url}'.") + + def check_out_repo(self, git_obj: Git, branch: str, digest: str, offline_mode: bool) -> Git: + """Checkout the branch and commit specified by the user of a repository. + + In this particular case, since this class represents a ``None`` git service, + we do nothing but raise a ``RepoError``. + + Raises + ------ + RepoError + Always raise, since this method should not be used to check out in any repository. + """ + raise RepoCheckOutError( + f"Cannot check out branch {branch} and commit {digest} for repo {git_obj.project_name} " + + "from an empty git service" + ) diff --git a/src/macaron/slsa_analyzer/git_service/bitbucket.py b/src/macaron/slsa_analyzer/git_service/bitbucket.py index 576cfd6ca..d129ee347 100644 --- a/src/macaron/slsa_analyzer/git_service/bitbucket.py +++ b/src/macaron/slsa_analyzer/git_service/bitbucket.py @@ -5,6 +5,9 @@ import logging +from pydriller.git import Git + +from macaron.errors import RepoCheckOutError from macaron.slsa_analyzer.git_service.base_git_service import BaseGitService logger: logging.Logger = logging.getLogger(__name__) @@ -26,3 +29,7 @@ def clone_repo(self, _clone_dir: str, _url: str) -> None: """Clone a BitBucket repo.""" # TODO: implement this once support for BitBucket is added. logger.info("Cloning BitBucket repositories is not supported yet. Please clone the repository manually.") + + def check_out_repo(self, git_obj: Git, branch: str, digest: str, offline_mode: bool) -> Git: + """Checkout the branch and commit specified by the user of a repository.""" + raise RepoCheckOutError("Checking out a branch or commit on a Bitbucket repository is not supported yet.") diff --git a/src/macaron/slsa_analyzer/git_service/github.py b/src/macaron/slsa_analyzer/git_service/github.py index da361f79f..49044776c 100644 --- a/src/macaron/slsa_analyzer/git_service/github.py +++ b/src/macaron/slsa_analyzer/git_service/github.py @@ -3,8 +3,10 @@ """This module contains the spec for the GitHub service.""" +from pydriller.git import Git + from macaron.config.global_config import global_config -from macaron.errors import ConfigurationError +from macaron.errors import ConfigurationError, RepoCheckOutError from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.git_service.api_client import GhAPIClient, get_default_gh_client from macaron.slsa_analyzer.git_service.base_git_service import BaseGitService @@ -56,3 +58,34 @@ def clone_repo(self, clone_dir: str, url: str) -> None: If there is an error cloning the repo. """ git_url.clone_remote_repo(clone_dir, url) + + def check_out_repo(self, git_obj: Git, branch: str, digest: str, offline_mode: bool) -> Git: + """Checkout the branch and commit specified by the user of a repository. + + Parameters + ---------- + git_obj : Git + The Git object for the repository to check out. + branch : str + The branch to check out. + digest : str + The sha of the commit to check out. + offline_mode: bool + If true, no fetching is performed. + + Returns + ------- + Git + The same Git object from the input. + + Raises + ------ + RepoError + If there is error while checkout the specific branch and digest. + """ + if not git_url.check_out_repo_target(git_obj, branch, digest, offline_mode): + raise RepoCheckOutError( + f"Failed to check out branch {branch} and commit {digest} for repo {git_obj.project_name}." + ) + + return git_obj diff --git a/src/macaron/slsa_analyzer/git_service/gitlab.py b/src/macaron/slsa_analyzer/git_service/gitlab.py index 2efb1da0d..2c5abb756 100644 --- a/src/macaron/slsa_analyzer/git_service/gitlab.py +++ b/src/macaron/slsa_analyzer/git_service/gitlab.py @@ -23,7 +23,9 @@ from abc import abstractmethod from urllib.parse import ParseResult, urlunparse -from macaron.errors import CloneError, ConfigurationError +from pydriller.git import Git + +from macaron.errors import CloneError, ConfigurationError, RepoCheckOutError from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.git_service.base_git_service import BaseGitService @@ -103,6 +105,10 @@ def clone_repo(self, clone_dir: str, url: str) -> None: To clone a GitLab repository with access token, we embed the access token in the https URL. See GitLab documentation: https://docs.gitlab.com/ee/gitlab-basics/start-using-git.html#clone-using-a-token. + If we clone using the https URL with the token embedded, this URL will be stored as plain text in .git/config as + the origin remote URL. Therefore, after a repository is cloned, this remote origin URL will be set + with the value of the original ``url`` (which does not have the embedded token). + Parameters ---------- clone_dir: str @@ -117,7 +123,78 @@ def clone_repo(self, clone_dir: str, url: str) -> None: If there is an error cloning the repository. """ clone_url = self.construct_clone_url(url) - git_url.clone_remote_repo(clone_dir, clone_url) + # In the ``git_url.clone_remote_repo`` function, CloneError exception is raised whenever the repository + # has not been cloned or the clone attempts failed. + # In both cases, the repository would not be available on the file system to contain the token-included URL. + # Therefore, we don't need to catch and handle the CloneError exceptions here. + repo = git_url.clone_remote_repo(clone_dir, clone_url) + + # If ``git_url.clone_remote_repo`` returns an Repo instance, this means that the repository is freshly cloned + # with the token embedded URL. We will set its value back to the original non-token URL. + # If ``git_url.clone_remote_repo`` returns None, it means that the repository already exists so we don't need + # to do anything. + if repo: + try: + origin_remote = repo.remote("origin") + except ValueError as error: + raise CloneError("Cannot find the remote origin for this repository.") from error + + origin_remote.set_url(url) + + def check_out_repo(self, git_obj: Git, branch: str, digest: str, offline_mode: bool) -> Git: + """Checkout the branch and commit specified by the user of a repository. + + For GitLab, this method set the origin remote URL of the target repository to the token-embedded URL if + a token is available before performing the checkout operation. + + After the checkout operation finishes, the origin remote URL is set back again to ensure that no token-embedded + URL remains. + + Parameters + ---------- + git_obj : Git + The Git object for the repository to check out. + branch : str + The branch to check out. + digest : str + The sha of the commit to check out. + offline_mode: bool + If true, no fetching is performed. + + Returns + ------- + Git + The same Git object from the input. + + Raises + ------ + RepoError + If there is error while checkout the specific branch and digest. + """ + remote_origin_url = git_url.get_remote_origin_of_local_repo(git_obj) + + try: + origin_remote = git_obj.repo.remote("origin") + except ValueError as error: + raise RepoCheckOutError("Cannot find the remote origin for this repository.") from error + + try: + reconstructed_url = self.construct_clone_url(remote_origin_url) + except CloneError as error: + raise RepoCheckOutError("Cannot parse the remote origin URL of this repository.") from error + + origin_remote.set_url(reconstructed_url, remote_origin_url) + + check_out_status = git_url.check_out_repo_target(git_obj, branch, digest, offline_mode) + + origin_remote.set_url(remote_origin_url, reconstructed_url) + + if not check_out_status: + raise RepoCheckOutError( + f"Failed to check out branch {branch} and commit {digest} for repo {git_obj.project_name}." + ) + + return git_obj class SelfHostedGitLab(GitLab): diff --git a/tests/slsa_analyzer/git_service/resources/self_hosted_gitlab_repo/test.txt b/tests/slsa_analyzer/git_service/resources/self_hosted_gitlab_repo/test.txt new file mode 100644 index 000000000..6de7b8c69 --- /dev/null +++ b/tests/slsa_analyzer/git_service/resources/self_hosted_gitlab_repo/test.txt @@ -0,0 +1 @@ +This is a test file. diff --git a/tests/slsa_analyzer/git_service/test_gitlab.py b/tests/slsa_analyzer/git_service/test_gitlab.py index 6603bc7eb..aa235e400 100644 --- a/tests/slsa_analyzer/git_service/test_gitlab.py +++ b/tests/slsa_analyzer/git_service/test_gitlab.py @@ -8,10 +8,13 @@ from unittest import mock import pytest +from pydriller.git import Git from macaron.config.defaults import load_defaults from macaron.errors import ConfigurationError +from macaron.slsa_analyzer import git_url from macaron.slsa_analyzer.git_service.gitlab import PubliclyHostedGitLab, SelfHostedGitLab +from tests.slsa_analyzer.mock_git_utils import commit_files, initiate_repo @pytest.mark.parametrize( @@ -111,3 +114,95 @@ def test_self_hosted_gitlab_without_env_set(tmp_path: Path) -> None: with pytest.raises(ConfigurationError): gitlab.load_defaults() + + +@pytest.fixture(name="self_hosted_gitlab") +def self_hosted_gitlab_repo_fixture(request: pytest.FixtureRequest) -> Git: + """Create a mock GitLab self_hosted repo. + + This fixture expects ONE parameter of type STR which will be the origin remote url initialized for the self_hosted + GitLab repo. To see how we could utilize this with pytest.mark.parameterize, see: + https://docs.pytest.org/en/7.1.x/example/parametrize.html?highlight=indirect#indirect-parametrization + https://docs.pytest.org/en/7.1.x/example/parametrize.html?highlight=indirect#apply-indirect-on-particular-arguments + + Returns + ------- + Git + The pydriller.git.Git wrapper object for the self_hosted GitLab repo. + """ + test_repo_path = Path(__file__).parent.joinpath("resources", "self_hosted_gitlab_repo") + + gitlab_repo = initiate_repo(test_repo_path) + + # Commit untracked test files in the mock repo. + # This would only happen the first time this test case is run. + if gitlab_repo.repo.untracked_files: + commit_files(gitlab_repo, gitlab_repo.repo.untracked_files) + + # Assigning the origin remote url to the passed in parameter. + remote_url = request.param + + # For newly init git repositories, the origin remote does not exist. Therefore, we create one and assign it with + # ``remote_url``. + # If the origin remote already exist we reset its URL to be the ``remote_url``. + if "origin" not in gitlab_repo.repo.remotes: + remote_name = "origin" + gitlab_repo.repo.create_remote(remote_name, remote_url) + else: + gitlab_repo.repo.remote("origin").set_url(remote_url) + + yield gitlab_repo + + gitlab_repo.clear() + + +# The indirect parameter is used to note that ``self_hosted_gitlab`` should be passed to the ``self_hosted_gitlab`` +# fixture first. The test function would then receive the final fixture value. +# As for ``expected_origin_url``, this parameter is passed directly into the test function because it's not listed in +# ``indirect``. Reference: +# https://docs.pytest.org/en/7.1.x/example/parametrize.html?highlight=indirect#apply-indirect-on-particular-arguments +@pytest.mark.parametrize( + ("self_hosted_gitlab", "expected_origin_url"), + [ + ("https://oauth2:abcxyz@internal.gitlab.org/a/b", "https://internal.gitlab.org/a/b"), + ("https://oauth2:abcxyz@internal.gitlab.org/a/b.git", "https://internal.gitlab.org/a/b"), + ], + indirect=["self_hosted_gitlab"], +) +def test_origin_remote_url_masking(self_hosted_gitlab: Git, expected_origin_url: str, tmp_path: Path) -> None: + """Test if the ``clone_repo`` and ``check_out_repo`` methods handle masking the token in the clone URL correctly. + + Note that this test ONLY checks if the remote origin URL after the clone/checkout operations is updated back to + the token-less URL. + It does not check whether those operation works correctly. + """ + user_config_input = """ + [git_service.gitlab.self_hosted] + domain = internal.gitlab.org + """ + user_config_path = os.path.join(tmp_path, "config.ini") + with open(user_config_path, "w", encoding="utf-8") as user_config_file: + user_config_file.write(user_config_input) + + # We don't have to worry about modifying the ``defaults`` object causing test + # pollution here, since we reload the ``defaults`` object before every test with the + # ``setup_test`` fixture. + load_defaults(user_config_path) + + with mock.patch.dict(os.environ, {"MCN_SELF_HOSTED_GITLAB_TOKEN": "abcxyz"}): + gitlab = SelfHostedGitLab() + gitlab.load_defaults() + + with mock.patch("macaron.slsa_analyzer.git_url.clone_remote_repo", return_value=self_hosted_gitlab.repo): + # We check the origin remote URL after cloning is as expected. + gitlab.clone_repo(str(tmp_path), expected_origin_url) + assert git_url.get_remote_origin_of_local_repo(self_hosted_gitlab) == expected_origin_url + + with mock.patch("macaron.slsa_analyzer.git_url.check_out_repo_target", return_value=self_hosted_gitlab.repo): + # We check that after checking out the latest commit in the default branch, the origin remote + # URL is as expected. + gitlab.check_out_repo(self_hosted_gitlab, branch="", digest="", offline_mode=True) + assert git_url.get_remote_origin_of_local_repo(self_hosted_gitlab) == expected_origin_url + + gitlab.check_out_repo(self_hosted_gitlab, branch="", digest="", offline_mode=False) + assert git_url.get_remote_origin_of_local_repo(self_hosted_gitlab) == expected_origin_url