diff --git a/requirements-minimal.txt b/requirements-minimal.txt index f4711d882..11684cc6a 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -21,7 +21,7 @@ pytimeparse pytz PyYAML>=5.1 requests -task_processing[mesos_executor,k8s]>=1.1.0 +task_processing[mesos_executor,k8s]>=1.2.0 Twisted>=19.7.0 urllib3>=1.24.2 Werkzeug>=0.15.3 diff --git a/requirements.txt b/requirements.txt index 5e0bcfc70..b070dfe99 100644 --- a/requirements.txt +++ b/requirements.txt @@ -80,7 +80,7 @@ setuptools==65.5.1 six==1.15.0 sshpubkeys==3.1.0 stack-data==0.6.2 -task-processing==1.1.0 +task-processing==1.2.0 traitlets==5.0.0 Twisted==22.10.0 typing-extensions==4.5.0 diff --git a/tests/config/config_parse_test.py b/tests/config/config_parse_test.py index 1de3986c9..03fc69122 100644 --- a/tests/config/config_parse_test.py +++ b/tests/config/config_parse_test.py @@ -1744,5 +1744,40 @@ def test_invalid(self, url, context): config_parse.valid_master_address(url, context) +class TestValidKubeconfigPaths: + @setup + def setup_context(self): + self.context = config_utils.NullConfigContext + + @pytest.mark.parametrize( + "kubeconfig_path,watcher_kubeconfig_paths", + [("/some/kubeconfig.conf", []), ("/another/kube/config", ["a_watcher_kubeconfig"])], + ) + def test_valid(self, kubeconfig_path, watcher_kubeconfig_paths): + k8s_options = { + "enabled": True, + "kubeconfig_path": kubeconfig_path, + "watcher_kubeconfig_paths": watcher_kubeconfig_paths, + } + assert config_parse.valid_kubernetes_options.validate(k8s_options, self.context) + + @pytest.mark.parametrize( + "kubeconfig_path,watcher_kubeconfig_paths", + [ + (["/a/kubeconfig/in/a/list"], ["/a/valid/kubeconfig"]), + (None, []), + ("/some/kubeconfig.conf", "/not/a/list/kubeconfig"), + ], + ) + def test_invalid(self, kubeconfig_path, watcher_kubeconfig_paths): + k8s_options = { + "enabled": True, + "kubeconfig_path": kubeconfig_path, + "watcher_kubeconfig_paths": watcher_kubeconfig_paths, + } + with pytest.raises(ConfigError): + config_parse.valid_kubernetes_options.validate(k8s_options, self.context) + + if __name__ == "__main__": run() diff --git a/tron/config/config_parse.py b/tron/config/config_parse.py index 6dd52cf66..491db7e9c 100644 --- a/tron/config/config_parse.py +++ b/tron/config/config_parse.py @@ -874,6 +874,7 @@ class ValidateKubernetes(Validator): "kubeconfig_path": valid_string, "enabled": valid_bool, "default_volumes": build_list_of_type_validator(valid_volume, allow_empty=True), + "watcher_kubeconfig_paths": build_list_of_type_validator(valid_string, allow_empty=True), } diff --git a/tron/config/schema.py b/tron/config/schema.py index ec0df64b5..5f6d36229 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -120,6 +120,7 @@ def config_object_factory(name, required=None, optional=None): "kubeconfig_path", "enabled", "default_volumes", + "watcher_kubeconfig_paths", ], ) diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 4c06f31bb..960ec4e7f 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -280,12 +280,14 @@ def __init__( enabled: bool = True, default_volumes: Optional[List[ConfigVolume]] = None, pod_launch_timeout: Optional[int] = None, + watcher_kubeconfig_paths: Optional[List[str]] = None, ): # general k8s config self.kubeconfig_path = kubeconfig_path self.enabled = enabled self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or [] self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S + self.watcher_kubeconfig_paths = watcher_kubeconfig_paths or [] # creating a task_proc executor has a couple steps: # * create a TaskProcessor # * load the desired plugin (in this case, the k8s one) @@ -340,6 +342,7 @@ def get_runner(self, kubeconfig_path: str, queue: PyDeferredQueue) -> Optional[S "namespace": "tron", "version": __version__, "kubeconfig_path": self.kubeconfig_path, + "watcher_kubeconfig_paths": self.watcher_kubeconfig_paths, "task_configs": [task.get_config() for task in self.tasks.values()], }, ) @@ -621,6 +624,7 @@ class KubernetesClusterRepository: kubeconfig_path: Optional[str] = None pod_launch_timeout: Optional[int] = None default_volumes: Optional[List[ConfigVolume]] = None + watcher_kubeconfig_paths: Optional[List[str]] = None # metadata config clusters: Dict[str, KubernetesCluster] = {} @@ -643,7 +647,10 @@ def get_cluster(cls, kubeconfig_path: Optional[str] = None) -> Optional[Kubernet if kubeconfig_path not in cls.clusters: # will create the task_proc executor cluster = KubernetesCluster( - kubeconfig_path=kubeconfig_path, enabled=cls.kubernetes_enabled, default_volumes=cls.default_volumes + kubeconfig_path=kubeconfig_path, + enabled=cls.kubernetes_enabled, + default_volumes=cls.default_volumes, + watcher_kubeconfig_paths=cls.watcher_kubeconfig_paths, ) cls.clusters[kubeconfig_path] = cluster @@ -659,6 +666,7 @@ def configure(cls, kubernetes_options: ConfigKubernetes) -> None: cls.kubeconfig_path = kubernetes_options.kubeconfig_path cls.kubernetes_enabled = kubernetes_options.enabled cls.default_volumes = kubernetes_options.default_volumes + cls.watcher_kubeconfig_paths = kubernetes_options.watcher_kubeconfig_paths for cluster in cls.clusters.values(): cluster.set_enabled(cls.kubernetes_enabled)