Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument Prefect Worker to make it easier to override some of it #4783

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 60 additions & 38 deletions backend/infrahub/workers/infrahub_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
from infrahub.git import initialize_repositories_directory
from infrahub.lock import initialize_lock
from infrahub.services import InfrahubServices, services
from infrahub.services.adapters.cache import InfrahubCache
from infrahub.services.adapters.cache.nats import NATSCache
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus import InfrahubMessageBus
from infrahub.services.adapters.message_bus.nats import NATSMessageBus
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.services.adapters.workflow import InfrahubWorkflow
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
from infrahub.workflows.models import TASK_RESULT_STORAGE_NAME
Expand Down Expand Up @@ -99,44 +102,8 @@ async def setup(
)
)

if not client:
self._logger.debug(f"Using Infrahub API at {config.SETTINGS.main.internal_address}")
client = InfrahubClient(
config=Config(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=self._logger)
)

try:
await client.branch.all()
except SdkError as exc:
self._logger.error(f"Error in communication with Infrahub: {exc.message}")
raise typer.Exit(1)

database = InfrahubDatabase(driver=await get_db(retry=1))

workflow = config.OVERRIDE.workflow or (
WorkflowWorkerExecution()
if config.SETTINGS.workflow.driver == config.WorkflowDriver.WORKER
else WorkflowLocalExecution()
)

message_bus = config.OVERRIDE.message_bus or (
NATSMessageBus() if config.SETTINGS.broker.driver == config.BrokerDriver.NATS else RabbitMQMessageBus()
)
cache = config.OVERRIDE.cache or (
NATSCache() if config.SETTINGS.cache.driver == config.CacheDriver.NATS else RedisCache()
)

service = InfrahubServices(
cache=cache,
client=client,
database=database,
message_bus=message_bus,
workflow=workflow,
component_type=ComponentType.GIT_AGENT,
)
services.service = service

await service.initialize()
client = await self._init_infrahub_client(client=client)
service = await self._init_services(client=client)

if not registry.schema_has_been_initialized():
initialize_lock(service=service)
Expand Down Expand Up @@ -178,3 +145,58 @@ async def run(
status_code=0,
identifier=str(flow_run.id),
)

async def _init_infrahub_client(self, client: InfrahubClient | None = None) -> InfrahubClient:
if not client:
self._logger.debug(f"Using Infrahub API at {config.SETTINGS.main.internal_address}")
client = InfrahubClient(
config=Config(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=self._logger)
)

try:
await client.branch.all()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is just copying around the code but thinking about the error we saw in the pipeline It would be nice to move forward with this #3481. Should we add that issue to 1.0.1 or 1.1?

except SdkError as exc:
self._logger.error(f"Error in communication with Infrahub: {exc.message}")
raise typer.Exit(1)

return client

async def _init_database(self) -> InfrahubDatabase:
return InfrahubDatabase(driver=await get_db(retry=1))

async def _init_workflow(self) -> InfrahubWorkflow:
return config.OVERRIDE.workflow or (
WorkflowWorkerExecution()
if config.SETTINGS.workflow.driver == config.WorkflowDriver.WORKER
else WorkflowLocalExecution()
)

async def _init_message_bus(self) -> InfrahubMessageBus:
return config.OVERRIDE.message_bus or (
NATSMessageBus() if config.SETTINGS.broker.driver == config.BrokerDriver.NATS else RabbitMQMessageBus()
)

async def _init_cache(self) -> InfrahubCache:
return config.OVERRIDE.cache or (
NATSCache() if config.SETTINGS.cache.driver == config.CacheDriver.NATS else RedisCache()
)

async def _init_services(self, client: InfrahubClient) -> InfrahubServices:
database = await self._init_database()
workflow = await self._init_workflow()
message_bus = await self._init_message_bus()
cache = await self._init_cache()

service = InfrahubServices(
cache=cache,
client=client,
database=database,
message_bus=message_bus,
workflow=workflow,
component_type=ComponentType.GIT_AGENT,
)

services.service = service
await service.initialize()

return service
Loading