Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make executor report get facts to scheduler #8268

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelogs/unreleased/8190_report_get_facts_to_executor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
description: Make executor report get facts to scheduler
issue-nr: 8190
change-type: minor
destination-branches: [master]

17 changes: 14 additions & 3 deletions src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
from typing import Any, Dict, Optional, Sequence, cast
from uuid import UUID

import inmanta.types
import inmanta.util
Comment on lines -38 to -39
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be wrong to do

import packaging.requirements
from inmanta import const
from inmanta.agent import config as cfg
Expand Down Expand Up @@ -471,6 +469,19 @@ async def create_member(self, member_id: EnvBlueprint) -> ExecutorVirtualEnviron
return process_environment


@dataclass
class FactResult:
resource_id: ResourceVersionIdStr
# Failed fact checks may have empty action_id
action_id: Optional[uuid.UUID]
started: datetime.datetime
finished: datetime.datetime
succeeded: bool
parameters: list[dict[str, Any]]
messages: list[LogLine]
error_msg: Optional[str] = None


@dataclass
class DeployResult:
rvid: ResourceVersionIdStr
Expand Down Expand Up @@ -552,7 +563,7 @@ async def dry_run(
pass

@abc.abstractmethod
async def get_facts(self, resource: ResourceDetails) -> inmanta.types.Apireturn:
async def get_facts(self, resource: ResourceDetails) -> FactResult:
"""
Get facts for a given resource
:param resource: The resource for which to get facts.
Expand Down
8 changes: 4 additions & 4 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import inmanta.util
from inmanta import const, tracing
from inmanta.agent import executor, resourcepool
from inmanta.agent.executor import DeployResult
from inmanta.agent.executor import DeployResult, FactResult
from inmanta.agent.resourcepool import PoolManager, PoolMember
from inmanta.data.model import ResourceIdStr, ResourceType
from inmanta.protocol.ipc_light import (
Expand Down Expand Up @@ -506,14 +506,14 @@ async def call(self, context: ExecutorContext) -> DeployResult:
)


class FactsCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, inmanta.types.Apireturn]):
class FactsCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, FactResult]):
"""Get facts from in an executor"""

def __init__(self, agent_name: str, resource: "inmanta.agent.executor.ResourceDetails") -> None:
self.agent_name = agent_name
self.resource = resource

async def call(self, context: ExecutorContext) -> inmanta.types.Apireturn:
async def call(self, context: ExecutorContext) -> FactResult:
return await context.get(self.agent_name).get_facts(self.resource)


Expand Down Expand Up @@ -815,7 +815,7 @@ async def execute(
) -> DeployResult:
return await self.call(ExecuteCommand(self.id.agent_name, action_id, gid, resource_details, reason, requires))

async def get_facts(self, resource: "inmanta.agent.executor.ResourceDetails") -> inmanta.types.Apireturn:
async def get_facts(self, resource: "inmanta.agent.executor.ResourceDetails") -> FactResult:
return await self.call(FactsCommand(self.id.agent_name, resource))

async def join(self) -> None:
Expand Down
52 changes: 38 additions & 14 deletions src/inmanta/agent/in_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
import inmanta.util
from inmanta import const, data, env, tracing
from inmanta.agent import executor, handler
from inmanta.agent.executor import DeployResult, DryrunResult, FailedResources, ResourceDetails
from inmanta.agent.executor import DeployResult, DryrunResult, FactResult, FailedResources, ResourceDetails
from inmanta.agent.handler import HandlerAPI, SkipResource
from inmanta.const import ParameterSource
from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr
from inmanta.loader import CodeLoader
from inmanta.resources import Resource
from inmanta.types import Apireturn
from inmanta.util import NamedLock, join_threadpools


Expand Down Expand Up @@ -371,7 +370,7 @@ async def dry_run(
assert dryrun_result is not None, "Dryrun result cannot be None"
return dryrun_result

async def get_facts(self, resource: ResourceDetails) -> Apireturn:
async def get_facts(self, resource: ResourceDetails) -> FactResult:
"""
Get facts for a given resource
:param resource: The resource for which to get facts.
Expand All @@ -381,7 +380,16 @@ async def get_facts(self, resource: ResourceDetails) -> Apireturn:
try:
resource_obj: Resource | None = await self.deserialize(resource, const.ResourceAction.getfact)
except Exception:
return 500
return FactResult(
resource_id=resource.rvid,
action_id=None,
parameters=[],
started=datetime.datetime.now().astimezone(),
finished=datetime.datetime.now().astimezone(),
messages=[],
succeeded=False,
error_msg=f"Unable to deserialize resource {resource.id}",
)
assert resource_obj is not None
ctx = handler.HandlerContext(resource_obj)
async with self.activity_lock:
Expand All @@ -405,28 +413,44 @@ async def get_facts(self, resource: ResourceDetails) -> Apireturn:
# Add facts set via the set_fact() method of the HandlerContext
parameters.extend(ctx.facts)

await self.client.set_parameters(tid=self.environment, parameters=parameters)
finished = datetime.datetime.now().astimezone()
await self.client.resource_action_update(
tid=self.environment,
resource_ids=[resource.rvid],
return FactResult(
resource_id=resource.rvid,
action_id=ctx.action_id,
action=const.ResourceAction.getfact,
parameters=parameters,
started=started,
finished=finished,
finished=datetime.datetime.now().astimezone(),
messages=ctx.logs,
succeeded=True,
)

except Exception:
self.logger.exception("Unable to retrieve fact")
self.logger.exception("Unable to retrieve facts for resource %s", resource.id)
return FactResult(
resource_id=resource.rvid,
action_id=ctx.action_id,
parameters=[],
started=datetime.datetime.now().astimezone(),
finished=datetime.datetime.now().astimezone(),
messages=ctx.logs,
succeeded=False,
error_msg=f"Unable to retrieve facts for resource {resource.id}",
)

except Exception:
self.logger.exception("Unable to find a handler for %s", resource.id)
return 500
return FactResult(
resource_id=resource.rvid,
action_id=None,
parameters=[],
started=datetime.datetime.now().astimezone(),
finished=datetime.datetime.now().astimezone(),
messages=[],
succeeded=False,
error_msg=f"Unable to find a handler for {resource.id}",
)
finally:
if provider is not None:
provider.close()
return 200


class InProcessExecutorManager(executor.ExecutorManager[InProcessExecutor]):
Expand Down
6 changes: 3 additions & 3 deletions src/inmanta/agent/write_barier_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import uuid
from copy import deepcopy

import inmanta.types
import inmanta.types # noqa: F401
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly sure if this is needed or we can remove it

from inmanta import const
from inmanta.agent import executor
from inmanta.agent.executor import DeployResult, DryrunResult, FailedResources, ResourceDetails, ResourceInstallSpec
from inmanta.agent.executor import DeployResult, DryrunResult, FactResult, FailedResources, ResourceDetails, ResourceInstallSpec
from inmanta.data.model import ResourceIdStr


Expand Down Expand Up @@ -57,7 +57,7 @@ async def dry_run(
) -> DryrunResult:
return await self.delegate.dry_run(deepcopy(resource), dry_run_id)

async def get_facts(self, resource: ResourceDetails) -> inmanta.types.Apireturn:
async def get_facts(self, resource: ResourceDetails) -> FactResult:
return await self.delegate.get_facts(deepcopy(resource))

async def join(self) -> None:
Expand Down
30 changes: 29 additions & 1 deletion src/inmanta/deploy/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from asyncpg import UniqueViolationError

from inmanta import const, data
from inmanta.agent.executor import DeployResult, DryrunResult
from inmanta.agent.executor import DeployResult, DryrunResult, FactResult
from inmanta.const import TERMINAL_STATES, TRANSIENT_STATES, VALID_STATES_ON_STATE_UPDATE, Change, ResourceState
from inmanta.data.model import AttributeStateChange, ResourceIdStr, ResourceVersionIdStr
from inmanta.protocol import Client
Expand Down Expand Up @@ -58,6 +58,10 @@ async def send_deploy_done(self, result: DeployResult) -> None:
async def dryrun_update(self, env: UUID, dryrun_result: DryrunResult) -> None:
pass

@abc.abstractmethod
async def set_parameters(self, fact_result: FactResult) -> None:
pass


class ToServerUpdateManager(StateUpdateManager):
"""
Expand Down Expand Up @@ -112,6 +116,18 @@ async def dryrun_update(self, env: UUID, dryrun_result: DryrunResult) -> None:
status=const.ResourceState.dry,
)

async def set_parameters(self, fact_result: FactResult) -> None:
await self.client.set_parameters(tid=self.environment, parameters=fact_result.parameters)
await self.client.resource_action_update(
tid=self.environment,
resource_ids=[fact_result.resource_id],
action_id=fact_result.action_id,
action=const.ResourceAction.getfact,
started=fact_result.started,
finished=fact_result.finished,
messages=fact_result.messages,
)


class ToDbUpdateManager(StateUpdateManager):

Expand Down Expand Up @@ -331,3 +347,15 @@ async def dryrun_update(self, env: UUID, dryrun_result: DryrunResult) -> None:
messages=dryrun_result.messages,
status=const.ResourceState.dry,
)

async def set_parameters(self, fact_result: FactResult) -> None:
await self.client.set_parameters(tid=self.environment, parameters=fact_result.parameters)
await self.client.resource_action_update(
tid=self.environment,
resource_ids=[fact_result.resource_id],
action_id=fact_result.action_id,
action=const.ResourceAction.getfact,
started=fact_result.started,
finished=fact_result.finished,
messages=fact_result.messages,
)
5 changes: 4 additions & 1 deletion src/inmanta/deploy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from inmanta import const, data
from inmanta.agent import executor
from inmanta.agent.code_manager import CodeManager
from inmanta.agent.executor import DeployResult
from inmanta.agent.executor import DeployResult, FactResult
from inmanta.data import ConfigurationModel, Environment
from inmanta.data.model import ResourceIdStr, ResourceType, ResourceVersionIdStr
from inmanta.deploy import work
Expand Down Expand Up @@ -641,3 +641,6 @@ async def send_deploy_done(self, result: DeployResult) -> None:

async def dryrun_update(self, env: uuid.UUID, dryrun_result: executor.DryrunResult) -> None:
await self._state_update_delegate.dryrun_update(env, dryrun_result)

async def set_parameters(self, fact_result: FactResult) -> None:
await self._state_update_delegate.set_parameters(fact_result)
8 changes: 7 additions & 1 deletion src/inmanta/deploy/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,10 @@ async def execute(self, task_manager: "scheduler.TaskManager", agent: str, reaso
)
return

await my_executor.get_facts(executor_resource_details)
fact_result = await my_executor.get_facts(executor_resource_details)
if fact_result.succeeded:
await task_manager.set_parameters(
fact_result=fact_result,
)
else:
raise Exception(f"Error encountered while executing RefreshTask: {fact_result.error_msg}")
8 changes: 5 additions & 3 deletions tests/deploy/test_scheduler_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
import asyncpg
import pytest

import inmanta.types
import utils
from inmanta import const, util
from inmanta.agent import executor
from inmanta.agent.agent_new import Agent
from inmanta.agent.executor import DeployResult, DryrunResult, ResourceDetails, ResourceInstallSpec
from inmanta.agent.executor import DeployResult, DryrunResult, FactResult, ResourceDetails, ResourceInstallSpec
from inmanta.config import Config
from inmanta.const import Change
from inmanta.data import ResourceIdStr
Expand Down Expand Up @@ -120,7 +119,7 @@ async def execute(
async def dry_run(self, resources: Sequence[ResourceDetails], dry_run_id: uuid.UUID) -> None:
self.dry_run_count += 1

async def get_facts(self, resource: ResourceDetails) -> inmanta.types.Apireturn:
async def get_facts(self, resource: ResourceDetails) -> None:
self.facts_count += 1

async def open_version(self, version: int) -> None:
Expand Down Expand Up @@ -251,6 +250,9 @@ def check_with_scheduler(self, scheduler: ResourceScheduler) -> None:
if status:
assert scheduler._state.resource_state[resource].status == status

def set_parameters(self, fact_result: FactResult) -> None:
self.state[Id.parse_id(fact_result.resource_id).resource_str()] = const.ResourceState.deploying

async def dryrun_update(self, env: UUID, dryrun_result: DryrunResult) -> None:
self.state[Id.parse_id(dryrun_result.rvid).resource_str()] = const.ResourceState.dry

Expand Down