diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e2ded8b3..2cc30d1c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,12 @@ Brewtils Changelog ================== +3.27.1 +------ +TBD + +- Updated Plugin class to accept requires and requires_timeout attributes to require plugin to wait for dependencies before starting. + 3.27.0 ------ 8/13/24 diff --git a/brewtils/decorators.py b/brewtils/decorators.py index 7186aaa5..d6e332ba 100644 --- a/brewtils/decorators.py +++ b/brewtils/decorators.py @@ -41,6 +41,8 @@ def client( group=None, # type: str groups=[], # type: Optional[List[str]] prefix_topic=None, # type: Optional[str] + require=None, # type: str + requires=[], # type: Optional[List[str]] ): # type: (...) -> Type """Class decorator that marks a class as a beer-garden Client @@ -70,6 +72,8 @@ def client( group: Optional plugin group groups: Optional plugin groups prefix_topic: Optional prefix for Generated Command to Topic mappings + require: Optional system dependency + requires: Optional system dependencies Returns: The decorated class @@ -83,6 +87,8 @@ def client( groups=groups, group=group, prefix_topic=prefix_topic, + require=require, + requires=requires, ) # noqa # Assign these here so linters don't complain @@ -92,10 +98,14 @@ def client( _wrapped._current_request = None _wrapped._groups = groups _wrapped._prefix_topic = prefix_topic + _wrapped._requires = requires if group: _wrapped._groups.append(group) + if require: + _wrapped._requires.append(require) + return _wrapped diff --git a/brewtils/models.py b/brewtils/models.py index fe80efce..80c6de5f 100644 --- a/brewtils/models.py +++ b/brewtils/models.py @@ -241,6 +241,7 @@ class Instance(BaseModel): "STARTING", "STOPPING", "UNKNOWN", + "AWAITING_SYSTEM", } def __init__( @@ -821,6 +822,8 @@ def __init__( template=None, groups=None, prefix_topic=None, + requires=None, + requires_timeout=None, ): self.name = name self.description = description @@ -837,6 +840,8 @@ def __init__( self.template = template self.groups = groups or [] self.prefix_topic = prefix_topic + self.requires = requires or [] + self.requires_timeout = requires_timeout def __str__(self): return "%s:%s-%s" % (self.namespace, self.name, self.version) diff --git a/brewtils/plugin.py b/brewtils/plugin.py index a4724c02..b5b37e8b 100644 --- a/brewtils/plugin.py +++ b/brewtils/plugin.py @@ -85,6 +85,9 @@ class Plugin(object): - ``display_name`` - ``group`` - ``groups`` + - ``require`` + - ``requires`` + - ``requires_timeout`` Connection information tells the Plugin how to communicate with Beer-garden. The most important of these is the ``bg_host`` (to tell the plugin where to find the @@ -172,6 +175,10 @@ class Plugin(object): instance_name (str): Instance name namespace (str): Namespace name + require (str): Required system dependency + requires (list): Required systems dependencies + requires_timeout (int): Timeout to wait for dependencies + group (str): Grouping label applied to plugin groups (list): Grouping labels applied to plugin @@ -256,20 +263,24 @@ def run(self): "attribute to an instance of a class decorated with @brewtils.system" ) - self._startup() - self._logger.info("Plugin %s has started", self.unique_name) - try: - # Need the timeout param so this works correctly in Python 2 - while not self._shutdown_event.wait(timeout=0.1): - pass - except KeyboardInterrupt: - self._logger.debug("Received KeyboardInterrupt - shutting down") - except Exception as ex: - self._logger.exception("Exception during wait, shutting down: %s", ex) + self._startup() + self._logger.info("Plugin %s has started", self.unique_name) - self._shutdown() - self._logger.info("Plugin %s has terminated", self.unique_name) + try: + # Need the timeout param so this works correctly in Python 2 + while not self._shutdown_event.wait(timeout=0.1): + pass + except KeyboardInterrupt: + self._logger.debug("Received KeyboardInterrupt - shutting down") + except Exception as ex: + self._logger.exception("Exception during wait, shutting down: %s", ex) + + self._shutdown() + except PluginValidationError: + self._shutdown(status="ERROR") + finally: + self._logger.info("Plugin %s has terminated", self.unique_name) @property def client(self): @@ -299,6 +310,8 @@ def _set_client(self, new_client): self._system.prefix_topic = getattr( new_client, "_prefix_topic", None ) # noqa + if not self._system.requires: + self._system.requires = getattr(new_client, "_requires", []) # noqa # Now roll up / interpret all metadata to get the Commands self._system.commands = _parse_client(new_client) @@ -315,6 +328,7 @@ def _set_client(self, new_client): client_clazz._bg_commands = self._system.commands client_clazz._groups = self._system.groups client_clazz._prefix_topic = self._system.prefix_topic + client_clazz._requires = self._system.requires client_clazz._current_request = client_clazz.current_request except TypeError: if sys.version_info.major != 2: @@ -368,6 +382,35 @@ def _hook(exc_type, exc_value, traceback): sys.excepthook = _hook + def get_system_dependency(self, require, timeout=300): + wait_time = 0.1 + while timeout > 0: + system = self._ez_client.find_unique_system(name=require, local=True) + if ( + system + and system.instances + and any("RUNNING" == instance.status for instance in system.instances) + ): + return system + self.logger.error( + f"Waiting {wait_time:.1f} seconds before next attempt for {self._system} " + f"dependency for {require}" + ) + timeout = timeout - wait_time + wait_time = min(wait_time * 2, 30) + self._wait(wait_time) + + raise PluginValidationError( + f"Failed to resolve {self._system} dependency for {require}" + ) + + def await_dependencies(self, requires, config): + for req in requires: + system = self.get_system_dependency(req, config.requires_timeout) + self.logger.info( + f"Resolved system {system} for {req}: {config.name} {config.instance_name}" + ) + def _startup(self): """Plugin startup procedure @@ -406,12 +449,20 @@ def _startup(self): self._logger.debug("Initializing and starting processors") self._admin_processor, self._request_processor = self._initialize_processors() self._admin_processor.startup() - self._request_processor.startup() - self._logger.debug("Setting signal handlers") - self._set_signal_handlers() + try: + if self._system.requires: + self.await_dependencies(self._system.requires, self._config) + except PluginValidationError: + raise + else: + self._start() + self._request_processor.startup() + finally: + self._logger.debug("Setting signal handlers") + self._set_signal_handlers() - def _shutdown(self): + def _shutdown(self, status="STOPPED"): """Plugin shutdown procedure This method gracefully stops the plugin. When it completes the plugin should be @@ -422,14 +473,18 @@ def _shutdown(self): self._shutdown_event.set() self._logger.debug("Shutting down processors") - self._request_processor.shutdown() + # Join will cause an exception if processor thread wasn't started + try: + self._request_processor.shutdown() + except RuntimeError: + pass self._admin_processor.shutdown() try: - self._ez_client.update_instance(self._instance.id, new_status="STOPPED") + self._ez_client.update_instance(self._instance.id, new_status=status) except Exception: self._logger.warning( - "Unable to notify Beer-garden that this plugin is STOPPED, so this " + f"Unable to notify Beer-garden that this plugin is {status}, so this " "plugin's status may be incorrect in Beer-garden" ) @@ -540,6 +595,7 @@ def _initialize_system(self): "icon_name": self._system.icon_name, "template": self._system.template, "groups": self._system.groups, + "requires": self._system.requires, } # And if this particular instance doesn't exist we want to add it @@ -590,13 +646,13 @@ def _initialize_processors(self): thread_name="Admin Consumer", queue_name=self._instance.queue_info["admin"]["name"], max_concurrent=1, - **common_args + **common_args, ) request_consumer = RequestConsumer.create( thread_name="Request Consumer", queue_name=self._instance.queue_info["request"]["name"], max_concurrent=self._config.max_concurrent, - **common_args + **common_args, ) # Both RequestProcessors need an updater @@ -635,6 +691,14 @@ def _start(self): self._instance.id, new_status="RUNNING" ) + def _wait(self, timeout): + """Handle wait request""" + # Set the status to wait + self._instance = self._ez_client.update_instance( + self._instance.id, new_status="AWAITING_SYSTEM" + ) + self._shutdown_event.wait(timeout) + def _stop(self): """Handle stop Request""" # Because the run() method is on a 0.1s sleep there's a race regarding if the @@ -847,6 +911,9 @@ def _setup_system(self, system, plugin_kwargs): if self._config.group: self._config.groups.append(self._config.group) + if self._config.require: + self._config.requires.append(self._config.require) + system = System( name=self._config.name, version=self._config.version, @@ -860,6 +927,8 @@ def _setup_system(self, system, plugin_kwargs): template=self._config.template, groups=self._config.groups, prefix_topic=self._config.prefix_topic, + requires=self._config.requires, + requires_timeout=self._config.requires_timeout, ) return system diff --git a/brewtils/rest/easy_client.py b/brewtils/rest/easy_client.py index 9c2c5f86..919f6673 100644 --- a/brewtils/rest/easy_client.py +++ b/brewtils/rest/easy_client.py @@ -398,6 +398,7 @@ def update_system(self, system_id, new_commands=None, **kwargs): icon_name (str): New System icon name template (str): New System template groups (list): New System groups + requires (list): New System dependencies Returns: System: The updated system @@ -424,6 +425,10 @@ def update_system(self, system_id, new_commands=None, **kwargs): if groups: operations.append(PatchOperation("replace", "/groups", groups)) + requires = kwargs.pop("requires", []) + if requires: + operations.append(PatchOperation("replace", "/requires", requires)) + # The remaining kwargs are all strings # Sending an empty string (instead of None) ensures they're actually cleared for key, value in kwargs.items(): diff --git a/brewtils/schemas.py b/brewtils/schemas.py index 37d2bd22..f0553380 100644 --- a/brewtils/schemas.py +++ b/brewtils/schemas.py @@ -233,6 +233,8 @@ class SystemSchema(BaseSchema): template = fields.Str(allow_none=True) groups = fields.List(fields.Str(), allow_none=True) prefix_topic = fields.Str(allow_none=True) + requires = fields.List(fields.Str(), allow_none=True) + requires_timeout = fields.Integer(allow_none=True) class SystemDomainIdentifierSchema(BaseSchema): diff --git a/brewtils/specification.py b/brewtils/specification.py index acea8de6..2ae991c2 100644 --- a/brewtils/specification.py +++ b/brewtils/specification.py @@ -171,6 +171,23 @@ def _is_json_dict(s): "....." "if a prefix is provided, then it is `.`", }, + "require": { + "type": "str", + "description": "A requires system dependency", + "required": False, + }, + "requires": { + "type": "list", + "description": "The required system dependencies", + "items": {"name": {"type": "str"}}, + "required": False, + "default": [], + }, + "requires_timeout": { + "type": "int", + "description": "The dependency timeout to use", + "default": 300, + }, } _PLUGIN_SPEC = { diff --git a/brewtils/test/fixtures.py b/brewtils/test/fixtures.py index 2cf581cc..44e26b21 100644 --- a/brewtils/test/fixtures.py +++ b/brewtils/test/fixtures.py @@ -270,6 +270,8 @@ def system_dict(instance_dict, command_dict, command_dict_2, system_id): "template": "template", "groups": ["GroupB", "GroupA"], "prefix_topic": "custom_topic", + "requires": ["SystemA"], + "requires_timeout": 300, } diff --git a/test/decorators_test.py b/test/decorators_test.py index c7e4ad78..ed01ca8a 100644 --- a/test/decorators_test.py +++ b/test/decorators_test.py @@ -512,6 +512,7 @@ def foo(self): assert hasattr(ClientClass, "_current_request") assert hasattr(ClientClass, "_groups") assert hasattr(ClientClass, "_prefix_topic") + assert hasattr(ClientClass, "_requires") def test_with_args(self): @client( @@ -519,6 +520,7 @@ def test_with_args(self): bg_version="1.0.0", groups=["GroupA"], prefix_topic="custom_topic", + requires=["SystemA"], ) class ClientClass(object): @command @@ -531,11 +533,13 @@ def foo(self): assert hasattr(ClientClass, "_current_request") assert hasattr(ClientClass, "_groups") assert hasattr(ClientClass, "_prefix_topic") + assert hasattr(ClientClass, "_requires") assert ClientClass._bg_name == "sys" assert ClientClass._bg_version == "1.0.0" assert ClientClass._groups == ["GroupA"] assert ClientClass._prefix_topic == "custom_topic" + assert ClientClass._requires == ["SystemA"] def test_group(self): @client(bg_name="sys", bg_version="1.0.0", group="GroupB") @@ -571,6 +575,42 @@ def foo(self): assert ClientClass._bg_version == "1.0.0" assert ClientClass._groups == ["GroupA", "GroupB"] + def test_require(self): + @client(bg_name="sys", bg_version="1.0.0", require="SystemB") + class ClientClass(object): + @command + def foo(self): + pass + + assert hasattr(ClientClass, "_bg_name") + assert hasattr(ClientClass, "_bg_version") + assert hasattr(ClientClass, "_bg_commands") + assert hasattr(ClientClass, "_current_request") + assert hasattr(ClientClass, "_requires") + + assert ClientClass._bg_name == "sys" + assert ClientClass._bg_version == "1.0.0" + assert ClientClass._requires == ["SystemB"] + + def test_requires_combine(self): + @client( + bg_name="sys", bg_version="1.0.0", requires=["SystemA"], require="SystemB" + ) + class ClientClass(object): + @command + def foo(self): + pass + + assert hasattr(ClientClass, "_bg_name") + assert hasattr(ClientClass, "_bg_version") + assert hasattr(ClientClass, "_bg_commands") + assert hasattr(ClientClass, "_current_request") + assert hasattr(ClientClass, "_requires") + + assert ClientClass._bg_name == "sys" + assert ClientClass._bg_version == "1.0.0" + assert ClientClass._requires == ["SystemA", "SystemB"] + class TestCommand(object): """Test command decorator""" diff --git a/test/plugin_test.py b/test/plugin_test.py index c0b83d36..51ec4688 100644 --- a/test/plugin_test.py +++ b/test/plugin_test.py @@ -136,6 +136,23 @@ def test_groups(self, client): assert plugin._system.groups == ["GroupB", "GroupA"] + def test_requires(self, client): + logger = Mock() + plugin = Plugin( + client, + bg_host="host1", + bg_port=2338, + bg_url_prefix="/beer/", + ssl_enabled=False, + ca_verify=False, + logger=logger, + max_concurrent=1, + require="SystemA", + requires=["SystemB"], + ) + + assert plugin._system.requires == ["SystemB", "SystemA"] + def test_kwargs(self, client, bg_system): logger = Mock() @@ -152,6 +169,9 @@ def test_kwargs(self, client, bg_system): group="GroupA", groups=["GroupB"], prefix_topic="custom.topic", + require="SystemA", + requires=["SystemB"], + requires_timeout=200, ) assert plugin._logger == logger @@ -164,6 +184,10 @@ def test_kwargs(self, client, bg_system): assert "GroupA" == plugin._config.group assert "GroupB" in plugin._config.groups assert "GroupA" not in plugin._config.groups + assert "SystemA" == plugin._config.require + assert "SystemB" in plugin._config.requires + assert "SystemA" not in plugin._config.requires + assert plugin._config.requires_timeout == 200 def test_env(self, client, bg_system): os.environ["BG_HOST"] = "remotehost" @@ -173,6 +197,7 @@ def test_env(self, client, bg_system): os.environ["BG_CA_VERIFY"] = "False" os.environ["BG_GROUP"] = "GroupA" os.environ["BG_PREFIX_TOPIC"] = "custom.topic" + os.environ["BG_REQUIRE"] = "SystemA" plugin = Plugin(client, system=bg_system, max_concurrent=1) @@ -183,6 +208,7 @@ def test_env(self, client, bg_system): assert plugin._config.ca_verify is False assert plugin._config.prefix_topic == "custom.topic" assert "GroupA" == plugin._config.group + assert "SystemA" in plugin._config.require def test_conflicts(self, client, bg_system): os.environ["BG_HOST"] = "remotehost" @@ -342,6 +368,7 @@ def test_success( plugin._initialize_processors = Mock( return_value=(admin_processor, request_processor) ) + plugin._ez_client.find_unique_system = Mock(return_value=bg_system) plugin._startup() assert admin_processor.startup.called is True @@ -362,6 +389,7 @@ def test_success_no_ns( plugin._initialize_processors = Mock( return_value=(admin_processor, request_processor) ) + plugin._ez_client.find_unique_system = Mock(return_value=bg_system) plugin._startup() assert admin_processor.startup.called is True @@ -504,6 +532,7 @@ def test_system_exists( display_name=bg_system.display_name, template="template", groups=bg_system.groups, + requires=bg_system.requires, ) # assert ez_client.create_system.return_value == plugin.system @@ -534,6 +563,7 @@ def test_new_instance(self, plugin, ez_client, bg_system, bg_instance): template="template", add_instance=ANY, groups=["GroupB", "GroupA"], + requires=["SystemA"], ) assert ez_client.update_system.call_args[1]["add_instance"].name == new_name @@ -811,6 +841,7 @@ def test_construct_system(self, plugin): "icon_name": "icon", "display_name": "display_name", "metadata": '{"foo": "bar"}', + "requires": [], } ) @@ -825,6 +856,7 @@ def _validate_system(new_system): assert new_system.icon_name == "icon" assert new_system.metadata == {"foo": "bar"} assert new_system.display_name == "display_name" + assert new_system.requires == [] class TestValidateSystem(object):