From 41616d5c47276f2bfe3cd53371ca7cfb34cf0274 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 13:32:38 +0000 Subject: [PATCH 01/10] Add delete and type hints and library and update action --- .github/workflows/linting.yml | 2 +- .../file_watcher_operator.py | 35 ++++++++++++++----- pyproject.toml | 1 + 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index bb26eeb..11e0905 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -25,4 +25,4 @@ jobs: run: pylint file_watcher - name: Run MyPy - run: mypy --strict file_watcher + run: mypy --strict file_watcher file_watcher_operator diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py index 0e92564..05fb24f 100644 --- a/file_watcher_operator/file_watcher_operator.py +++ b/file_watcher_operator/file_watcher_operator.py @@ -1,9 +1,13 @@ +""" +Filewatcher operator controls the deployments, PVs, and PVCs based on instrument CRDs +""" import logging import os import sys +from typing import Dict, Any, Mapping, Tuple, List, MutableMapping -import kopf as kopf -import kubernetes +import kopf +import kubernetes # type: ignore import yaml stdout_handler = logging.StreamHandler(stream=sys.stdout) @@ -15,7 +19,15 @@ logger = logging.getLogger(__name__) -def generate_deployment_body(spec, name): +def generate_deployment_body( + spec: Mapping[str, Any], name: str +) -> Tuple[MutableMapping[str, Any], MutableMapping[str, Any], MutableMapping[str, Any]]: + """ + + :param spec: + :param name: + :return: + """ archive_dir = os.environ.get("ARCHIVE_DIR", "/archive") queue_host = os.environ.get("QUEUE_HOST", "rabbitmq-cluster.rabbitmq.svc.cluster.local") queue_name = os.environ.get("EGRESS_QUEUE_NAME", "watched-files") @@ -144,7 +156,7 @@ def generate_deployment_body(spec, name): return deployment_spec, pvc_spec, pv_spec -def deploy_deployment(deployment_spec, name, children): +def deploy_deployment(deployment_spec: Mapping[str, Any], name: str, children: List[Any]) -> None: app_api = kubernetes.client.AppsV1Api() logger.info(f"Starting deployment of: {name} filewatcher") namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir") @@ -153,7 +165,7 @@ def deploy_deployment(deployment_spec, name, children): logger.info(f"Deployed: {name} filewatcher") -def deploy_pvc(pvc_spec, name, children): +def deploy_pvc(pvc_spec: Mapping[str, Any], name: str, children: List[Any]) -> None: namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir") core_api = kubernetes.client.CoreV1Api() # Check if PVC exists else deploy a new one: @@ -167,7 +179,7 @@ def deploy_pvc(pvc_spec, name, children): logger.info(f"Deployed PVC: {name} filewatcher") -def deploy_pv(pv_spec, name, children): +def deploy_pv(pv_spec: Mapping[str, Any], name: str, children: List[Any]) -> None: core_api = kubernetes.client.CoreV1Api() # Check if PV exists else deploy a new one if pv_spec["metadata"]["name"] not in [ii.metadata.name for ii in core_api.list_persistent_volume().items]: @@ -178,7 +190,7 @@ def deploy_pv(pv_spec, name, children): @kopf.on.create("ir.com", "v1", "filewatchers") -def create_fn(spec, **kwargs): +def create_fn(spec: Any, **kwargs: Any) -> Dict[str, List[Any]]: name = kwargs["body"]["metadata"]["name"] logger.info(f"Name is {name}") @@ -187,9 +199,16 @@ def create_fn(spec, **kwargs): kopf.adopt(deployment_spec) kopf.adopt(pvc_spec) - children = [] + children: List[Any] = [] deploy_pv(pv_spec, name, children) deploy_pvc(pvc_spec, name, children) deploy_deployment(deployment_spec, name, children) # Update controller's status with child deployment return {"children": children} + + +@kopf.on.delete("ir.com", "v1", "filewatchers") +def delete_func(**kwargs: Any) -> None: + name = kwargs["body"]["metadata"]["name"] + client = kubernetes.client.CoreV1Api() + client.delete_persistent_volume(name=f"{name}-file-watcher-pv") diff --git a/pyproject.toml b/pyproject.toml index a5f8f3b..4ddc9e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ test = [ code-inspection = [ "pylint==3.0.2", "mypy==1.6.0", + "types-PyYAML==6.0.12.12", "file_watcher[test]" ] From f1e6540339da269262cb7a6a77183f0201002afe Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 13:47:17 +0000 Subject: [PATCH 02/10] Lint the operator --- .github/workflows/linting.yml | 2 +- .../file_watcher_operator.py | 51 ++++++++++++++++--- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index 11e0905..0bfba74 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -22,7 +22,7 @@ jobs: python -m pip install .[code-inspection] - name: Run pylint - run: pylint file_watcher + run: pylint file_watcher file_watcher_operator - name: Run MyPy run: mypy --strict file_watcher file_watcher_operator diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py index 05fb24f..8df22d3 100644 --- a/file_watcher_operator/file_watcher_operator.py +++ b/file_watcher_operator/file_watcher_operator.py @@ -157,15 +157,31 @@ def generate_deployment_body( def deploy_deployment(deployment_spec: Mapping[str, Any], name: str, children: List[Any]) -> None: + """ + Given a deployment spec, name, and operators children, create the namespaced deployment and add it's uid to the + children + :param deployment_spec: The deployment spec + :param name: The name of the spec + :param children: The operators children + :return: None + """ app_api = kubernetes.client.AppsV1Api() - logger.info(f"Starting deployment of: {name} filewatcher") + logger.info("Starting deployment of: %s filewatcher", name) namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir") depl = app_api.create_namespaced_deployment(namespace=namespace, body=deployment_spec) children.append(depl.metadata.uid) - logger.info(f"Deployed: {name} filewatcher") + logger.info("Deployed: %s filewatcher", name) def deploy_pvc(pvc_spec: Mapping[str, Any], name: str, children: List[Any]) -> None: + """ + Given a pvc spec, name, and the operators children, create the namespaced persistent volume claim and add its uid + to the operators children + :param pvc_spec: The pvc spec + :param name: The name of the pvc + :param children: The operators children + :return: None + """ namespace = os.environ.get("FILEWATCHER_NAMESPACE", "ir") core_api = kubernetes.client.CoreV1Api() # Check if PVC exists else deploy a new one: @@ -173,26 +189,41 @@ def deploy_pvc(pvc_spec: Mapping[str, Any], name: str, children: List[Any]) -> N ii.metadata.name for ii in core_api.list_namespaced_persistent_volume_claim(pvc_spec["metadata"]["namespace"]).items ]: - logger.info(f"Starting deployment of PVC: {name} filewatcher") + logger.info("Starting deployment of PVC: %s filewatcher", name) pvc = core_api.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc_spec) children.append(pvc.metadata.uid) - logger.info(f"Deployed PVC: {name} filewatcher") + logger.info("Deployed PVC: %s filewatcher", name) def deploy_pv(pv_spec: Mapping[str, Any], name: str, children: List[Any]) -> None: + """ + Given a pvc spec, name, and the operators children, create the namespaced persistent volume and add its uid + to the operators children + :param pv_spec: The pv spec + :param name: The name of the pv + :param children: The operators children + :return: None + """ core_api = kubernetes.client.CoreV1Api() # Check if PV exists else deploy a new one if pv_spec["metadata"]["name"] not in [ii.metadata.name for ii in core_api.list_persistent_volume().items]: - logger.info(f"Starting deployment of PV: {name} filewatcher") + logger.info("Starting deployment of PV: %s filewatcher", name) pv = core_api.create_persistent_volume(body=pv_spec) children.append(pv.metadata.uid) - logger.info(f"Deployed PV: {name} filewatcher") + logger.info("Deployed PV: %s filewatcher", name) @kopf.on.create("ir.com", "v1", "filewatchers") def create_fn(spec: Any, **kwargs: Any) -> Dict[str, List[Any]]: + """ + Kopf create event handler, generates all 3 specs then creates them in the cluster, while creating the children and + adopting the deployment and pvc + :param spec: Spec of the CRD intercepted by kopf + :param kwargs: KWARGS + :return: None + """ name = kwargs["body"]["metadata"]["name"] - logger.info(f"Name is {name}") + logger.info("Name is %s", name) deployment_spec, pvc_spec, pv_spec = generate_deployment_body(spec, name) # Make the deployment the child of this operator @@ -209,6 +240,12 @@ def create_fn(spec: Any, **kwargs: Any) -> Dict[str, List[Any]]: @kopf.on.delete("ir.com", "v1", "filewatchers") def delete_func(**kwargs: Any) -> None: + """ + Kopf delete event handler. This will automatically delete the filewatcher deployment and pvc, and will manually + delete the persitent volume + :param kwargs: kwargs + :return: None + """ name = kwargs["body"]["metadata"]["name"] client = kubernetes.client.CoreV1Api() client.delete_persistent_volume(name=f"{name}-file-watcher-pv") From 0e0028d3428c138b9dc5bb6a110b817ff1677275 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 13:47:37 +0000 Subject: [PATCH 03/10] Fix doc spacing --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 41f864e..b095c3a 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,10 @@ There is a recovery attempt that can be made for the instrument, by checking if instrument by using some saved state in the Database. ## FileWatcherOperator -The point of the operator is to check for CustomResourceDefinition files that have been applied to the cluster. Examples can be found in the GitOps repository of what these should look like as part of the deployment for the file-watcher-operator. When a CRD is applied, this software should create a Deployment responsible for ensuring a file-watcher exists for the parameters in the CRD file. +The point of the operator is to check for CustomResourceDefinition files that have been applied to the cluster. +Examples can be found in the GitOps repository of what these should look like as part of the deployment for the +file-watcher-operator. When a CRD is applied, this software should create a Deployment responsible for ensuring a +file-watcher exists for the parameters in the CRD file. ## Docker From 92486154d1e5b280b9160b3f38320fd83872ab8f Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 14:08:45 +0000 Subject: [PATCH 04/10] Fix doc spacing --- .github/workflows/linting.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index 0bfba74..4139c80 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -22,7 +22,7 @@ jobs: python -m pip install .[code-inspection] - name: Run pylint - run: pylint file_watcher file_watcher_operator + run: pylint file_watcher file_watcher_operator/file_watcher_operator.py - name: Run MyPy run: mypy --strict file_watcher file_watcher_operator From 71d545553d5aa837cb4253cd754ad4c673fa1253 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 14:17:59 +0000 Subject: [PATCH 05/10] Include operator dependencies for inspection --- pyproject.toml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4ddc9e7..71fb8f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,12 @@ dependencies = [ file_watcher = "file_watcher.main:main" [project.optional-dependencies] +operator = [ + "kopf==1.37.1", + "kubernetes==29.0.0", + "PyYAML==6.0.1" +] + formatting = [ "black==23.10.1" ] @@ -29,7 +35,8 @@ code-inspection = [ "pylint==3.0.2", "mypy==1.6.0", "types-PyYAML==6.0.12.12", - "file_watcher[test]" + "file_watcher[test]", + "file_watcher[operator]" ] dev = [ From a8e5a4712ddd50403fa6342dcf94c246f0fb7e64 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 14:21:20 +0000 Subject: [PATCH 06/10] Add pylint disable --- file_watcher_operator/file_watcher_operator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py index 8df22d3..bdd25b2 100644 --- a/file_watcher_operator/file_watcher_operator.py +++ b/file_watcher_operator/file_watcher_operator.py @@ -10,6 +10,8 @@ import kubernetes # type: ignore import yaml +# pylint: disable = (duplicate-code) +# This will be detected from the file watcher which is not the same application. stdout_handler = logging.StreamHandler(stream=sys.stdout) logging.basicConfig( handlers=[stdout_handler], @@ -17,6 +19,7 @@ level=logging.INFO, ) logger = logging.getLogger(__name__) +# pylint: enable = duplicate-code def generate_deployment_body( From b535f4c670afda394366aa620cf935b5701820a1 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Wed, 28 Feb 2024 14:26:19 +0000 Subject: [PATCH 07/10] Update containerfile --- container/file_watcher_operator.D | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/container/file_watcher_operator.D b/container/file_watcher_operator.D index 258c55f..8b7fa54 100644 --- a/container/file_watcher_operator.D +++ b/container/file_watcher_operator.D @@ -1,4 +1,4 @@ -FROM python:3.10 +FROM python:3.12-slim RUN pip install kopf RUN pip install kubernetes From 954d882661f1cde9f2a0debe0f7df6ad38bb633d Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Thu, 29 Feb 2024 10:06:16 +0000 Subject: [PATCH 08/10] Update event handler --- .../file_watcher_operator.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py index bdd25b2..2c0fedf 100644 --- a/file_watcher_operator/file_watcher_operator.py +++ b/file_watcher_operator/file_watcher_operator.py @@ -58,7 +58,7 @@ def generate_deployment_body( spec: containers: - name: {name}-file-watcher - image: ghcr.io/interactivereduction/filewatcher@sha256:{file_watcher_sha} + image: file-watcher:local env: - name: QUEUE_HOST value: {queue_host} @@ -252,3 +252,22 @@ def delete_func(**kwargs: Any) -> None: name = kwargs["body"]["metadata"]["name"] client = kubernetes.client.CoreV1Api() client.delete_persistent_volume(name=f"{name}-file-watcher-pv") + + +@kopf.on.update("ir.com", "v1", "filewatchers") +def update_func(spec: Any, **kwargs: Any) -> None: + """ + kopf update event handler. This automatically updates the filewatcher deployment when the CRD changes + :param spec: the spec + :param kwargs: kwargs + :return: None + """ + name = kwargs["body"]["metadata"]["name"] + + namespace = kwargs["body"]["metadata"]["namespace"] + deployment_spec, _, __ = generate_deployment_body(spec, name) + app_api = kubernetes.client.AppsV1Api() + + app_api.patch_namespaced_deployment( + name=f"{name}-file-watcher-deployment", namespace=namespace, body=deployment_spec + ) From 436d18cd06fe2671d80d29017898fd177a75ae80 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Thu, 29 Feb 2024 10:09:25 +0000 Subject: [PATCH 09/10] Rollback spec change --- file_watcher_operator/file_watcher_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py index 2c0fedf..edbd73d 100644 --- a/file_watcher_operator/file_watcher_operator.py +++ b/file_watcher_operator/file_watcher_operator.py @@ -58,7 +58,7 @@ def generate_deployment_body( spec: containers: - name: {name}-file-watcher - image: file-watcher:local + image: ghcr.io/interactivereduction/filewatcher@sha256:{file_watcher_sha} env: - name: QUEUE_HOST value: {queue_host} From 97ab7691af752311d82d5d1c59f05d9deaf08f44 Mon Sep 17 00:00:00 2001 From: Keiran Price Date: Mon, 4 Mar 2024 09:08:53 +0000 Subject: [PATCH 10/10] Update docstring --- file_watcher_operator/file_watcher_operator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/file_watcher_operator/file_watcher_operator.py b/file_watcher_operator/file_watcher_operator.py index edbd73d..db32944 100644 --- a/file_watcher_operator/file_watcher_operator.py +++ b/file_watcher_operator/file_watcher_operator.py @@ -26,10 +26,10 @@ def generate_deployment_body( spec: Mapping[str, Any], name: str ) -> Tuple[MutableMapping[str, Any], MutableMapping[str, Any], MutableMapping[str, Any]]: """ - - :param spec: - :param name: - :return: + Create and return a Kubernetes deployment yaml for each deployment + :param spec: The kopf spec + :param name: The instrument name + :return: Tuple of the mutable mappings containing the deployment specs """ archive_dir = os.environ.get("ARCHIVE_DIR", "/archive") queue_host = os.environ.get("QUEUE_HOST", "rabbitmq-cluster.rabbitmq.svc.cluster.local")