Skip to content

Commit

Permalink
Merge pull request #500 from beer-garden/plugin-requires
Browse files Browse the repository at this point in the history
Brewtils Plugin requires
  • Loading branch information
1maple1 authored Aug 21, 2024
2 parents f85cc2b + 4cd4868 commit e2f73e8
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 21 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Brewtils Changelog
==================

3.27.1
------
TBD

- Updated Plugin class to accept requires and requires_timeout attributes to require plugin to wait for dependencies before starting.

3.27.0
------
8/13/24
Expand Down
10 changes: 10 additions & 0 deletions brewtils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def client(
group=None, # type: str
groups=[], # type: Optional[List[str]]
prefix_topic=None, # type: Optional[str]
require=None, # type: str
requires=[], # type: Optional[List[str]]
):
# type: (...) -> Type
"""Class decorator that marks a class as a beer-garden Client
Expand Down Expand Up @@ -70,6 +72,8 @@ def client(
group: Optional plugin group
groups: Optional plugin groups
prefix_topic: Optional prefix for Generated Command to Topic mappings
require: Optional system dependency
requires: Optional system dependencies
Returns:
The decorated class
Expand All @@ -83,6 +87,8 @@ def client(
groups=groups,
group=group,
prefix_topic=prefix_topic,
require=require,
requires=requires,
) # noqa

# Assign these here so linters don't complain
Expand All @@ -92,10 +98,14 @@ def client(
_wrapped._current_request = None
_wrapped._groups = groups
_wrapped._prefix_topic = prefix_topic
_wrapped._requires = requires

if group:
_wrapped._groups.append(group)

if require:
_wrapped._requires.append(require)

return _wrapped


Expand Down
5 changes: 5 additions & 0 deletions brewtils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class Instance(BaseModel):
"STARTING",
"STOPPING",
"UNKNOWN",
"AWAITING_SYSTEM",
}

def __init__(
Expand Down Expand Up @@ -821,6 +822,8 @@ def __init__(
template=None,
groups=None,
prefix_topic=None,
requires=None,
requires_timeout=None,
):
self.name = name
self.description = description
Expand All @@ -837,6 +840,8 @@ def __init__(
self.template = template
self.groups = groups or []
self.prefix_topic = prefix_topic
self.requires = requires or []
self.requires_timeout = requires_timeout

def __str__(self):
return "%s:%s-%s" % (self.namespace, self.name, self.version)
Expand Down
111 changes: 90 additions & 21 deletions brewtils/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class Plugin(object):
- ``display_name``
- ``group``
- ``groups``
- ``require``
- ``requires``
- ``requires_timeout``
Connection information tells the Plugin how to communicate with Beer-garden. The
most important of these is the ``bg_host`` (to tell the plugin where to find the
Expand Down Expand Up @@ -172,6 +175,10 @@ class Plugin(object):
instance_name (str): Instance name
namespace (str): Namespace name
require (str): Required system dependency
requires (list): Required systems dependencies
requires_timeout (int): Timeout to wait for dependencies
group (str): Grouping label applied to plugin
groups (list): Grouping labels applied to plugin
Expand Down Expand Up @@ -256,20 +263,24 @@ def run(self):
"attribute to an instance of a class decorated with @brewtils.system"
)

self._startup()
self._logger.info("Plugin %s has started", self.unique_name)

try:
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
self._logger.exception("Exception during wait, shutting down: %s", ex)
self._startup()
self._logger.info("Plugin %s has started", self.unique_name)

self._shutdown()
self._logger.info("Plugin %s has terminated", self.unique_name)
try:
# Need the timeout param so this works correctly in Python 2
while not self._shutdown_event.wait(timeout=0.1):
pass
except KeyboardInterrupt:
self._logger.debug("Received KeyboardInterrupt - shutting down")
except Exception as ex:
self._logger.exception("Exception during wait, shutting down: %s", ex)

self._shutdown()
except PluginValidationError:
self._shutdown(status="ERROR")
finally:
self._logger.info("Plugin %s has terminated", self.unique_name)

@property
def client(self):
Expand Down Expand Up @@ -299,6 +310,8 @@ def _set_client(self, new_client):
self._system.prefix_topic = getattr(
new_client, "_prefix_topic", None
) # noqa
if not self._system.requires:
self._system.requires = getattr(new_client, "_requires", []) # noqa
# Now roll up / interpret all metadata to get the Commands
self._system.commands = _parse_client(new_client)

Expand All @@ -315,6 +328,7 @@ def _set_client(self, new_client):
client_clazz._bg_commands = self._system.commands
client_clazz._groups = self._system.groups
client_clazz._prefix_topic = self._system.prefix_topic
client_clazz._requires = self._system.requires
client_clazz._current_request = client_clazz.current_request
except TypeError:
if sys.version_info.major != 2:
Expand Down Expand Up @@ -368,6 +382,35 @@ def _hook(exc_type, exc_value, traceback):

sys.excepthook = _hook

def get_system_dependency(self, require, timeout=300):
wait_time = 0.1
while timeout > 0:
system = self._ez_client.find_unique_system(name=require, local=True)
if (
system
and system.instances
and any("RUNNING" == instance.status for instance in system.instances)
):
return system
self.logger.error(
f"Waiting {wait_time:.1f} seconds before next attempt for {self._system} "
f"dependency for {require}"
)
timeout = timeout - wait_time
wait_time = min(wait_time * 2, 30)
self._wait(wait_time)

raise PluginValidationError(
f"Failed to resolve {self._system} dependency for {require}"
)

def await_dependencies(self, requires, config):
for req in requires:
system = self.get_system_dependency(req, config.requires_timeout)
self.logger.info(
f"Resolved system {system} for {req}: {config.name} {config.instance_name}"
)

def _startup(self):
"""Plugin startup procedure
Expand Down Expand Up @@ -406,12 +449,20 @@ def _startup(self):
self._logger.debug("Initializing and starting processors")
self._admin_processor, self._request_processor = self._initialize_processors()
self._admin_processor.startup()
self._request_processor.startup()

self._logger.debug("Setting signal handlers")
self._set_signal_handlers()
try:
if self._system.requires:
self.await_dependencies(self._system.requires, self._config)
except PluginValidationError:
raise
else:
self._start()
self._request_processor.startup()
finally:
self._logger.debug("Setting signal handlers")
self._set_signal_handlers()

def _shutdown(self):
def _shutdown(self, status="STOPPED"):
"""Plugin shutdown procedure
This method gracefully stops the plugin. When it completes the plugin should be
Expand All @@ -422,14 +473,18 @@ def _shutdown(self):
self._shutdown_event.set()

self._logger.debug("Shutting down processors")
self._request_processor.shutdown()
# Join will cause an exception if processor thread wasn't started
try:
self._request_processor.shutdown()
except RuntimeError:
pass
self._admin_processor.shutdown()

try:
self._ez_client.update_instance(self._instance.id, new_status="STOPPED")
self._ez_client.update_instance(self._instance.id, new_status=status)
except Exception:
self._logger.warning(
"Unable to notify Beer-garden that this plugin is STOPPED, so this "
f"Unable to notify Beer-garden that this plugin is {status}, so this "
"plugin's status may be incorrect in Beer-garden"
)

Expand Down Expand Up @@ -540,6 +595,7 @@ def _initialize_system(self):
"icon_name": self._system.icon_name,
"template": self._system.template,
"groups": self._system.groups,
"requires": self._system.requires,
}

# And if this particular instance doesn't exist we want to add it
Expand Down Expand Up @@ -590,13 +646,13 @@ def _initialize_processors(self):
thread_name="Admin Consumer",
queue_name=self._instance.queue_info["admin"]["name"],
max_concurrent=1,
**common_args
**common_args,
)
request_consumer = RequestConsumer.create(
thread_name="Request Consumer",
queue_name=self._instance.queue_info["request"]["name"],
max_concurrent=self._config.max_concurrent,
**common_args
**common_args,
)

# Both RequestProcessors need an updater
Expand Down Expand Up @@ -635,6 +691,14 @@ def _start(self):
self._instance.id, new_status="RUNNING"
)

def _wait(self, timeout):
"""Handle wait request"""
# Set the status to wait
self._instance = self._ez_client.update_instance(
self._instance.id, new_status="AWAITING_SYSTEM"
)
self._shutdown_event.wait(timeout)

def _stop(self):
"""Handle stop Request"""
# Because the run() method is on a 0.1s sleep there's a race regarding if the
Expand Down Expand Up @@ -847,6 +911,9 @@ def _setup_system(self, system, plugin_kwargs):
if self._config.group:
self._config.groups.append(self._config.group)

if self._config.require:
self._config.requires.append(self._config.require)

system = System(
name=self._config.name,
version=self._config.version,
Expand All @@ -860,6 +927,8 @@ def _setup_system(self, system, plugin_kwargs):
template=self._config.template,
groups=self._config.groups,
prefix_topic=self._config.prefix_topic,
requires=self._config.requires,
requires_timeout=self._config.requires_timeout,
)

return system
Expand Down
5 changes: 5 additions & 0 deletions brewtils/rest/easy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def update_system(self, system_id, new_commands=None, **kwargs):
icon_name (str): New System icon name
template (str): New System template
groups (list): New System groups
requires (list): New System dependencies
Returns:
System: The updated system
Expand All @@ -424,6 +425,10 @@ def update_system(self, system_id, new_commands=None, **kwargs):
if groups:
operations.append(PatchOperation("replace", "/groups", groups))

requires = kwargs.pop("requires", [])
if requires:
operations.append(PatchOperation("replace", "/requires", requires))

# The remaining kwargs are all strings
# Sending an empty string (instead of None) ensures they're actually cleared
for key, value in kwargs.items():
Expand Down
2 changes: 2 additions & 0 deletions brewtils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ class SystemSchema(BaseSchema):
template = fields.Str(allow_none=True)
groups = fields.List(fields.Str(), allow_none=True)
prefix_topic = fields.Str(allow_none=True)
requires = fields.List(fields.Str(), allow_none=True)
requires_timeout = fields.Integer(allow_none=True)


class SystemDomainIdentifierSchema(BaseSchema):
Expand Down
17 changes: 17 additions & 0 deletions brewtils/specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ def _is_json_dict(s):
"<garden name>.<namespace>.<system name>.<system version>.<system instance>.<command name>"
"if a prefix is provided, then it is `<prefix>.<command name>`",
},
"require": {
"type": "str",
"description": "A requires system dependency",
"required": False,
},
"requires": {
"type": "list",
"description": "The required system dependencies",
"items": {"name": {"type": "str"}},
"required": False,
"default": [],
},
"requires_timeout": {
"type": "int",
"description": "The dependency timeout to use",
"default": 300,
},
}

_PLUGIN_SPEC = {
Expand Down
2 changes: 2 additions & 0 deletions brewtils/test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def system_dict(instance_dict, command_dict, command_dict_2, system_id):
"template": "<html>template</html>",
"groups": ["GroupB", "GroupA"],
"prefix_topic": "custom_topic",
"requires": ["SystemA"],
"requires_timeout": 300,
}


Expand Down
Loading

0 comments on commit e2f73e8

Please sign in to comment.