Skip to content

Commit

Permalink
Convert BranchDelete task and event to Prefect
Browse files Browse the repository at this point in the history
  • Loading branch information
dgarros committed Oct 30, 2024
1 parent 3002117 commit 2a178a9
Show file tree
Hide file tree
Showing 24 changed files with 149 additions and 272 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
push:
branches:
- develop
- develop-*
- stable
- release-*

Expand Down
19 changes: 18 additions & 1 deletion backend/infrahub/core/branch/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
from infrahub.core.migrations.schema.tasks import schema_apply_migrations
from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData
from infrahub.core.validators.tasks import schema_validate_migrations
from infrahub.events.branch_action import BranchDeleteEvent
from infrahub.exceptions import ValidationError
from infrahub.log import get_log_data
from infrahub.message_bus import Meta, messages
from infrahub.services import services
from infrahub.worker import WORKER_IDENTITY
from infrahub.workflows.catalogue import IPAM_RECONCILIATION
from infrahub.workflows.catalogue import BRANCH_CANCEL_PROPOSED_CHANGES, IPAM_RECONCILIATION
from infrahub.workflows.utils import add_branch_tag


Expand Down Expand Up @@ -165,3 +166,19 @@ async def merge_branch(branch: str, conflict_resolution: dict[str, bool] | None
meta=Meta(initiator_id=WORKER_IDENTITY, request_id=request_id),
)
await service.send(message=message)


@flow(name="branch-delete")
async def delete_branch(branch: str) -> None:
service = services.service

await add_branch_tag(branch_name=branch)

obj = await Branch.get_by_name(db=service.database, name=str(branch))
assert obj.id
event = BranchDeleteEvent(branch=branch, branch_id=obj.id, sync_with_git=obj.sync_with_git)
await obj.delete(db=service.database)

await service.workflow.submit_workflow(workflow=BRANCH_CANCEL_PROPOSED_CHANGES, parameters={"branch_name": branch})

await service.event.send(event=event)
35 changes: 35 additions & 0 deletions backend/infrahub/events/branch_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pydantic import Field

from infrahub.message_bus import InfrahubMessage
from infrahub.message_bus.messages.event_branch_delete import EventBranchDelete
from infrahub.message_bus.messages.refresh_registry_branches import RefreshRegistryBranches

from .models import InfrahubBranchEvent


class BranchDeleteEvent(InfrahubBranchEvent):
"""Event generated when a branch has been deleted"""

branch_id: str = Field(..., description="The ID of the mutated node")
sync_with_git: bool = Field(..., description="Indicates if the branch was extended to Git")

def get_name(self) -> str:
return f"{self.get_event_namespace()}.branch.deleted"

def get_resource(self) -> dict[str, str]:
return {
"prefect.resource.id": f"infrahub.branch.{self.branch}",
"infrahub.branch.id": self.branch_id,
}

def get_messages(self) -> list[InfrahubMessage]:
events = [
EventBranchDelete(
branch=self.branch,
branch_id=self.branch_id,
sync_with_git=self.sync_with_git,
meta=self.get_message_meta(),
),
RefreshRegistryBranches(),
]
return events
2 changes: 1 addition & 1 deletion backend/infrahub/events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_name(self) -> str:
def get_resource(self) -> dict[str, str]:
raise NotImplementedError

def get_message(self) -> InfrahubMessage:
def get_messages(self) -> list[InfrahubMessage]:
raise NotImplementedError

def get_related(self) -> list[dict[str, str]]:
Expand Down
21 changes: 12 additions & 9 deletions backend/infrahub/events/node_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pydantic import Field

from infrahub.core.constants import MutationAction
from infrahub.message_bus import InfrahubMessage
from infrahub.message_bus.messages.event_node_mutated import EventNodeMutated

from .models import InfrahubBranchEvent
Expand All @@ -29,12 +30,14 @@ def get_resource(self) -> dict[str, str]:
def get_payload(self) -> dict[str, Any]:
return self.data

def get_message(self) -> EventNodeMutated:
return EventNodeMutated(
branch=self.branch,
kind=self.kind,
node_id=self.node_id,
action=self.action.value,
data=self.data,
meta=self.get_message_meta(),
)
def get_messages(self) -> list[InfrahubMessage]:
return [
EventNodeMutated(
branch=self.branch,
kind=self.kind,
node_id=self.node_id,
action=self.action.value,
data=self.data,
meta=self.get_message_meta(),
)
]
22 changes: 5 additions & 17 deletions backend/infrahub/graphql/mutations/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from infrahub.log import get_log_data, get_logger
from infrahub.message_bus import Meta, messages
from infrahub.worker import WORKER_IDENTITY
from infrahub.workflows.catalogue import BRANCH_MERGE, BRANCH_REBASE
from infrahub.workflows.catalogue import BRANCH_DELETE, BRANCH_MERGE, BRANCH_REBASE

from ..types import BranchType

Expand Down Expand Up @@ -129,22 +129,10 @@ class Arguments:
async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: BranchNameInput) -> Self:
context: GraphqlContext = info.context

async with UserTask.from_graphql_context(title=f"Delete branch: {data['name']}", context=context):
obj = await Branch.get_by_name(db=context.db, name=str(data.name))
await obj.delete(db=context.db)

if context.service:
log_data = get_log_data()
request_id = log_data.get("request_id", "")
message = messages.EventBranchDelete(
branch=obj.name,
branch_id=str(obj.id),
sync_with_git=obj.sync_with_git,
meta=Meta(request_id=request_id),
)
await context.service.send(message=message)

return cls(ok=True)
obj = await Branch.get_by_name(db=context.db, name=str(data.name))
assert context.service
await context.service.workflow.execute_workflow(workflow=BRANCH_DELETE, parameters={"branch": obj.name})
return cls(ok=True)


class BranchUpdate(Mutation):
Expand Down
4 changes: 0 additions & 4 deletions backend/infrahub/message_bus/messages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
from .request_generatordefinition_run import RequestGeneratorDefinitionRun
from .request_graphqlquerygroup_update import RequestGraphQLQueryGroupUpdate
from .request_proposed_change_cancel import RequestProposedChangeCancel
from .request_proposedchange_pipeline import RequestProposedChangePipeline
from .request_repository_checks import RequestRepositoryChecks
from .request_repository_userchecks import RequestRepositoryUserChecks
from .schema_migration_path import SchemaMigrationPath, SchemaMigrationPathResponse
from .schema_validator_path import SchemaValidatorPath, SchemaValidatorPathResponse
from .send_echo_request import SendEchoRequest, SendEchoRequestResponse
from .trigger_generatordefinition_run import TriggerGeneratorDefinitionRun
from .trigger_proposed_change_cancel import TriggerProposedChangeCancel
from .trigger_webhook_actions import TriggerWebhookActions

MESSAGE_MAP: dict[str, type[InfrahubMessage]] = {
Expand Down Expand Up @@ -76,7 +74,6 @@
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
"request.generator_definition.run": RequestGeneratorDefinitionRun,
"request.graphql_query_group.update": RequestGraphQLQueryGroupUpdate,
"request.proposed_change.cancel": RequestProposedChangeCancel,
"request.proposed_change.data_integrity": RequestProposedChangeDataIntegrity,
"request.proposed_change.pipeline": RequestProposedChangePipeline,
"request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts,
Expand All @@ -88,7 +85,6 @@
"request.repository.user_checks": RequestRepositoryUserChecks,
"send.echo.request": SendEchoRequest,
"trigger.generator_definition.run": TriggerGeneratorDefinitionRun,
"trigger.proposed_change.cancel": TriggerProposedChangeCancel,
"trigger.webhook.actions": TriggerWebhookActions,
}

Expand Down

This file was deleted.

This file was deleted.

3 changes: 0 additions & 3 deletions backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"check.repository.merge_conflicts": check.repository.merge_conflicts,
"check.repository.user_check": check.repository.user_check,
"event.branch.create": event.branch.create,
"event.branch.delete": event.branch.delete,
"event.branch.merge": event.branch.merge,
"event.branch.rebased": event.branch.rebased,
"event.node.mutated": event.node.mutated,
Expand All @@ -48,7 +47,6 @@
"request.generator_definition.run": requests.generator_definition.run,
"request.graphql_query_group.update": requests.graphql_query_group.update,
"request.artifact_definition.check": requests.artifact_definition.check,
"request.proposed_change.cancel": requests.proposed_change.cancel,
"request.proposed_change.data_integrity": requests.proposed_change.data_integrity,
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
"request.proposed_change.refresh_artifacts": requests.proposed_change.refresh_artifacts,
Expand All @@ -62,7 +60,6 @@
"schema.migration.path": schema.migration.path,
"schema.validator.path": schema.validator.path,
"trigger.generator_definition.run": trigger.generator_definition.run,
"trigger.proposed_change.cancel": trigger.proposed_change.cancel,
"trigger.webhook.actions": trigger.webhook.actions,
}

Expand Down
14 changes: 0 additions & 14 deletions backend/infrahub/message_bus/operations/event/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,6 @@ async def create(message: messages.EventBranchCreate, service: InfrahubServices)
await service.send(message=event)


@flow(name="event-branch-delete")
async def delete(message: messages.EventBranchDelete, service: InfrahubServices) -> None:
log.info("Branch was deleted", branch=message.branch)

events: List[InfrahubMessage] = [
messages.RefreshRegistryBranches(),
messages.TriggerProposedChangeCancel(branch=message.branch),
]

for event in events:
event.assign_meta(parent=message)
await service.send(message=event)


@flow(name="branch-event-merge")
async def merge(message: messages.EventBranchMerge, service: InfrahubServices) -> None:
log.info("Branch merged", source_branch=message.source_branch, target_branch=message.target_branch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from typing import TYPE_CHECKING, Union

import pytest
from infrahub_sdk.protocols import CoreGeneratorDefinition, CoreProposedChange
from infrahub_sdk.protocols import CoreGeneratorDefinition
from prefect import flow
from pydantic import BaseModel

from infrahub import config, lock
from infrahub.core.constants import CheckType, InfrahubKind, ProposedChangeState, RepositoryInternalStatus
from infrahub.core.constants import CheckType, InfrahubKind, RepositoryInternalStatus
from infrahub.core.diff.coordinator import DiffCoordinator
from infrahub.core.diff.model.diff import SchemaConflict
from infrahub.core.integrity.object_conflict.conflict_recorder import ObjectConflictValidatorRecorder
Expand Down Expand Up @@ -70,19 +70,6 @@ def log_line(self) -> str:
return "Doesn't require changes due to no relevant modified kinds or file changes in Git"


@flow(name="proposed-changed-cancel")
async def cancel(message: messages.RequestProposedChangeCancel, service: InfrahubServices) -> None:
"""Cancel a proposed change."""
async with service.task_report(
related_node=message.proposed_change,
title="Canceling proposed change",
) as task_report:
await task_report.info("Canceling proposed change as the source branch was deleted", id=message.proposed_change)
proposed_change = await service.client.get(kind=CoreProposedChange, id=message.proposed_change)
proposed_change.state.value = ProposedChangeState.CANCELED.value
await proposed_change.save()


@flow(name="proposed-changed-data-integrity")
async def data_integrity(message: messages.RequestProposedChangeDataIntegrity, service: InfrahubServices) -> None:
"""Triggers a data integrity validation check on the provided proposed change to start."""
Expand Down Expand Up @@ -408,8 +395,8 @@ async def run_generators(message: messages.RequestProposedChangeRunGenerators, s
related_node=message.proposed_change,
title="Evaluating Generators",
) as task_report:
generators: list[CoreGeneratorDefinition] = await service.client.filters(
kind=InfrahubKind.GENERATORDEFINITION,
generators = await service.client.filters(
kind=CoreGeneratorDefinition,
prefetch_relationships=True,
populate_store=True,
branch=message.source_branch,
Expand Down
4 changes: 2 additions & 2 deletions backend/infrahub/message_bus/operations/trigger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from . import generator_definition, proposed_change, webhook
from . import generator_definition, webhook

__all__ = ["generator_definition", "proposed_change", "webhook"]
__all__ = ["generator_definition", "webhook"]
27 changes: 0 additions & 27 deletions backend/infrahub/message_bus/operations/trigger/proposed_change.py

This file was deleted.

Empty file.
41 changes: 41 additions & 0 deletions backend/infrahub/proposed_change/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

from infrahub_sdk.protocols import CoreProposedChange
from prefect import flow, task
from prefect.logging import get_run_logger

from infrahub.core.constants import ProposedChangeState
from infrahub.services import (
services,
)


@flow(name="proposed-changes-cancel-branch", description="Cancel all Proposed change associated with a branch.")
async def cancel_proposed_changes_branch(branch_name: str) -> None:
service = services.service
proposed_changed_opened = await service.client.filters(
kind=CoreProposedChange,
include=["id", "source_branch"],
state__value=ProposedChangeState.OPEN.value,
source_branch__value=branch_name,
)
proposed_changed_closed = await service.client.filters(
kind=CoreProposedChange,
include=["id", "source_branch"],
state__value=ProposedChangeState.CLOSED.value,
source_branch__value=branch_name,
)

for proposed_change in proposed_changed_opened + proposed_changed_closed:
await cancel_proposed_change(proposed_change=proposed_change)


@task(description="Cancel a propose change")
async def cancel_proposed_change(proposed_change: CoreProposedChange) -> None:
service = services.service
log = get_run_logger()

log.info("Canceling proposed change as the source branch was deleted")
proposed_change = await service.client.get(kind=CoreProposedChange, id=proposed_change.id)
proposed_change.state.value = ProposedChangeState.CANCELED.value
await proposed_change.save()
4 changes: 2 additions & 2 deletions backend/infrahub/services/adapters/event/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ async def send(self, event: InfrahubEvent) -> None:
await asyncio.gather(*tasks)

async def _send_bus(self, event: InfrahubEvent) -> None:
message = event.get_message()
await self.service.send(message=message)
for message in event.get_messages():
await self.service.send(message=message)

async def _send_prefect(self, event: InfrahubEvent) -> None:
emit_event(
Expand Down
Loading

0 comments on commit 2a178a9

Please sign in to comment.