diff --git a/README.md b/README.md index ca0b0dc6..a0699cde 100644 --- a/README.md +++ b/README.md @@ -705,9 +705,9 @@ check doesn't have to match the event being generated: by the time that Juju sends a pebble-check-failed event the check might have started passing again. ```python -ctx = scenario.Context(MyCharm, meta={"name": "foo", "containers": {"my-container": {}}}) +ctx = scenario.Context(MyCharm, meta={"name": "foo", "containers": {"my_container": {}}}) check_info = scenario.CheckInfo("http-check", failures=7, status=ops.pebble.CheckStatus.DOWN) -container = scenario.Container("my-container", check_infos={check_info}) +container = scenario.Container("my_container", check_infos={check_info}) state = scenario.State(containers={container}) ctx.run(ctx.on.pebble_check_failed(info=check_info, container=container), state=state) ``` @@ -804,22 +804,27 @@ Scenario has secrets. Here's how you use them. state = scenario.State( secrets={ scenario.Secret( - {0: {'key': 'public'}}, - id='foo', - ), - }, + tracked_content={'key': 'public'}, + latest_content={'key': 'public', 'cert': 'private'}, + ) + } ) ``` -The only mandatory arguments to Secret are its secret ID (which should be unique) and its 'contents': that is, a mapping -from revision numbers (integers) to a `str:str` dict representing the payload of the revision. +The only mandatory arguments to Secret is the `tracked_content` dict: a `str:str` +mapping representing the content of the revision. If there is a newer revision +of the content than the one the unit that's handling the event is tracking, then +`latest_content` should also be provided - if it's not, then Scenario assumes +that `latest_content` is the `tracked_content`. If there are other revisions of +the content, simply don't include them: the unit has no way of knowing about +these. There are three cases: - the secret is owned by this app but not this unit, in which case this charm can only manage it if we are the leader - the secret is owned by this unit, in which case this charm can always manage it (leader or not) -- (default) the secret is not owned by this app nor unit, which means we can't manage it but only view it +- (default) the secret is not owned by this app nor unit, which means we can't manage it but only view it (this includes user secrets) -Thus by default, the secret is not owned by **this charm**, but, implicitly, by some unknown 'other charm', and that other charm has granted us view rights. +Thus by default, the secret is not owned by **this charm**, but, implicitly, by some unknown 'other charm' (or a user), and that other has granted us view rights. The presence of the secret in `State.secrets` entails that we have access to it, either as owners or as grantees. Therefore, if we're not owners, we must be grantees. Absence of a Secret from the known secrets list means we are not entitled to obtaining it in any way. The charm, indeed, shouldn't even know it exists. @@ -830,32 +835,52 @@ If this charm does not own the secret, but also it was not granted view rights b To specify a secret owned by this unit (or app): ```python +rel = scenario.Relation("web") state = scenario.State( secrets={ scenario.Secret( - {0: {'key': 'private'}}, - id='foo', + {'key': 'private'}, owner='unit', # or 'app' - remote_grants={0: {"remote"}} - # the secret owner has granted access to the "remote" app over some relation with ID 0 - ), - }, + # The secret owner has granted access to the "remote" app over some relation: + remote_grants={rel.id: {"remote"}} + ) + } ) ``` -To specify a secret owned by some other application and give this unit (or app) access to it: +To specify a secret owned by some other application, or a user secret, and give this unit (or app) access to it: ```python state = scenario.State( secrets={ scenario.Secret( - {0: {'key': 'public'}}, - id='foo', + {'key': 'public'}, # owner=None, which is the default - revision=0, # the revision that this unit (or app) is currently tracking - ), - }, + ) + } +) +``` + +When handling the `secret-expired` and `secret-remove` events, the charm must remove the specified revision of the secret. For `secret-remove`, the revision will no longer be in the `State`, because it's no longer in use (which is why the `secret-remove` event was triggered). To ensure that the charm is removing the secret, check the context for the history of secret removal: + +```python +class SecretCharm(ops.CharmBase): + def __init__(self, framework): + super().__init__(framework) + self.framework.observe(self.on.secret_remove, self._on_secret_remove) + + def _on_secret_remove(self, event): + event.secret.remove_revision(event.revision) + + +ctx = scenario.Context(SecretCharm, meta={"name": "foo"}) +secret = scenario.Secret({"password": "xxxxxxxx"}, owner="app") +old_revision = 42 +state = ctx.run( + ctx.on.secret_remove(secret, revision=old_revision), + scenario.State(leader=True, secrets={secret}) ) +assert ctx.removed_secret_revisions == [old_revision] ``` ## StoredState diff --git a/scenario/context.py b/scenario/context.py index 6dcf910a..d3efedc5 100644 --- a/scenario/context.py +++ b/scenario/context.py @@ -376,6 +376,7 @@ class Context: - :attr:`app_status_history`: record of the app statuses the charm has set - :attr:`unit_status_history`: record of the unit statuses the charm has set - :attr:`workload_version_history`: record of the workload versions the charm has set + - :attr:`removed_secret_revisions`: record of the secret revisions the charm has removed - :attr:`emitted_events`: record of the events (including custom) that the charm has processed This allows you to write assertions not only on the output state, but also, to some @@ -441,48 +442,6 @@ def __init__( ): """Represents a simulated charm's execution context. - It is the main entry point to running a scenario test. - - It contains: the charm source code being executed, the metadata files associated with it, - a charm project repository root, and the juju version to be simulated. - - After you have instantiated Context, typically you will call one of `run()` or - `run_action()` to execute the charm once, write any assertions you like on the output - state returned by the call, write any assertions you like on the Context attributes, - then discard the Context. - Each Context instance is in principle designed to be single-use: - Context is not cleaned up automatically between charm runs. - You can call `.clear()` to do some clean up, but we don't guarantee all state will be gone. - - Any side effects generated by executing the charm, that are not rightful part of the State, - are in fact stored in the Context: - - ``juju_log``: record of what the charm has sent to juju-log - - ``app_status_history``: record of the app statuses the charm has set - - ``unit_status_history``: record of the unit statuses the charm has set - - ``workload_version_history``: record of the workload versions the charm has set - - ``emitted_events``: record of the events (including custom ones) that the charm has - processed - - This allows you to write assertions not only on the output state, but also, to some - extent, on the path the charm took to get there. - - A typical scenario test will look like: - - >>> from scenario import Context, State - >>> from ops import ActiveStatus - >>> from charm import MyCharm, MyCustomEvent # noqa - >>> - >>> def test_foo(): - >>> # Arrange: set the context up - >>> c = Context(MyCharm) - >>> # Act: prepare the state and emit an event - >>> state_out = c.run(c.update_status(), State()) - >>> # Assert: verify the output state is what you think it should be - >>> assert state_out.unit_status == ActiveStatus('foobar') - >>> # Assert: verify the Context contains what you think it should - >>> assert len(c.emitted_events) == 4 - >>> assert isinstance(c.emitted_events[3], MyCustomEvent) - :arg charm_type: the CharmBase subclass to call ``ops.main()`` on. :arg meta: charm metadata to use. Needs to be a valid metadata.yaml format (as a dict). If none is provided, we will search for a ``metadata.yaml`` file in the charm root. @@ -497,16 +456,6 @@ def __init__( :arg app_trusted: whether the charm has Juju trust (deployed with ``--trust`` or added with ``juju trust``). Defaults to False. :arg charm_root: virtual charm root the charm will be executed with. - If the charm, say, expects a `./src/foo/bar.yaml` file present relative to the - execution cwd, you need to use this. E.g.: - - >>> import scenario - >>> import tempfile - >>> virtual_root = tempfile.TemporaryDirectory() - >>> local_path = Path(local_path.name) - >>> (local_path / 'foo').mkdir() - >>> (local_path / 'foo' / 'bar.yaml').write_text('foo: bar') - >>> scenario.Context(... charm_root=virtual_root).run(...) """ if not any((meta, actions, config)): @@ -551,6 +500,7 @@ def __init__( self.app_status_history: List["_EntityStatus"] = [] self.unit_status_history: List["_EntityStatus"] = [] self.workload_version_history: List[str] = [] + self.removed_secret_revisions: List[int] = [] self.emitted_events: List[EventBase] = [] self.requested_storages: Dict[str, int] = {} diff --git a/scenario/mocking.py b/scenario/mocking.py index 6a65ef41..5284dbd3 100644 --- a/scenario/mocking.py +++ b/scenario/mocking.py @@ -2,7 +2,6 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. import datetime -import random import shutil from io import StringIO from pathlib import Path @@ -202,21 +201,16 @@ def _get_secret(self, id=None, label=None): return secrets[0] elif label: - secrets = [s for s in self._state.secrets if s.label == label] - if not secrets: - raise SecretNotFoundError(label) - return secrets[0] + try: + return self._state.get_secret(label=label) + except KeyError: + raise SecretNotFoundError(label) from None else: # if all goes well, this should never be reached. ops.model.Secret will check upon # instantiation that either an id or a label are set, and raise a TypeError if not. raise RuntimeError("need id or label.") - @staticmethod - def _generate_secret_id(): - id = "".join(map(str, [random.choice(list(range(10))) for _ in range(20)])) - return f"secret:{id}" - def _check_app_data_access(self, is_app: bool): if not isinstance(is_app, bool): raise TypeError("is_app parameter to relation_get must be a boolean") @@ -368,10 +362,8 @@ def secret_add( ) -> str: from scenario.state import Secret - secret_id = self._generate_secret_id() secret = Secret( - id=secret_id, - contents={0: content}, + content, label=label, description=description, expire=expire, @@ -381,7 +373,7 @@ def secret_add( secrets = set(self._state.secrets) secrets.add(secret) self._state._update_secrets(frozenset(secrets)) - return secret_id + return secret.id def _check_can_manage_secret( self, @@ -411,19 +403,19 @@ def secret_get( secret = self._get_secret(id, label) juju_version = self._context.juju_version if not (juju_version == "3.1.7" or juju_version >= "3.3.1"): - # in this medieval juju chapter, + # In this medieval Juju chapter, # secret owners always used to track the latest revision. # ref: https://bugs.launchpad.net/juju/+bug/2037120 if secret.owner is not None: refresh = True - revision = secret.revision if peek or refresh: - revision = max(secret.contents.keys()) if refresh: - secret._set_revision(revision) + secret._track_latest_revision() + assert secret.latest_content is not None + return secret.latest_content - return secret.contents[revision] + return secret.tracked_content def secret_info_get( self, @@ -439,7 +431,7 @@ def secret_info_get( return SecretInfo( id=secret.id, label=secret.label, - revision=max(secret.contents), + revision=secret._latest_revision, expires=secret.expire, rotation=secret.rotate, rotates=None, # not implemented yet. @@ -458,6 +450,15 @@ def secret_set( secret = self._get_secret(id, label) self._check_can_manage_secret(secret) + if content == secret.latest_content: + # In Juju 3.6 and higher, this is a no-op, but it's good to warn + # charmers if they are doing this, because it's not generally good + # practice. + # https://bugs.launchpad.net/juju/+bug/2069238 + logger.warning( + f"secret {id} contents set to the existing value: new revision not needed", + ) + secret._update_metadata( content=content, label=label, @@ -496,10 +497,32 @@ def secret_remove(self, id: str, *, revision: Optional[int] = None): secret = self._get_secret(id) self._check_can_manage_secret(secret) - if revision: - del secret.contents[revision] - else: - secret.contents.clear() + # Removing all revisions means that the secret is removed, so is no + # longer in the state. + if revision is None: + secrets = set(self._state.secrets) + secrets.remove(secret) + self._state._update_secrets(frozenset(secrets)) + return + + # Juju does not prevent removing the tracked or latest revision, but it + # is always a mistake to do this. Rather than having the state model a + # secret where the tracked/latest revision cannot be retrieved but the + # secret still exists, we raise instead, so that charms know that there + # is a problem with their code. + if revision in (secret._tracked_revision, secret._latest_revision): + raise ValueError( + "Charms should not remove the latest revision of a secret. " + "Add a new revision with `set_content()` instead, and the previous " + "revision will be cleaned up by the secret owner when no longer in use.", + ) + + # For all other revisions, the content is not visible to the charm + # (this is as designed: the secret is being removed, so it should no + # longer be in use). That means that the state does not need to be + # modified - however, unit tests should be able to verify that the remove call was + # executed, so we track that in a history list in the context. + self._context.removed_secret_revisions.append(revision) def relation_remote_app_name( self, diff --git a/scenario/state.py b/scenario/state.py index 9362b2c2..c873b1d4 100644 --- a/scenario/state.py +++ b/scenario/state.py @@ -5,7 +5,9 @@ import dataclasses import datetime import inspect +import random import re +import string from collections import namedtuple from enum import Enum from itertools import chain @@ -268,24 +270,26 @@ def _to_ops(self) -> CloudSpec_Ops: ) +def _generate_secret_id(): + # This doesn't account for collisions, but the odds are so low that it + # should not be possible in any realistic test run. + secret_id = "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(20) + ) + return f"secret:{secret_id}" + + @dataclasses.dataclass(frozen=True) class Secret(_max_posargs(1)): - # mapping from revision IDs to each revision's contents - contents: Dict[int, "RawSecretRevisionContents"] + tracked_content: "RawSecretRevisionContents" + latest_content: Optional["RawSecretRevisionContents"] = None - id: str - # CAUTION: ops-created Secrets (via .add_secret()) will have a canonicalized - # secret id (`secret:` prefix) - # but user-created ones will not. Using post-init to patch it in feels bad, but requiring the user to - # add the prefix manually every time seems painful as well. + id: str = dataclasses.field(default_factory=_generate_secret_id) # indicates if the secret is owned by THIS unit, THIS app or some other app/unit. # if None, the implication is that the secret has been granted to this unit. owner: Literal["unit", "app", None] = None - # what revision is currently tracked by this charm. Only meaningful if owner=False - revision: int = 0 - # mapping from relation IDs to remote unit/apps to which this secret has been granted. # Only applicable if owner remote_grants: Dict[int, Set[str]] = dataclasses.field(default_factory=dict) @@ -295,13 +299,25 @@ class Secret(_max_posargs(1)): expire: Optional[datetime.datetime] = None rotate: Optional[SecretRotate] = None + # what revision is currently tracked by this charm. Only meaningful if owner=False + _tracked_revision: int = 1 + + # what revision is the latest for this secret. + _latest_revision: int = 1 + def __hash__(self) -> int: return hash(self.id) - def _set_revision(self, revision: int): - """Set a new tracked revision.""" + def __post_init__(self): + if self.latest_content is None: + # bypass frozen dataclass + object.__setattr__(self, "latest_content", self.tracked_content) + + def _track_latest_revision(self): + """Set the current revision to the tracked revision.""" # bypass frozen dataclass - object.__setattr__(self, "revision", revision) + object.__setattr__(self, "_tracked_revision", self._latest_revision) + object.__setattr__(self, "tracked_content", self.latest_content) def _update_metadata( self, @@ -312,11 +328,12 @@ def _update_metadata( rotate: Optional[SecretRotate] = None, ): """Update the metadata.""" - revision = max(self.contents.keys()) - if content: - self.contents[revision + 1] = content - # bypass frozen dataclass + object.__setattr__(self, "_latest_revision", self._latest_revision + 1) + # TODO: if this is done twice in the same hook, then Juju ignores the + # first call, it doesn't continue to update like this does. + if content: + object.__setattr__(self, "latest_content", content) if label: object.__setattr__(self, "label", label) if description: diff --git a/tests/test_consistency_checker.py b/tests/test_consistency_checker.py index 1b97b94b..38dcfbc2 100644 --- a/tests/test_consistency_checker.py +++ b/tests/test_consistency_checker.py @@ -303,7 +303,7 @@ def test_config_secret_old_juju(juju_version): @pytest.mark.parametrize("bad_v", ("1.0", "0", "1.2", "2.35.42", "2.99.99", "2.99")) def test_secrets_jujuv_bad(bad_v): - secret = Secret("secret:foo", {0: {"a": "b"}}) + secret = Secret({"a": "b"}) assert_inconsistent( State(secrets={secret}), _Event("bar"), @@ -312,14 +312,14 @@ def test_secrets_jujuv_bad(bad_v): ) assert_inconsistent( State(secrets={secret}), - secret.changed_event, + _Event("secret_changed", secret=secret), _CharmSpec(MyCharm, {}), bad_v, ) assert_inconsistent( State(), - secret.changed_event, + _Event("secret_changed", secret=secret), _CharmSpec(MyCharm, {}), bad_v, ) @@ -328,7 +328,7 @@ def test_secrets_jujuv_bad(bad_v): @pytest.mark.parametrize("good_v", ("3.0", "3.1", "3", "3.33", "4", "100")) def test_secrets_jujuv_bad(good_v): assert_consistent( - State(secrets={Secret(id="secret:foo", contents={0: {"a": "b"}})}), + State(secrets={Secret({"a": "b"})}), _Event("bar"), _CharmSpec(MyCharm, {}), good_v, @@ -336,14 +336,14 @@ def test_secrets_jujuv_bad(good_v): def test_secret_not_in_state(): - secret = Secret(id="secret:foo", contents={"a": "b"}) + secret = Secret({"a": "b"}) assert_inconsistent( State(), _Event("secret_changed", secret=secret), _CharmSpec(MyCharm, {}), ) assert_consistent( - State(secrets=[secret]), + State(secrets={secret}), _Event("secret_changed", secret=secret), _CharmSpec(MyCharm, {}), ) @@ -723,11 +723,7 @@ def test_storedstate_consistency(): ) assert_inconsistent( State( - stored_states={ - StoredState( - owner_path=None, content={"secret": Secret(id="foo", contents={})} - ) - } + stored_states={StoredState(owner_path=None, content={"secret": Secret({})})} ), _Event("start"), _CharmSpec( diff --git a/tests/test_context_on.py b/tests/test_context_on.py index 151bc303..8ddbf4d4 100644 --- a/tests/test_context_on.py +++ b/tests/test_context_on.py @@ -81,10 +81,8 @@ def test_simple_events(event_name, event_kind): ) def test_simple_secret_events(as_kwarg, event_name, event_kind, owner): ctx = scenario.Context(ContextCharm, meta=META, actions=ACTIONS) - secret = scenario.Secret( - id="secret:123", contents={0: {"password": "xxxx"}}, owner=owner - ) - state_in = scenario.State(secrets=[secret]) + secret = scenario.Secret({"password": "xxxx"}, owner=owner) + state_in = scenario.State(secrets={secret}) # These look like: # ctx.run(ctx.on.secret_changed(secret=secret), state) # The secret must always be passed because the same event name is used for @@ -114,11 +112,11 @@ def test_simple_secret_events(as_kwarg, event_name, event_kind, owner): def test_revision_secret_events(event_name, event_kind): ctx = scenario.Context(ContextCharm, meta=META, actions=ACTIONS) secret = scenario.Secret( - id="secret:123", - contents={42: {"password": "yyyy"}, 43: {"password": "xxxx"}}, + tracked_content={"password": "yyyy"}, + latest_content={"password": "xxxx"}, owner="app", ) - state_in = scenario.State(secrets=[secret]) + state_in = scenario.State(secrets={secret}) # These look like: # ctx.run(ctx.on.secret_expired(secret=secret, revision=revision), state) # The secret and revision must always be passed because the same event name @@ -137,11 +135,11 @@ def test_revision_secret_events(event_name, event_kind): def test_revision_secret_events_as_positional_arg(event_name): ctx = scenario.Context(ContextCharm, meta=META, actions=ACTIONS) secret = scenario.Secret( - id="secret:123", - contents={42: {"password": "yyyy"}, 43: {"password": "xxxx"}}, + tracked_content={"password": "yyyy"}, + latest_content={"password": "xxxx"}, owner=None, ) - state_in = scenario.State(secrets=[secret]) + state_in = scenario.State(secrets={secret}) with pytest.raises(TypeError): ctx.run(getattr(ctx.on, event_name)(secret, 42), state_in) diff --git a/tests/test_e2e/test_secrets.py b/tests/test_e2e/test_secrets.py index 85fda55c..5983c7df 100644 --- a/tests/test_e2e/test_secrets.py +++ b/tests/test_e2e/test_secrets.py @@ -1,5 +1,4 @@ import datetime -import warnings import pytest from ops.charm import CharmBase @@ -36,97 +35,91 @@ def test_get_secret_no_secret(mycharm): assert mgr.charm.model.get_secret(label="foo") -def test_get_secret(mycharm): +@pytest.mark.parametrize("owner", ("app", "unit")) +def test_get_secret(mycharm, owner): ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret({"a": "b"}, owner=owner) with ctx.manager( - state=State(secrets={Secret(id="foo", contents={0: {"a": "b"}})}), + state=State(secrets={secret}), event=ctx.on.update_status(), ) as mgr: - assert mgr.charm.model.get_secret(id="foo").get_content()["a"] == "b" + assert mgr.charm.model.get_secret(id=secret.id).get_content()["a"] == "b" @pytest.mark.parametrize("owner", ("app", "unit")) def test_get_secret_get_refresh(mycharm, owner): ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret( + tracked_content={"a": "b"}, + latest_content={"a": "c"}, + owner=owner, + ) with ctx.manager( ctx.on.update_status(), - State( - secrets={ - Secret( - id="foo", - contents={ - 0: {"a": "b"}, - 1: {"a": "c"}, - }, - owner=owner, - ) - } - ), + State(secrets={secret}), ) as mgr: charm = mgr.charm - assert charm.model.get_secret(id="foo").get_content(refresh=True)["a"] == "c" + assert ( + charm.model.get_secret(id=secret.id).get_content(refresh=True)["a"] == "c" + ) @pytest.mark.parametrize("app", (True, False)) def test_get_secret_nonowner_peek_update(mycharm, app): ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret( + tracked_content={"a": "b"}, + latest_content={"a": "c"}, + ) with ctx.manager( ctx.on.update_status(), State( leader=app, - secrets={ - Secret( - id="foo", - contents={ - 0: {"a": "b"}, - 1: {"a": "c"}, - }, - ), - }, + secrets={secret}, ), ) as mgr: charm = mgr.charm - assert charm.model.get_secret(id="foo").get_content()["a"] == "b" - assert charm.model.get_secret(id="foo").peek_content()["a"] == "c" - assert charm.model.get_secret(id="foo").get_content()["a"] == "b" + assert charm.model.get_secret(id=secret.id).get_content()["a"] == "b" + assert charm.model.get_secret(id=secret.id).peek_content()["a"] == "c" + # Verify that the peek has not refreshed: + assert charm.model.get_secret(id=secret.id).get_content()["a"] == "b" - assert charm.model.get_secret(id="foo").get_content(refresh=True)["a"] == "c" - assert charm.model.get_secret(id="foo").get_content()["a"] == "c" + assert ( + charm.model.get_secret(id=secret.id).get_content(refresh=True)["a"] == "c" + ) + assert charm.model.get_secret(id=secret.id).get_content()["a"] == "c" @pytest.mark.parametrize("owner", ("app", "unit")) def test_get_secret_owner_peek_update(mycharm, owner): ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret( + tracked_content={"a": "b"}, + latest_content={"a": "c"}, + owner=owner, + ) with ctx.manager( ctx.on.update_status(), State( - secrets={ - Secret( - id="foo", - contents={ - 0: {"a": "b"}, - 1: {"a": "c"}, - }, - owner=owner, - ) - } + secrets={secret}, ), ) as mgr: charm = mgr.charm - assert charm.model.get_secret(id="foo").get_content()["a"] == "b" - assert charm.model.get_secret(id="foo").peek_content()["a"] == "c" - assert charm.model.get_secret(id="foo").get_content(refresh=True)["a"] == "c" + assert charm.model.get_secret(id=secret.id).get_content()["a"] == "b" + assert charm.model.get_secret(id=secret.id).peek_content()["a"] == "c" + # Verify that the peek has not refreshed: + assert charm.model.get_secret(id=secret.id).get_content()["a"] == "b" + assert ( + charm.model.get_secret(id=secret.id).get_content(refresh=True)["a"] == "c" + ) @pytest.mark.parametrize("owner", ("app", "unit")) def test_secret_changed_owner_evt_fails(mycharm, owner): ctx = Context(mycharm, meta={"name": "local"}) secret = Secret( - id="foo", - contents={ - 0: {"a": "b"}, - 1: {"a": "c"}, - }, + tracked_content={"a": "b"}, + latest_content={"a": "c"}, owner=owner, ) with pytest.raises(ValueError): @@ -144,11 +137,8 @@ def test_secret_changed_owner_evt_fails(mycharm, owner): def test_consumer_events_failures(mycharm, evt_suffix, revision): ctx = Context(mycharm, meta={"name": "local"}) secret = Secret( - id="foo", - contents={ - 0: {"a": "b"}, - 1: {"a": "c"}, - }, + tracked_content={"a": "b"}, + latest_content={"a": "c"}, ) kwargs = {"secret": secret} if revision is not None: @@ -172,15 +162,15 @@ def test_add(mycharm, app): assert mgr.output.secrets secret = mgr.output.get_secret(label="mylabel") - assert secret.contents[0] == {"foo": "bar"} + assert secret.latest_content == secret.tracked_content == {"foo": "bar"} assert secret.label == "mylabel" def test_set_legacy_behaviour(mycharm): # in juju < 3.1.7, secret owners always used to track the latest revision. # ref: https://bugs.launchpad.net/juju/+bug/2037120 - rev1, rev2, rev3 = {"foo": "bar"}, {"foo": "baz"}, {"foo": "baz", "qux": "roz"} ctx = Context(mycharm, meta={"name": "local"}, juju_version="3.1.6") + rev1, rev2 = {"foo": "bar"}, {"foo": "baz", "qux": "roz"} with ctx.manager( ctx.on.update_status(), State(), @@ -205,26 +195,18 @@ def test_set_legacy_behaviour(mycharm): == rev2 ) - secret.set_content(rev3) state_out = mgr.run() - secret = charm.model.get_secret(label="mylabel") - assert ( - secret.get_content() - == secret.peek_content() - == secret.get_content(refresh=True) - == rev3 - ) - assert state_out.get_secret(label="mylabel").contents == { - 0: rev1, - 1: rev2, - 2: rev3, - } + assert ( + state_out.get_secret(label="mylabel").tracked_content + == state_out.get_secret(label="mylabel").latest_content + == rev2 + ) def test_set(mycharm): - rev1, rev2, rev3 = {"foo": "bar"}, {"foo": "baz"}, {"foo": "baz", "qux": "roz"} ctx = Context(mycharm, meta={"name": "local"}) + rev1, rev2 = {"foo": "bar"}, {"foo": "baz", "qux": "roz"} with ctx.manager( ctx.on.update_status(), State(), @@ -238,25 +220,25 @@ def test_set(mycharm): == rev1 ) + # TODO: if this is done in the same event hook, it's more complicated + # than this. Figure out what we should do here. + # Also the next test, for Juju 3.3 secret.set_content(rev2) assert secret.get_content() == rev1 assert secret.peek_content() == secret.get_content(refresh=True) == rev2 - secret.set_content(rev3) state_out = mgr.run() - assert secret.get_content() == rev2 - assert secret.peek_content() == secret.get_content(refresh=True) == rev3 - assert state_out.get_secret(label="mylabel").contents == { - 0: rev1, - 1: rev2, - 2: rev3, - } + assert ( + state_out.get_secret(label="mylabel").tracked_content + == state_out.get_secret(label="mylabel").latest_content + == rev2 + ) def test_set_juju33(mycharm): - rev1, rev2, rev3 = {"foo": "bar"}, {"foo": "baz"}, {"foo": "baz", "qux": "roz"} ctx = Context(mycharm, meta={"name": "local"}, juju_version="3.3.1") + rev1, rev2 = {"foo": "bar"}, {"foo": "baz", "qux": "roz"} with ctx.manager( ctx.on.update_status(), State(), @@ -270,44 +252,36 @@ def test_set_juju33(mycharm): assert secret.peek_content() == rev2 assert secret.get_content(refresh=True) == rev2 - secret.set_content(rev3) state_out = mgr.run() - assert secret.get_content() == rev2 - assert secret.peek_content() == rev3 - assert secret.get_content(refresh=True) == rev3 - assert state_out.get_secret(label="mylabel").contents == { - 0: rev1, - 1: rev2, - 2: rev3, - } + assert ( + state_out.get_secret(label="mylabel").tracked_content + == state_out.get_secret(label="mylabel").latest_content + == rev2 + ) @pytest.mark.parametrize("app", (True, False)) def test_meta(mycharm, app): ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret( + {"a": "b"}, + owner="app" if app else "unit", + label="mylabel", + description="foobarbaz", + rotate=SecretRotate.HOURLY, + ) with ctx.manager( ctx.on.update_status(), State( leader=True, - secrets={ - Secret( - owner="app" if app else "unit", - id="foo", - label="mylabel", - description="foobarbaz", - rotate=SecretRotate.HOURLY, - contents={ - 0: {"a": "b"}, - }, - ) - }, + secrets={secret}, ), ) as mgr: charm = mgr.charm assert charm.model.get_secret(label="mylabel") - secret = charm.model.get_secret(id="foo") + secret = charm.model.get_secret(id=secret.id) info = secret.get_info() assert secret.label is None @@ -326,32 +300,27 @@ def test_secret_permission_model(mycharm, leader, owner): ) ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret( + {"a": "b"}, + label="mylabel", + owner=owner, + description="foobarbaz", + rotate=SecretRotate.HOURLY, + ) + secret_id = secret.id with ctx.manager( ctx.on.update_status(), State( leader=leader, - secrets={ - Secret( - id="foo", - label="mylabel", - description="foobarbaz", - rotate=SecretRotate.HOURLY, - owner=owner, - contents={ - 0: {"a": "b"}, - }, - ) - }, + secrets={secret}, ), ) as mgr: - secret = mgr.charm.model.get_secret(id="foo") + # can always view + secret: ops_Secret = mgr.charm.model.get_secret(id=secret_id) assert secret.get_content()["a"] == "b" assert secret.peek_content() assert secret.get_content(refresh=True) - # can always view - secret: ops_Secret = mgr.charm.model.get_secret(id="foo") - if expect_manage: assert secret.get_content() assert secret.peek_content() @@ -379,22 +348,18 @@ def test_grant(mycharm, app): ctx = Context( mycharm, meta={"name": "local", "requires": {"foo": {"interface": "bar"}}} ) + secret = Secret( + {"a": "b"}, + owner="unit", + label="mylabel", + description="foobarbaz", + rotate=SecretRotate.HOURLY, + ) with ctx.manager( ctx.on.update_status(), State( relations=[Relation("foo", "remote")], - secrets={ - Secret( - owner="unit", - id="foo", - label="mylabel", - description="foobarbaz", - rotate=SecretRotate.HOURLY, - contents={ - 0: {"a": "b"}, - }, - ) - }, + secrets={secret}, ), ) as mgr: charm = mgr.charm @@ -412,19 +377,15 @@ def test_update_metadata(mycharm): exp = datetime.datetime(2050, 12, 12) ctx = Context(mycharm, meta={"name": "local"}) + secret = Secret( + {"a": "b"}, + owner="unit", + label="mylabel", + ) with ctx.manager( ctx.on.update_status(), State( - secrets={ - Secret( - owner="unit", - id="foo", - label="mylabel", - contents={ - 0: {"a": "b"}, - }, - ) - }, + secrets={secret}, ), ) as mgr: secret = mgr.charm.model.get_secret(label="mylabel") @@ -456,7 +417,7 @@ def _on_start(self, _): secret = self.unit.add_secret({"foo": "bar"}) secret.grant(self.model.relations["bar"][0]) - state = State(leader=leader, relations=[Relation("bar")]) + state = State(leader=leader, relations={Relation("bar")}) ctx = Context( GrantingCharm, meta={"name": "foo", "provides": {"bar": {"interface": "bar"}}} ) @@ -464,29 +425,26 @@ def _on_start(self, _): def test_grant_nonowner(mycharm): - def post_event(charm: CharmBase): - secret = charm.model.get_secret(id="foo") + secret = Secret( + {"a": "b"}, + label="mylabel", + description="foobarbaz", + rotate=SecretRotate.HOURLY, + ) + secret_id = secret.id + def post_event(charm: CharmBase): + secret = charm.model.get_secret(id=secret_id) secret = charm.model.get_secret(label="mylabel") foo = charm.model.get_relation("foo") with pytest.raises(ModelError): secret.grant(relation=foo) - out = trigger( + trigger( State( relations={Relation("foo", "remote")}, - secrets={ - Secret( - id="foo", - label="mylabel", - description="foobarbaz", - rotate=SecretRotate.HOURLY, - contents={ - 0: {"a": "b"}, - }, - ) - }, + secrets={secret}, ), "update_status", mycharm, @@ -497,8 +455,7 @@ def post_event(charm: CharmBase): def test_add_grant_revoke_remove(): class GrantingCharm(CharmBase): - def __init__(self, *args): - super().__init__(*args) + pass ctx = Context( GrantingCharm, meta={"name": "foo", "provides": {"bar": {"interface": "bar"}}} @@ -537,22 +494,84 @@ def __init__(self, *args): secret = charm.model.get_secret(label="mylabel") secret.remove_all_revisions() - assert not mgr.output.get_secret(label="mylabel").contents # secret wiped + with pytest.raises(KeyError): + mgr.output.get_secret(label="mylabel") + + +def test_secret_removed_event(): + class SecretCharm(CharmBase): + def __init__(self, framework): + super().__init__(framework) + self.framework.observe(self.on.secret_remove, self._on_secret_remove) + + def _on_secret_remove(self, event): + event.secret.remove_revision(event.revision) + + ctx = Context(SecretCharm, meta={"name": "foo"}) + secret = Secret({"a": "b"}, owner="app") + old_revision = 42 + state = ctx.run( + ctx.on.secret_remove(secret, revision=old_revision), + State(leader=True, secrets={secret}), + ) + assert secret in state.secrets + assert ctx.removed_secret_revisions == [old_revision] + + +def test_secret_expired_event(): + class SecretCharm(CharmBase): + def __init__(self, framework): + super().__init__(framework) + self.framework.observe(self.on.secret_expired, self._on_secret_expired) + + def _on_secret_expired(self, event): + event.secret.set_content({"password": "newpass"}) + event.secret.remove_revision(event.revision) + + ctx = Context(SecretCharm, meta={"name": "foo"}) + secret = Secret({"password": "oldpass"}, owner="app") + old_revision = 42 + state = ctx.run( + ctx.on.secret_expired(secret, revision=old_revision), + State(leader=True, secrets={secret}), + ) + assert state.get_secret(id=secret.id).latest_content == {"password": "newpass"} + assert ctx.removed_secret_revisions == [old_revision] + + +def test_remove_bad_revision(): + class SecretCharm(CharmBase): + def __init__(self, framework): + super().__init__(framework) + self.framework.observe(self.on.secret_remove, self._on_secret_remove) + + def _on_secret_remove(self, event): + with pytest.raises(ValueError): + event.secret.remove_revision(event.revision) + + ctx = Context(SecretCharm, meta={"name": "foo"}) + secret = Secret({"a": "b"}, owner="app") + ctx.run( + ctx.on.secret_remove(secret, revision=secret._latest_revision), + State(leader=True, secrets={secret}), + ) + ctx.run( + ctx.on.secret_remove(secret, revision=secret._tracked_revision), + State(leader=True, secrets={secret}), + ) def test_no_additional_positional_arguments(): with pytest.raises(TypeError): - Secret({}, None) + Secret({}, {}, None) def test_default_values(): contents = {"foo": "bar"} - id = "secret:1" - secret = Secret(contents, id=id) - assert secret.contents == contents - assert secret.id == id + secret = Secret(contents) + assert secret.latest_content == secret.tracked_content == contents + assert secret.id.startswith("secret:") assert secret.label is None - assert secret.revision == 0 assert secret.description is None assert secret.owner is None assert secret.rotate is None