Skip to content

Commit

Permalink
feat: backport skopeo class
Browse files Browse the repository at this point in the history
Copies the Skopeo class from:
#1696
  • Loading branch information
lengau committed Jun 13, 2024
1 parent d4d5da9 commit 539230c
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 0 deletions.
2 changes: 2 additions & 0 deletions charmcraft/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
get_charm_name_from_path,
get_templates_environment,
)
from charmcraft.utils.skopeo import Skopeo
from charmcraft.utils.store import ChannelData, Risk
from charmcraft.utils.yaml import load_yaml

Expand Down Expand Up @@ -96,5 +97,6 @@
"find_charm_sources",
"get_charm_name_from_path",
"get_templates_environment",
"Skopeo",
"load_yaml",
]
153 changes: 153 additions & 0 deletions charmcraft/utils/skopeo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright 2024 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For further info, check https://github.com/canonical/charmcraft
"""A wrapper around Skopeo."""

import io
import json
import pathlib
import shutil
import subprocess
from collections.abc import Sequence
from typing import Any, cast, overload

from charmcraft import errors


class Skopeo:
"""A class for interacting with skopeo."""

def __init__(
self,
*,
skopeo_path: str = "",
insecure_policy: bool = False,
arch: str | None = None,
os: str | None = None,
tmpdir: pathlib.Path | None = None,
debug: bool = False,
) -> None:
if skopeo_path:
self._skopeo = skopeo_path
else:
self._skopeo = cast(str, shutil.which("skopeo"))
if not self._skopeo:
raise RuntimeError("Cannot find a skopeo executable.")
self._insecure_policy = insecure_policy
self.arch = arch
self.os = os
if tmpdir:
tmpdir.mkdir(parents=True, exist_ok=True)
self._tmpdir = tmpdir
self._debug = debug

self._run_skopeo([self._skopeo, "--version"], capture_output=True, text=True)

def get_global_command(self) -> list[str]:
"""Prepare the global skopeo options."""
command = [self._skopeo]
if self._insecure_policy:
command.append("--insecure-policy")
if self.arch:
command.extend(["--override-arch", self.arch])
if self.os:
command.extend(["--override-os", self.os])
if self._tmpdir:
command.extend(["--tmpdir", str(self._tmpdir)])
if self._debug:
command.append("--debug")
return command

def _run_skopeo(self, command: Sequence[str], **kwargs) -> subprocess.CompletedProcess:
"""Run skopeo, converting the error message if necessary."""
try:
return subprocess.run(command, check=True, **kwargs)
except subprocess.CalledProcessError as exc:
raise errors.SubprocessError.from_subprocess(exc) from exc

def copy(
self,
source_image: str,
destination_image: str,
*,
all_images: bool = False,
preserve_digests: bool = False,
source_username: str | None = None,
source_password: str | None = None,
dest_username: str | None = None,
dest_password: str | None = None,
stdout: io.FileIO | int | None = None,
stderr: io.FileIO | int | None = None,
) -> subprocess.CompletedProcess:
"""Copy an OCI image using Skopeo."""
command = [
*self.get_global_command(),
"copy",
]
if all_images:
command.append("--all")
if preserve_digests:
command.append("--preserve-digests")
if source_username and source_password:
command.extend(["--src-creds", f"{source_username}:{source_password}"])
elif source_username:
command.extend(["--src-creds", source_username])
elif source_password:
command.extend(["--src-password", source_password])
if dest_username and dest_password:
command.extend(["--dest-creds", f"{dest_username}:{dest_password}"])
elif dest_username:
command.extend(["--dest-creds", dest_username])
elif dest_password:
command.extend(["--dest-password", dest_password])

command.extend([source_image, destination_image])

if stdout or stderr:
return self._run_skopeo(command, stdout=stdout, stderr=stderr, text=True)
return self._run_skopeo(command, capture_output=True, text=True)

@overload
def inspect(
self, image: str, *, format_template: None = None, raw: bool = False, tags: bool = True
) -> dict[str, Any]: ...
@overload
def inspect(
self, image: str, *, format_template: str, raw: bool = False, tags: bool = True
) -> str: ...
def inspect(
self,
image: str,
*,
format_template: str | None = None,
raw: bool = False,
tags: bool = True,
) -> dict[str, Any] | str:
"""Inspect an image."""
command = [*self.get_global_command(), "inspect"]
if format_template is not None:
command.extend(["--format", format_template])
if raw:
command.append("--raw")
if not tags:
command.append("--no-tags")

command.append(image)

result = self._run_skopeo(command, capture_output=True, text=True)

if format_template is None:
return json.loads(result.stdout)
return result.stdout
126 changes: 126 additions & 0 deletions tests/unit/utils/test_skopeo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2024 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For further info, check https://github.com/canonical/charmcraft
"""Unit tests for skopeo wrapper."""

import pathlib
import platform
from unittest import mock

import pytest

from charmcraft.utils.skopeo import Skopeo

pytestmark = [
pytest.mark.xfail(
platform.system().lower() not in ("linux", "darwin"),
reason="Don't necessarily have skopeo on non Linux/mac platforms.",
strict=False, # Allow them to pass anyway.
),
]

IMAGE_PATHS = [ # See: https://github.com/containers/skopeo/blob/main/docs/skopeo.1.md#image-names
"containers-storage:my/local:image",
"dir:/tmp/some-dir",
"docker://my-image:latest",
"docker-archive:/tmp/some-archive",
"docker-archive:/tmp/some-archive:latest",
"docker-daemon:sha256:f515493110d497051b4a5c4d977c2b1e7f38190def919ab22683e6785b9d5067",
"docker-daemon:ubuntu:24.04",
"oci:/tmp/some-dir:latest",
"oci-archive:my-image.tar",
]


@pytest.mark.parametrize("path", ["/skopeo", "/bin/skopeo"])
def test_skopeo_path(fake_process, path):
fake_process.register([path, "--version"])
skopeo = Skopeo(skopeo_path=path)

assert skopeo.get_global_command() == [path]


def test_find_skopeo_success(fake_process):
path = "/fake/path/to/skopeo"
fake_process.register([path, "--version"])
with mock.patch("shutil.which", return_value=path) as mock_which:
skopeo = Skopeo()

assert skopeo.get_global_command() == [path]
mock_which.assert_called_once_with("skopeo")


@pytest.mark.parametrize(
("kwargs", "expected"),
[
pytest.param({}, [], id="empty"),
pytest.param({"insecure_policy": True}, ["--insecure-policy"], id="insecure_policy"),
pytest.param({"arch": "amd64"}, ["--override-arch", "amd64"], id="amd64"),
pytest.param({"arch": "arm64"}, ["--override-arch", "arm64"], id="arm64"),
pytest.param({"arch": "riscv64"}, ["--override-arch", "riscv64"], id="riscv64"),
pytest.param({"os": "linux"}, ["--override-os", "linux"], id="os-linux"),
pytest.param({"os": "bsd"}, ["--override-os", "bsd"], id="os-bsd"),
pytest.param(
{"tmpdir": pathlib.Path("/tmp/skopeo_tmp")},
["--tmpdir", "/tmp/skopeo_tmp"],
id="tmpdir",
),
],
)
def test_get_global_command(fake_process, kwargs, expected):
"""Tests for getting the global command and arguments."""
fake_process.register(["/skopeo", "--version"])
skopeo = Skopeo(skopeo_path="/skopeo", **kwargs)

assert skopeo.get_global_command() == ["/skopeo", *expected]


@pytest.fixture()
def fake_skopeo(fake_process):
fake_process.register(["/skopeo", "--version"])
return Skopeo(skopeo_path="/skopeo")


@pytest.mark.parametrize("source_image", IMAGE_PATHS)
@pytest.mark.parametrize("destination_image", IMAGE_PATHS)
@pytest.mark.parametrize(
("kwargs", "expected_args"),
[
({}, []),
({"all_images": True}, ["--all"]),
({"preserve_digests": True}, ["--preserve-digests"]),
({"source_username": "user"}, ["--src-creds", "user"]),
({"source_password": "pass"}, ["--src-password", "pass"]),
({"source_username": "user", "source_password": "pass"}, ["--src-creds", "user:pass"]),
({"dest_username": "user"}, ["--dest-creds", "user"]),
({"dest_password": "pass"}, ["--dest-password", "pass"]),
({"dest_username": "user", "dest_password": "pass"}, ["--dest-creds", "user:pass"]),
],
)
def test_get_copy_command(
fake_process, fake_skopeo: Skopeo, source_image, destination_image, kwargs, expected_args
):
fake_process.register(
[
*fake_skopeo.get_global_command(),
"copy",
*expected_args,
source_image,
destination_image,
]
)
result = fake_skopeo.copy(source_image, destination_image, **kwargs)

result.check_returncode()

0 comments on commit 539230c

Please sign in to comment.