Adding file/bytes support to bartender. #66

Open · wants to merge 3 commits into develop
98 changes: 53 additions & 45 deletions bartender/app.py
@@ -15,14 +15,14 @@
from bartender.local_plugins.monitor import LocalPluginMonitor
from bartender.local_plugins.registry import LocalPluginRegistry
from bartender.local_plugins.validator import LocalPluginValidator
-from bartender.mongo_pruner import MongoPruner
+from bartender.mongo_pruner import MongoPruner, PruneTask, GridFSPrune
from bartender.monitor import PluginStatusMonitor
from bartender.pika import PikaClient
from bartender.pyrabbit import PyrabbitClient
from bartender.request_validator import RequestValidator
from bartender.thrift.handler import BartenderHandler
from bartender.thrift.server import make_server
-from bg_utils.mongo.models import Event, Request
+from bg_utils.mongo.models import Event, Request, RequestFile
from brewtils.models import Events
from brewtils.stoppable_thread import StoppableThread

@@ -100,14 +100,10 @@ def __init__(self):
            ),
        ]

-        # Only want to run the MongoPruner if it would do anything
        tasks, run_every = self._setup_pruning_tasks()
-        if run_every:
-            self.helper_threads.append(
-                HelperThread(
-                    MongoPruner, tasks=tasks, run_every=timedelta(minutes=run_every)
-                )
-            )
+        self.helper_threads.append(
+            HelperThread(MongoPruner, tasks=tasks, run_every=timedelta(minutes=run_every))
+        )

        super(BartenderApp, self).__init__(logger=self.logger, name="BartenderApp")

@@ -173,48 +169,60 @@ def _shutdown(self):

    @staticmethod
    def _setup_pruning_tasks():
-        prune_tasks = []
-        if bartender.config.db.ttl.info > 0:
-            prune_tasks.append(
-                {
-                    "collection": Request,
-                    "field": "created_at",
-                    "delete_after": timedelta(minutes=bartender.config.db.ttl.info),
-                    "additional_query": (
-                        Q(status="SUCCESS") | Q(status="CANCELED") | Q(status="ERROR")
-                    )
-                    & Q(has_parent=False)
-                    & Q(command_type="INFO"),
-                }
-            )
-        if bartender.config.db.ttl.action > 0:
-            prune_tasks.append(
-                {
-                    "collection": Request,
-                    "field": "created_at",
-                    "delete_after": timedelta(minutes=bartender.config.db.ttl.action),
-                    "additional_query": (
-                        Q(status="SUCCESS") | Q(status="CANCELED") | Q(status="ERROR")
-                    )
-                    & Q(has_parent=False)
-                    & Q(command_type="ACTION"),
-                }
-            )
-        if bartender.config.db.ttl.event > 0:
-            prune_tasks.append(
-                {
-                    "collection": Event,
-                    "field": "timestamp",
-                    "delete_after": timedelta(minutes=bartender.config.db.ttl.event),
-                }
-            )
+        info_ttl = bartender.config.db.ttl.info
+        action_ttl = bartender.config.db.ttl.action
+        event_ttl = bartender.config.db.ttl.event
+
+        # Delete request files that were created, but no request ever
+        # referenced them.
+        request_file_prune = PruneTask(
+            RequestFile,
+            "created_at",
+            delete_after=timedelta(minutes=15),
+            additional_query=Q(request=None),
+        )
+
+        # Delete all orphaned gridfs objects.
+        gridfs_prune = GridFSPrune()
+
+        # Delete INFO/ACTION requests past their TTL.
+        base_query = (
+            Q(status="SUCCESS") | Q(status="CANCELED") | Q(status="ERROR")
+        ) & Q(has_parent=False)
+        info_prune = PruneTask(
+            Request,
+            "created_at",
+            delete_after=timedelta(minutes=info_ttl),
+            additional_query=base_query & Q(command_type="INFO"),
+        )
+        action_prune = PruneTask(
+            Request,
+            "created_at",
+            delete_after=timedelta(minutes=action_ttl),
+            additional_query=base_query & Q(command_type="ACTION"),
+        )
+
+        # Delete events past their TTL.
+        event_prune = PruneTask(Event, "timestamp", timedelta(minutes=event_ttl))
+
+        prune_tasks = [request_file_prune]
+
+        if info_ttl > 0:
+            prune_tasks.append(info_prune)
+
+        if action_ttl > 0:
+            prune_tasks.append(action_prune)
+
+        if event_ttl > 0:
+            prune_tasks.append(event_prune)
+
+        # Order matters here: the orphan gridfs prune happens *AFTER* the
+        # info/action pruning, so that any affected requests get their
+        # files/chunks deleted.
+        prune_tasks.append(gridfs_prune)

        # Look at the various TTLs to determine how often to run the MongoPruner
        real_ttls = [x for x in bartender.config.db.ttl.values() if x > 0]
-        run_every = min(real_ttls) / 2 if real_ttls else None
+        run_every = min(real_ttls) / 2 if real_ttls else 7.5

        return prune_tasks, run_every

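Note on scheduling: the MongoPruner helper now runs unconditionally, since the RequestFile and GridFS cleanup tasks must happen even when every TTL is disabled. The interval is half the smallest enabled TTL, with a 7.5 minute fallback. A minimal sketch of that calculation, assuming a plain dict stands in for bartender.config.db.ttl:

    from datetime import timedelta

    def compute_run_every(ttl_config):
        # Half the smallest positive TTL (in minutes); 7.5 minutes if all
        # TTLs are disabled (mirrors _setup_pruning_tasks above).
        real_ttls = [v for v in ttl_config.values() if v > 0]
        minutes = min(real_ttls) / 2 if real_ttls else 7.5
        return timedelta(minutes=minutes)

    assert compute_run_every({"info": 30, "action": -1, "event": -1}) == timedelta(minutes=15)
    assert compute_run_every({"info": -1, "action": -1, "event": -1}) == timedelta(minutes=7.5)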
71 changes: 46 additions & 25 deletions bartender/mongo_pruner.py
@@ -1,11 +1,56 @@
import logging
from datetime import datetime, timedelta

+from gridfs import GridFS
from mongoengine import Q
+from mongoengine.connection import get_db

+from bg_utils.mongo.models import RequestFile
from brewtils.stoppable_thread import StoppableThread

+class PruneTask(object):
+    def __init__(self, collection, field, delete_after, additional_query=None):
+        self.logger = logging.getLogger(__name__)
+        self.collection = collection
+        self.field = field
+        self.delete_after = delete_after
+        self.additional_query = additional_query
+
+    def setup_query(self, delete_older_than):
+        query = Q(**{self.field + "__lt": delete_older_than})
+        if self.additional_query:
+            query = query & self.additional_query
+        return query
+
+    def execute(self):
+        current_time = datetime.utcnow()
+        delete_older_than = current_time - self.delete_after
+        query = self.setup_query(delete_older_than)
+        self.logger.debug(
+            "Removing %ss older than %s"
+            % (self.collection.__name__, str(delete_older_than))
+        )
+        self.collection.objects(query).delete()
+
+
+class GridFSPrune(object):
+    def __init__(self, gridfs=None):
+        self.fs = gridfs
+
+    def execute(self):
+        if self.fs is None:
+            self.fs = GridFS(get_db())
+
+        orphan_ids = {f._id for f in self.fs.find()}
+        for rf in RequestFile.objects:
+            if rf.body.grid_id in orphan_ids:
+                orphan_ids.remove(rf.body.grid_id)
+
+        for file_id in orphan_ids:
+            self.fs.delete(file_id)


class MongoPruner(StoppableThread):
    def __init__(self, tasks=None, run_every=timedelta(minutes=15)):
        self.logger = logging.getLogger(__name__)
@@ -15,35 +60,11 @@ def __init__(self, tasks=None, run_every=timedelta(minutes=15)):

        super(MongoPruner, self).__init__(logger=self.logger, name="Remover")

-    def add_task(
-        self, collection=None, field=None, delete_after=None, additional_query=None
-    ):
-        self._tasks.append(
-            {
-                "collection": collection,
-                "field": field,
-                "delete_after": delete_after,
-                "additional_query": additional_query,
-            }
-        )
-
    def run(self):
        self.logger.info(self.display_name + " is started")

        while not self.wait(self._run_every):
-            current_time = datetime.utcnow()
-
            for task in self._tasks:
-                delete_older_than = current_time - task["delete_after"]
-
-                query = Q(**{task["field"] + "__lt": delete_older_than})
-                if task.get("additional_query", None):
-                    query = query & task["additional_query"]
-
-                self.logger.debug(
-                    "Removing %ss older than %s"
-                    % (task["collection"].__name__, str(delete_older_than))
-                )
-                task["collection"].objects(query).delete()
+                task.execute()

        self.logger.info(self.display_name + " is stopped")
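With the query construction moved into PruneTask, each task can be exercised on its own. GridFSPrune works by set subtraction: it starts from every file id in GridFS, discards the ids still referenced by a RequestFile body, and deletes whatever remains. A rough sketch of PruneTask in isolation (the one-hour cutoff is illustrative):

    from datetime import datetime, timedelta

    from bartender.mongo_pruner import PruneTask
    from bg_utils.mongo.models import Event

    # Prune Events older than one hour; setup_query ANDs the age filter
    # with any additional_query the task was constructed with.
    task = PruneTask(Event, "timestamp", timedelta(hours=1))
    query = task.setup_query(datetime.utcnow() - task.delete_after)
    # query is Q(timestamp__lt=<cutoff>); task.execute() runs
    # Event.objects(query).delete()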
53 changes: 42 additions & 11 deletions bartender/request_validator.py
@@ -6,10 +6,11 @@
import six
import urllib3
from builtins import str
+from mongoengine import DoesNotExist
from requests import Session

import bartender
-from bg_utils.mongo.models import System, Choices
+from bg_utils.mongo.models import System, Choices, RequestFile
from brewtils.choices import parse
from brewtils.errors import ModelValidationError
from brewtils.rest.system_client import SystemClient
@@ -40,7 +41,7 @@ def validate_request(self, request):

        request.parameters = self.get_and_validate_parameters(request, command)

-        return request
+        return request, command

    def get_and_validate_system(self, request):
        """Ensure there is a system in the DB that corresponds to this Request.
@@ -427,42 +428,44 @@ def _validate_parameter_based_on_type(self, value, parameter, command, request):
"""Validates the value passed in, ensures the type matches.
Recursive calls for dictionaries which also have nested parameters"""

p_type = parameter.type.lower()

try:
if value is None and not parameter.nullable:
raise ModelValidationError(
"There is no value for parameter '%s' "
"and this field is not nullable." % parameter.key
)
elif parameter.type.upper() == "STRING":
elif p_type == "string":
if isinstance(value, six.string_types):
return str(value)
else:
raise TypeError("Invalid value for string (%s)" % value)
elif parameter.type.upper() == "INTEGER":
elif p_type == "integer":
if int(value) != float(value):
raise TypeError("Invalid value for integer (%s)" % value)
return int(value)
elif parameter.type.upper() == "FLOAT":
elif p_type == "float":
return float(value)
elif parameter.type.upper() == "ANY":
elif p_type == "any":
return value
elif parameter.type.upper() == "BOOLEAN":
elif p_type == "boolean":
if value in [True, False]:
return value
else:
raise TypeError("Invalid value for boolean (%s)" % value)
elif parameter.type.upper() == "DICTIONARY":
elif p_type == "dictionary":
dict_value = dict(value)
if parameter.parameters:
self.logger.debug("Found Nested Parameters.")
return self.get_and_validate_parameters(
request, command, parameter.parameters, dict_value
)
return dict_value
elif parameter.type.upper() == "DATE":
return int(value)
elif parameter.type.upper() == "DATETIME":
elif p_type in ["date", "datetime"]:
return int(value)
elif p_type == "bytes":
return self._get_bytes_value(value, request)
else:
raise ModelValidationError(
"Unknown type for parameter. Please contact a system administrator."
@@ -479,3 +482,31 @@ def _validate_parameter_based_on_type(self, value, parameter, command, request):
"Value for key: %s is not the correct type. Should be: %s"
% (parameter.key, parameter.type)
)

+    def _get_bytes_value(self, value, request):
+        required_keys = ["storage_type", "id", "filename"]
+        if not isinstance(value, dict):
+            raise ModelValidationError(
+                "Bytes parameters should be a dictionary with at least the following keys: %s"
+                % required_keys
+            )
+
+        for key in required_keys:
+            if key not in value:
+                raise ModelValidationError("Bytes parameter missing %s field" % key)
+
+        if value["storage_type"] not in RequestFile.STORAGE_ENGINES:
+            raise ModelValidationError(
+                "Bytes parameter had invalid storage type: %s" % value["storage_type"]
+            )
+
+        try:
+            rf = RequestFile.objects.get(id=value["id"])
+            rf.request = request
+            rf.save()
+        except DoesNotExist:
+            raise ModelValidationError(
+                "Bytes parameter had an id, but that id did not exist in the database."
+            )
+
+        return value
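For reference, a value that passes _get_bytes_value is a dict carrying at least the three required keys. A sketch (the "gridfs" storage type and the id are illustrative; valid values come from RequestFile.STORAGE_ENGINES and an existing RequestFile document):

    bytes_value = {
        "storage_type": "gridfs",  # assumed member of RequestFile.STORAGE_ENGINES
        "id": "5c4f3f9e8b1e4a0001abc123",  # hypothetical RequestFile id
        "filename": "payload.bin",
    }
    # _get_bytes_value(bytes_value, request) looks the RequestFile up by id,
    # attaches the request to it, saves, and returns the dict unchanged.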
14 changes: 10 additions & 4 deletions bartender/thrift/handler.py
@@ -1,3 +1,4 @@
+import json
import logging
import random
import string
@@ -47,12 +48,17 @@ def processRequest(self, request_id):
        # Validates the request based on what is in the database.
        # This includes the validation of the request parameters,
        # systems are there, commands are there etc.
-        request = self.request_validator.validate_request(request)
+        request, command = self.request_validator.validate_request(request)
        request.save()

if not self.clients["pika"].publish_request(
request, confirm=True, mandatory=True
):
publish_kwargs = {"confirm": True, "mandatory": True}
bytes_params = command.parameter_keys_by_type("Bytes")
if bytes_params:
publish_kwargs["headers"] = {
"resolve_parameters": json.dumps(bytes_params).encode("utf-8")
}

if not self.clients["pika"].publish_request(request, **publish_kwargs):
msg = "Error while publishing request to queue (%s[%s]-%s %s)" % (
request.system,
request.system_version,
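The net effect on publishing: commands with bytes parameters get an extra AMQP header so the consumer can resolve those parameters before execution. A sketch of the kwargs that reach publish_request, assuming a single bytes parameter named "input_file" (the exact shape returned by brewtils' Command.parameter_keys_by_type is an assumption here):

    import json

    bytes_params = ["input_file"]  # assumed return shape of parameter_keys_by_type("Bytes")
    publish_kwargs = {
        "confirm": True,
        "mandatory": True,
        "headers": {"resolve_parameters": json.dumps(bytes_params).encode("utf-8")},
    }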