Skip to content

Commit

Permalink
Merge pull request #989 from Yelp/jfong/TRON-2195-old-kubeconfig-paths
Browse files Browse the repository at this point in the history
TRON-2195: Support watcher_kubeconfig_paths
  • Loading branch information
jfongatyelp authored Jul 11, 2024
2 parents 9e978e3 + 1d0b396 commit 95ad61a
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 3 deletions.
2 changes: 1 addition & 1 deletion requirements-minimal.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pytimeparse
pytz
PyYAML>=5.1
requests
task_processing[mesos_executor,k8s]>=1.1.0
task_processing[mesos_executor,k8s]>=1.2.0
Twisted>=19.7.0
urllib3>=1.24.2
Werkzeug>=0.15.3
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ setuptools==65.5.1
six==1.15.0
sshpubkeys==3.1.0
stack-data==0.6.2
task-processing==1.1.0
task-processing==1.2.0
traitlets==5.0.0
Twisted==22.10.0
typing-extensions==4.5.0
Expand Down
35 changes: 35 additions & 0 deletions tests/config/config_parse_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1744,5 +1744,40 @@ def test_invalid(self, url, context):
config_parse.valid_master_address(url, context)


class TestValidKubeconfigPaths:
@setup
def setup_context(self):
self.context = config_utils.NullConfigContext

@pytest.mark.parametrize(
"kubeconfig_path,watcher_kubeconfig_paths",
[("/some/kubeconfig.conf", []), ("/another/kube/config", ["a_watcher_kubeconfig"])],
)
def test_valid(self, kubeconfig_path, watcher_kubeconfig_paths):
k8s_options = {
"enabled": True,
"kubeconfig_path": kubeconfig_path,
"watcher_kubeconfig_paths": watcher_kubeconfig_paths,
}
assert config_parse.valid_kubernetes_options.validate(k8s_options, self.context)

@pytest.mark.parametrize(
"kubeconfig_path,watcher_kubeconfig_paths",
[
(["/a/kubeconfig/in/a/list"], ["/a/valid/kubeconfig"]),
(None, []),
("/some/kubeconfig.conf", "/not/a/list/kubeconfig"),
],
)
def test_invalid(self, kubeconfig_path, watcher_kubeconfig_paths):
k8s_options = {
"enabled": True,
"kubeconfig_path": kubeconfig_path,
"watcher_kubeconfig_paths": watcher_kubeconfig_paths,
}
with pytest.raises(ConfigError):
config_parse.valid_kubernetes_options.validate(k8s_options, self.context)


if __name__ == "__main__":
run()
1 change: 1 addition & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ class ValidateKubernetes(Validator):
"kubeconfig_path": valid_string,
"enabled": valid_bool,
"default_volumes": build_list_of_type_validator(valid_volume, allow_empty=True),
"watcher_kubeconfig_paths": build_list_of_type_validator(valid_string, allow_empty=True),
}


Expand Down
1 change: 1 addition & 0 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def config_object_factory(name, required=None, optional=None):
"kubeconfig_path",
"enabled",
"default_volumes",
"watcher_kubeconfig_paths",
],
)

Expand Down
10 changes: 9 additions & 1 deletion tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,14 @@ def __init__(
enabled: bool = True,
default_volumes: Optional[List[ConfigVolume]] = None,
pod_launch_timeout: Optional[int] = None,
watcher_kubeconfig_paths: Optional[List[str]] = None,
):
# general k8s config
self.kubeconfig_path = kubeconfig_path
self.enabled = enabled
self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
self.watcher_kubeconfig_paths = watcher_kubeconfig_paths or []
# creating a task_proc executor has a couple steps:
# * create a TaskProcessor
# * load the desired plugin (in this case, the k8s one)
Expand Down Expand Up @@ -340,6 +342,7 @@ def get_runner(self, kubeconfig_path: str, queue: PyDeferredQueue) -> Optional[S
"namespace": "tron",
"version": __version__,
"kubeconfig_path": self.kubeconfig_path,
"watcher_kubeconfig_paths": self.watcher_kubeconfig_paths,
"task_configs": [task.get_config() for task in self.tasks.values()],
},
)
Expand Down Expand Up @@ -621,6 +624,7 @@ class KubernetesClusterRepository:
kubeconfig_path: Optional[str] = None
pod_launch_timeout: Optional[int] = None
default_volumes: Optional[List[ConfigVolume]] = None
watcher_kubeconfig_paths: Optional[List[str]] = None

# metadata config
clusters: Dict[str, KubernetesCluster] = {}
Expand All @@ -643,7 +647,10 @@ def get_cluster(cls, kubeconfig_path: Optional[str] = None) -> Optional[Kubernet
if kubeconfig_path not in cls.clusters:
# will create the task_proc executor
cluster = KubernetesCluster(
kubeconfig_path=kubeconfig_path, enabled=cls.kubernetes_enabled, default_volumes=cls.default_volumes
kubeconfig_path=kubeconfig_path,
enabled=cls.kubernetes_enabled,
default_volumes=cls.default_volumes,
watcher_kubeconfig_paths=cls.watcher_kubeconfig_paths,
)
cls.clusters[kubeconfig_path] = cluster

Expand All @@ -659,6 +666,7 @@ def configure(cls, kubernetes_options: ConfigKubernetes) -> None:
cls.kubeconfig_path = kubernetes_options.kubeconfig_path
cls.kubernetes_enabled = kubernetes_options.enabled
cls.default_volumes = kubernetes_options.default_volumes
cls.watcher_kubeconfig_paths = kubernetes_options.watcher_kubeconfig_paths

for cluster in cls.clusters.values():
cluster.set_enabled(cls.kubernetes_enabled)
Expand Down

0 comments on commit 95ad61a

Please sign in to comment.