Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing Tron logs for jobs using other services images #1001

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions tests/utils/scribereader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from unittest import mock

import pytest
import yaml

import tron.utils.scribereader
from tron.utils.scribereader import get_log_namespace
from tron.utils.scribereader import read_log_stream_for_action_run

try:
Expand Down Expand Up @@ -420,3 +422,63 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output():
# The expected output should be max_lines plus the
# extra line for 'This output is truncated.' message
assert len(output) == max_lines + 1


def test_get_log_namespace_yml_file_found():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
config_content = """
job:
actions:
action:
service: test_service
"""
with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch(
"yaml.safe_load", return_value=yaml.safe_load(config_content)
):
result = get_log_namespace(action_run_id, paasta_cluster)
assert result == "test_service"


def test_get_log_namespace_file_not_found():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
with mock.patch("builtins.open", side_effect=FileNotFoundError):
result = get_log_namespace(action_run_id, paasta_cluster)
assert result == "namespace"


def test_get_log_namespace_yaml_error():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
with mock.patch("builtins.open", mock.mock_open(read_data="invalid_yaml")), mock.patch(
"yaml.safe_load", side_effect=yaml.YAMLError
):
result = get_log_namespace(action_run_id, paasta_cluster)
assert result == "namespace"


def test_get_log_namespace_generic_error():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
with mock.patch("builtins.open", mock.mock_open(read_data="some_data")), mock.patch(
"yaml.safe_load", side_effect=Exception
):
result = get_log_namespace(action_run_id, paasta_cluster)
assert result == "namespace"


def test_get_log_namespace_service_not_found():
action_run_id = "namespace.job.1234.action"
paasta_cluster = "fake_cluster"
config_content = """
job:
actions:
action:
command: "sleep 10"
"""
with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch(
"yaml.safe_load", return_value=yaml.safe_load(config_content)
):
result = get_log_namespace(action_run_id, paasta_cluster)
assert result == "namespace"
25 changes: 24 additions & 1 deletion tron/utils/scribereader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Tuple

import staticconf # type: ignore
import yaml

from tron.config.static_config import get_config_watcher
from tron.config.static_config import NAMESPACE
Expand Down Expand Up @@ -81,12 +82,34 @@ def get_scribereader_host_and_port(ecosystem: str, superregion: str, region: str
return host, port


def get_log_namespace(action_run_id: str, paasta_cluster: str) -> str:
namespace, job_name, _, action = action_run_id.split(".")
for ext in ["yaml", "yml"]:
try:
with open(f"/nail/etc/services/{namespace}/tron-{paasta_cluster}.{ext}") as f:
config = yaml.safe_load(f)
service: Optional[str] = (
config.get(job_name, {}).get("actions", {}).get(action, {}).get("service", None)
)
if service:
return service
except FileNotFoundError:
log.warning(f"yelp-soaconfig file tron-{paasta_cluster}.{ext} not found.")
except yaml.YAMLError as e:
log.error(f"Error parsing YAML file tron-{paasta_cluster}.{ext}: {e}")
except Exception as e:
log.error(f"Error reading service for {action} from file tron-{paasta_cluster}.{ext}: {e}")

return namespace


class PaaSTALogs:
def __init__(self, component: str, paasta_cluster: str, action_run_id: str) -> None:
self.component = component
self.paasta_cluster = paasta_cluster
self.action_run_id = action_run_id
namespace, job_name, run_num, action = action_run_id.split(".")
_, job_name, run_num, action = action_run_id.split(".")
namespace = get_log_namespace(action_run_id, paasta_cluster)
# in our logging infra, things are logged to per-instance streams - but
# since Tron PaaSTA instances are of the form `job_name.action`, we need
# to escape the period since some parts of our infra will reject streams
Expand Down
Loading