diff --git a/talker_agent/Dockerfile b/talker_agent/Dockerfile index db45218..9b891c8 100644 --- a/talker_agent/Dockerfile +++ b/talker_agent/Dockerfile @@ -1,8 +1,7 @@ FROM python:2.7-slim-buster #args from docker-compose.yaml -ARG TALKER_AGENT_VERSION -ARG TALKER_HOST_ID +ARG TALKER_AGENT_RELEASE RUN mkdir -p /root/talker WORKDIR /root/talker @@ -10,7 +9,6 @@ WORKDIR /root/talker RUN apt-get update -y && apt-get install curl locales -y RUN echo "LC_ALL=en_US.UTF-8" >> /etc/environment && echo "en_US.UTF-8 UTF-8" >> /etc/locale.gen && echo "LANG=en_US.UTF-8" > /etc/locale.conf RUN locale-gen en_US.UTF-8 -RUN echo {\"password\": null, \"port\": 6379, \"host\": \"redis\", \"host_id\": \"$TALKER_HOST_ID\"} > conf.json COPY install_agent.sh ./ RUN chmod +x install_agent.sh && ./install_agent.sh @@ -20,7 +18,6 @@ RUN chmod +x /etc/init.d/talker RUN ln -s /usr/local/bin/python /usr/bin/python RUN update-rc.d talker defaults -COPY talker.py ./ -RUN chmod +x talker.py +COPY config.ini ./ CMD service talker start && tail -f /dev/null diff --git a/talker_agent/config.ini b/talker_agent/config.ini new file mode 100644 index 0000000..521841f --- /dev/null +++ b/talker_agent/config.ini @@ -0,0 +1,13 @@ +[agent] +host_id = MyHostId + +[redis] +host = redis +port = 6379 + +[logging] +logpath = /var/log/talker.log + +[metrics] +enabled = True +host = graphite diff --git a/talker_agent/install_agent.sh b/talker_agent/install_agent.sh index 112a2e4..4a887a3 100755 --- a/talker_agent/install_agent.sh +++ b/talker_agent/install_agent.sh @@ -3,27 +3,12 @@ set -e if [ "$DOWNLOAD_TALKER" = true ] ; then - # talker agent version can be: - # tag - for example: 1.8.2 - # tag-commit_id - for example: 1.8.2-02ee1aed8275c79a5d0840505caaaef0fa53b26e - if [[ $TALKER_AGENT_VERSION == *"-"* ]]; then - IFS='-' read -r -a parts <<< "$TALKER_AGENT_VERSION" - git_version=${parts[1]} - else - git_version=v$TALKER_AGENT_VERSION - fi - sudo mkdir -p /root/talker cd /root/talker - curl -sfL https://github.com/weka-io/talker/raw/"$git_version"/talker_agent/talker-service > talker-service - curl -sfL https://github.com/weka-io/talker/raw/"$git_version"/talker_agent/talker.py > talker.py - chmod +x talker.py + curl -sfL https://github.com/weka-io/talker/raw/"$TALKER_AGENT_RELEASE"/talker_agent/talker-service > talker-service fi -echo "$TALKER_AGENT_VERSION" > version +curl -sfL https://github.com/weka-io/talker/releases/download/"$TALKER_AGENT_RELEASE"/talker.bin > talker.bin +chmod +x talker.bin -# Add redis py dependency -REDIS_COMMIT_ID=ad84781ea260be0a1ca4bf6768959b50e8835a6b -curl -sfL https://github.com/weka-io/redis-py/archive/"$REDIS_COMMIT_ID".tar.gz | tar -xz -mv redis-py-"$REDIS_COMMIT_ID"/redis . -rm -rf redis-py-"$REDIS_COMMIT_ID" +echo "$TALKER_AGENT_RELEASE" > version diff --git a/talker_agent/talker-service b/talker_agent/talker-service index 97c504d..054eed5 100644 --- a/talker_agent/talker-service +++ b/talker_agent/talker-service @@ -18,7 +18,7 @@ else fi NAME=/etc/init.d/talker -CMD=/root/talker/talker.py +CMD=/root/talker/talker.bin export LC_CTYPE=en_US.UTF-8 export LC_ALL=en_US.UTF-8 diff --git a/talker_agent/talker.py b/talker_agent/talker.py index 0b84e7f..fe969ca 100644 --- a/talker_agent/talker.py +++ b/talker_agent/talker.py @@ -38,6 +38,11 @@ from contextlib import contextmanager from logging import getLogger from logging.handlers import RotatingFileHandler +try: + from configparser import ConfigParser +except: # python 2.7 + from ConfigParser import ConfigParser +import graphyte PY3 = sys.version_info[0] == 3 @@ -72,7 +77,7 @@ def reraise(tp, value, tb=None): # =========================================================================================== -CONFIG_FILENAME = '/root/talker/conf.json' +CONFIG_FILENAME = '/root/talker/config.ini' REBOOT_FILENAME = '/root/talker/reboot.id' EXCEPTION_FILENAME = '/root/talker/last_exception' VERSION_FILENAME = '/root/talker/version' @@ -98,6 +103,8 @@ def reraise(tp, value, tb=None): JOBS_EXPIRATION = 15 # 20 * 60 # how long to keep job ids in the EOS registry (exactly-once-semantics) +config = None + class LineTimeout(Exception): pass @@ -111,6 +118,39 @@ class JobTimeout(Exception): pass +class Config: + def __init__(self, filename=CONFIG_FILENAME): + self._filename = filename + self._parser = ConfigParser() + self.load_configuration() + + @property + def parser(self): + return self._parser + + def load_configuration(self): + try: + self._parser.read(self._filename) + except (IOError, OSError) as e: + logger.error(e, exc_info=True) + sys.exit(1) + + logger.info("Loaded configuration: %s", self._filename) + + self.validate_config() + + def update(self, section, **kwargs): + for key, val in kwargs.items(): + self._parser.set(section, key, val) + + with open(self._filename, 'w') as configfile: + self._parser.write(configfile) + + def validate_config(self): + if not self._parser.get('agent', 'host_id'): + raise Exception('config agent.host_id is missing') + + class Job(object): class OutputChannel(object): @@ -669,6 +709,9 @@ def finalize_previous_session(self): def start_job(self, job_data_raw): job_data = json.loads(job_data_raw) cmd = job_data['cmd'] + if config.parser.getboolean('metrics', 'enabled'): + logger.debug('sending command metrics') + graphyte.send('commands_received', 1) if isinstance(cmd, list): if SIMULATE_DROPS_RATE and cmd != ['true'] and random.random() > SIMULATE_DROPS_RATE: logger.warning("dropping job: %(id)s", job_data) @@ -786,7 +829,14 @@ def start(self): reraise(*self.exc_info) assert False, "exception should have been raised" - def setup(self, host_id, host, port, password, socket_timeout=10, retry_on_timeout=True, health_check_interval=30): + def setup(self, socket_timeout=10, retry_on_timeout=True, health_check_interval=30): + host_id = config.parser.get('agent', 'host_id') + host = config.parser.get('redis', 'host') + port = config.parser.getint('redis', 'port') + try: + password = config.parser.get('redis', 'password') + except: + password = None logger.info("Connecting to redis %s:%s", host, port) import redis # deferring so that importing talker (for ut) doesn't immediately fail if package not available self.redis = redis.StrictRedis( @@ -857,12 +907,7 @@ def set_logging_to_file(logpath): FILE_LOG_HANDLER = handler - with open(CONFIG_FILENAME, 'r') as f: - config = json.load(f) - - with open(CONFIG_FILENAME, 'w') as f: - config.update(logpath=logpath) - json.dump(config, f) + config.update('logging', logpath=logpath) def setup_logging(verbose): @@ -879,29 +924,19 @@ def setup_logging(verbose): logging.root.addHandler(handler) -def load_configuration(): - try: - with open(CONFIG_FILENAME, 'r') as f: - config = json.load(f) - except (IOError, OSError): - logger.warning("No configuration, exiting") - sys.exit(1) - - logger.info("Loaded configuration: %s", CONFIG_FILENAME) - for p in sorted(config.items()): - logger.info(" %s: %s", *p) - return config - - def main(*args): + global config + setup_logging(verbose="-v" in args) no_restart = "--no-restart" in args version = open(VERSION_FILENAME).read() logger.info("Starting Talker: %s", version) - config = load_configuration() - set_logging_to_file(config.pop("logpath", "/var/log/talker.log")) + config = Config() + set_logging_to_file(config.parser.get('logging', 'logpath')) + if config.parser.getboolean('metrics', 'enabled'): + graphyte.init(config.parser.get('metrics', 'host')) # to help with WEKAPP-74054 os.system("df") @@ -912,7 +947,7 @@ def main(*args): try: agent = TalkerAgent() - agent.setup(**config) + agent.setup() agent.start() except SystemExit: raise diff --git a/tests/integration/docker-compose.yml b/tests/integration/docker-compose.yml index d57e8d9..b354437 100644 --- a/tests/integration/docker-compose.yml +++ b/tests/integration/docker-compose.yml @@ -2,11 +2,21 @@ version: '3' services: redis: image: "redis:4.0.14-alpine" + graphite: + image: "graphiteapp/graphite-statsd" + container_name: graphite + ports: + - 80:80 + - 2003-2004:2003-2004 + - 2023-2024:2023-2024 + - 8125:8125/udp + - 8126:8126 talker: build: context: ../../talker_agent/. args: - - TALKER_AGENT_VERSION=$TALKER_AGENT_VERSION + - TALKER_AGENT_RELEASE=$TALKER_AGENT_RELEASE - TALKER_HOST_ID=$TALKER_HOST_ID links: - redis + - graphite diff --git a/tests/integration/test_sanity.py b/tests/integration/test_sanity.py index 25687a5..112939d 100644 --- a/tests/integration/test_sanity.py +++ b/tests/integration/test_sanity.py @@ -1,10 +1,15 @@ import unittest +from talker_agent.talker import Config from talker.errors import CommandAbortedByOverflow from tests.utils import get_talker_client, get_retcode, get_stdout class IntegrationTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.host_id = Config('talker_agent/config.ini').parser.get('agent', 'host_id') + def setUp(self): super().setUp() self.client = get_talker_client() @@ -14,12 +19,12 @@ def tearDown(self) -> None: self.client.redis.flushall() def test_echo_hello_command(self): - cmd = self.client.run('MyHostId', 'bash', '-ce', 'echo hello') + cmd = self.client.run(self.host_id, 'bash', '-ce', 'echo hello') result = cmd.result() self.assertEqual(result, 'hello\n') def test_max_output_exceeds_maximum(self): - cmd = self.client.run('MyHostId', 'bash', '-ce', 'yes') + cmd = self.client.run(self.host_id, 'bash', '-ce', 'yes') with self.assertRaises(CommandAbortedByOverflow): cmd.result() @@ -27,7 +32,7 @@ def test_max_output_per_channel_set(self): max_output_per_channel = 3 for val, expected_ret in [('123', '0'), ('1234', 'overflowed')]: cmd = self.client.run( - 'MyHostId', 'bash', '-ce', 'echo -n {}'.format(val), max_output_per_channel=max_output_per_channel) + self.host_id, 'bash', '-ce', 'echo -n {}'.format(val), max_output_per_channel=max_output_per_channel) ret = get_retcode(self.client.redis, cmd.job_id) self.assertEqual(ret, expected_ret) if expected_ret == 0: