Skip to content

Commit

Permalink
Merge pull request #4783 from opsmill/dga-20241031-worker
Browse files Browse the repository at this point in the history
Instrument Prefect Worker to make it easier to override some of it
  • Loading branch information
dgarros authored Oct 30, 2024
2 parents 5e2ed0f + ad20cbd commit f9a2e0e
Showing 1 changed file with 60 additions and 38 deletions.
98 changes: 60 additions & 38 deletions backend/infrahub/workers/infrahub_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
from infrahub.git import initialize_repositories_directory
from infrahub.lock import initialize_lock
from infrahub.services import InfrahubServices, services
from infrahub.services.adapters.cache import InfrahubCache
from infrahub.services.adapters.cache.nats import NATSCache
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus import InfrahubMessageBus
from infrahub.services.adapters.message_bus.nats import NATSMessageBus
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.services.adapters.workflow import InfrahubWorkflow
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
from infrahub.workflows.models import TASK_RESULT_STORAGE_NAME
Expand Down Expand Up @@ -99,44 +102,8 @@ async def setup(
)
)

if not client:
self._logger.debug(f"Using Infrahub API at {config.SETTINGS.main.internal_address}")
client = InfrahubClient(
config=Config(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=self._logger)
)

try:
await client.branch.all()
except SdkError as exc:
self._logger.error(f"Error in communication with Infrahub: {exc.message}")
raise typer.Exit(1)

database = InfrahubDatabase(driver=await get_db(retry=1))

workflow = config.OVERRIDE.workflow or (
WorkflowWorkerExecution()
if config.SETTINGS.workflow.driver == config.WorkflowDriver.WORKER
else WorkflowLocalExecution()
)

message_bus = config.OVERRIDE.message_bus or (
NATSMessageBus() if config.SETTINGS.broker.driver == config.BrokerDriver.NATS else RabbitMQMessageBus()
)
cache = config.OVERRIDE.cache or (
NATSCache() if config.SETTINGS.cache.driver == config.CacheDriver.NATS else RedisCache()
)

service = InfrahubServices(
cache=cache,
client=client,
database=database,
message_bus=message_bus,
workflow=workflow,
component_type=ComponentType.GIT_AGENT,
)
services.service = service

await service.initialize()
client = await self._init_infrahub_client(client=client)
service = await self._init_services(client=client)

if not registry.schema_has_been_initialized():
initialize_lock(service=service)
Expand Down Expand Up @@ -178,3 +145,58 @@ async def run(
status_code=0,
identifier=str(flow_run.id),
)

async def _init_infrahub_client(self, client: InfrahubClient | None = None) -> InfrahubClient:
if not client:
self._logger.debug(f"Using Infrahub API at {config.SETTINGS.main.internal_address}")
client = InfrahubClient(
config=Config(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=self._logger)
)

try:
await client.branch.all()
except SdkError as exc:
self._logger.error(f"Error in communication with Infrahub: {exc.message}")
raise typer.Exit(1)

return client

async def _init_database(self) -> InfrahubDatabase:
return InfrahubDatabase(driver=await get_db(retry=1))

async def _init_workflow(self) -> InfrahubWorkflow:
return config.OVERRIDE.workflow or (
WorkflowWorkerExecution()
if config.SETTINGS.workflow.driver == config.WorkflowDriver.WORKER
else WorkflowLocalExecution()
)

async def _init_message_bus(self) -> InfrahubMessageBus:
return config.OVERRIDE.message_bus or (
NATSMessageBus() if config.SETTINGS.broker.driver == config.BrokerDriver.NATS else RabbitMQMessageBus()
)

async def _init_cache(self) -> InfrahubCache:
return config.OVERRIDE.cache or (
NATSCache() if config.SETTINGS.cache.driver == config.CacheDriver.NATS else RedisCache()
)

async def _init_services(self, client: InfrahubClient) -> InfrahubServices:
database = await self._init_database()
workflow = await self._init_workflow()
message_bus = await self._init_message_bus()
cache = await self._init_cache()

service = InfrahubServices(
cache=cache,
client=client,
database=database,
message_bus=message_bus,
workflow=workflow,
component_type=ComponentType.GIT_AGENT,
)

services.service = service
await service.initialize()

return service

0 comments on commit f9a2e0e

Please sign in to comment.