Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shuying Liang <[email protected]>
  • Loading branch information
shuyingliang committed Dec 20, 2024
1 parent e2c74a2 commit a0c5d8e
Show file tree
Hide file tree
Showing 11 changed files with 11 additions and 78 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ jobs:
- flytekit-huggingface
- flytekit-identity-aware-proxy
- flytekit-inference
- flytekit-k8sdataservice
- flytekit-k8s-pod
- flytekit-kf-mpi
- flytekit-kf-pytorch
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ RUN apt-get update && apt-get install build-essential -y \
RUN uv pip install --system --no-cache-dir -U flytekit==$VERSION \
flytekitplugins-airflow==$VERSION \
flytekitplugins-bigquery==$VERSION \
flytekitplugins-k8sdataservice==$VERSION \
flytekitplugins-openai==$VERSION \
flytekitplugins-snowflake==$VERSION \
flytekitplugins-awssagemaker==$VERSION \
Expand Down
18 changes: 0 additions & 18 deletions Dockerfile.k8sdataservice

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ class DataServiceMetadata(ResourceMeta):

class DataServiceAgent(AsyncAgentBase):
name = "K8s DataService Async Agent"
# config_file_path = "/etc/config/aipflyteagent/task_logs.yaml"

def __init__(self):
self.k8s_manager = K8sManager()
super().__init__(task_type_name="dataservicetask", metadata_type=DataServiceMetadata)
self.config = None
self.kk_execution_id = None

def create(
self, task_template: TaskTemplate, output_prefix: str, inputs: Optional[LiteralMap] = None, **kwargs
Expand All @@ -39,12 +37,11 @@ def create(
name = ""
if existing_release_name is None or existing_release_name == "":
logger.info("Creating K8s data service resources...")
name = self.k8s_manager.create_data_service(self.kk_execution_id)
name = self.k8s_manager.create_data_service()
logger.info(f'Data service {name} with image {graph_engine_config["Image"]} completed')
else:
name = existing_release_name
logger.info(f"User configs to use the existing data service release name: {name}.")
logger.info(f"The existing execution ID found is: {self.kk_execution_id}.")

dataservice_config = DataServiceConfig(
Name=graph_engine_config.get("Name", None),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from flytekitplugins.k8sdataservice.k8s.kube_config import KubeConfig
from kubernetes import client
from kubernetes.client.rest import ApiException
from utils.infra import union_maps
from utils.resources import cleanup_resources, convert_flyte_to_k8s_fields

from flytekit import logger
Expand Down Expand Up @@ -75,9 +74,10 @@ def create_stateful_set_object(self):
)
],
)
self.labels.update({"app.kubernetes.io/instance": self.name})
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels=union_maps(self.labels, {"app.kubernetes.io/instance": self.name}),
labels=self.labels,
annotations={},
),
spec=client.V1PodSpec(
Expand Down Expand Up @@ -123,13 +123,13 @@ def create_service(self) -> str:
namespace = self.namespace
logger.info(f"creating a service at namespace {namespace} with name {self.name}")
port = self.data_service_config.get("Port", 40000)
label = union_maps(self.labels, {"app.kubernetes.io/instance": self.name, "app": APPNAME})
self.labels.update({"app.kubernetes.io/instance": self.name, "app": APPNAME})
body = client.V1Service(
api_version="v1",
kind="Service",
metadata=client.V1ObjectMeta(
name=self.name,
labels=label,
labels=self.labels,
namespace=namespace,
),
spec=client.V1ServiceSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def poke(self, release_name: str, cleanup_data_service: bool, cluster: str
)
logger.info("DataService sensor will stop polling")
return True
# NOTE: the sensory node will be append to the end of workflow.
# NOTE: the sensory node can be appended to the end of workflow.
# So the training jobs are guaranteed to be finished, regardless of success or failure.
logger.info(f"The training job is in terminal stage, deleting graph engine {self.release_name}")
self.delete_data_service()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,3 @@ subjects:
- kind: ServiceAccount
name: flyteagent
namespace: flyte
- kind: ServiceAccount
name: aipflyteagent
namespace: flyte
2 changes: 1 addition & 1 deletion plugins/flytekit-k8sdataservice/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
author="LinkedIn",
author_email="[email protected]",
description="Flytekit K8s Data Service Plugin",
# namespace_packages=["flytekitplugins"],
namespace_packages=["flytekitplugins"],
packages=find_namespace_packages(where="."),
include_package_data=True,
install_requires=plugin_requires,
Expand Down
30 changes: 0 additions & 30 deletions plugins/flytekit-k8sdataservice/tests/k8sdataservice/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,11 @@
from flytekit.models.task import TaskTemplate


# @patch("flytekitplugins.k8sdataservice.agent.open", new_callable=mock_open,
# read_data="task_logs:\n templates:\n - templateUris:\n - 'https://some-log-url'\n displayName: testlogs")
# @patch("flytekitplugins.k8sdataservice.agent.yaml.safe_load")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.create_data_service", return_value="gnn-1234")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.check_stateful_set_status", return_value="succeeded")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_stateful_set")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_service")
def test_gnn_agent(mock_delete_service, mock_delete_stateful_set, mock_check_status, mock_create_data_service):
ctx = MagicMock(spec=grpc.ServicerContext)
# mock_load.return_value = {"task_logs": {"templates": [{"templateUris": ["https://some-log-url"], "displayName": "testlogs"}]}}
# Your test code here
agent = AgentRegistry.get_agent("dataservicetask")
task_id = Identifier(
resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version"
Expand Down Expand Up @@ -95,17 +89,11 @@ def test_gnn_agent(mock_delete_service, mock_delete_stateful_set, mock_check_sta
mock_delete_service.assert_called_once_with("gnn-1234")


# @patch("flytekitplugins.k8sdataservice.agent.open", new_callable=mock_open,
# read_data="task_logs:\n templates:\n - templateUris:\n - 'https://some-log-url'\n displayName: testlogs")
# @patch("flytekitplugins.k8sdataservice.agent.yaml.safe_load")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.create_data_service", return_value="gnn-1234")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.check_stateful_set_status", return_value="succeeded")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_stateful_set")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_service")
def test_gnn_agent_reuse_data_service(mock_delete_service, mock_delete_stateful_set, mock_check_status, mock_create_data_service):
ctx = MagicMock(spec=grpc.ServicerContext)
# mock_load.return_value = {"task_logs": {"templates": [{"templateUris": ["https://some-log-url"], "displayName": "testlogs"}]}}
# Your test code here
agent = AgentRegistry.get_agent("dataservicetask")
task_id = Identifier(
resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version"
Expand Down Expand Up @@ -176,17 +164,11 @@ def test_gnn_agent_reuse_data_service(mock_delete_service, mock_delete_stateful_
mock_delete_service.assert_called_once_with("gnn-2345")


# @patch("flytekitplugins.k8sdataservice.agent.open", new_callable=mock_open,
# read_data="task_logs:\n templates:\n - templateUris:\n - 'https://some-log-url'\n displayName: testlogs")
# @patch("flytekitplugins.k8sdataservice.agent.yaml.safe_load")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.create_data_service", return_value="gnn-1234")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.check_stateful_set_status", return_value="running")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_stateful_set")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_service")
def test_gnn_agent_status(mock_delete_service, mock_delete_stateful_set, mock_check_status, mock_create_data_service):
ctx = MagicMock(spec=grpc.ServicerContext)
# mock_load.return_value = {"task_logs": {"templates": [{"templateUris": ["https://some-log-url"], "displayName": "testlogs"}]}}
# Your test code here
agent = AgentRegistry.get_agent("dataservicetask")
task_id = Identifier(
resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version"
Expand Down Expand Up @@ -256,17 +238,11 @@ def test_gnn_agent_status(mock_delete_service, mock_delete_stateful_set, mock_ch
mock_delete_service.assert_called_once_with("gnn-2345")


# @patch("flytekitplugins.k8sdataservice.agent.open", new_callable=mock_open,
# read_data="")
# @patch("flytekitplugins.k8sdataservice.agent.yaml.safe_load")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.create_data_service", return_value="gnn-1234")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.check_stateful_set_status", return_value="succeeded")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_stateful_set")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_service")
def test_gnn_agent_no_configmap(mock_delete_service, mock_delete_stateful_set, mock_check_status, mock_create_data_service):
ctx = MagicMock(spec=grpc.ServicerContext)
# mock_load.return_value = {}
# Your test code here
agent = AgentRegistry.get_agent("dataservicetask")
task_id = Identifier(
resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version"
Expand Down Expand Up @@ -337,17 +313,11 @@ def test_gnn_agent_no_configmap(mock_delete_service, mock_delete_stateful_set, m
mock_delete_service.assert_called_once_with("gnn-2345")


# @patch("flytekitplugins.k8sdataservice.agent.open", new_callable=mock_open,
# read_data="")
# @patch("flytekitplugins.k8sdataservice.agent.yaml.safe_load")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.create_data_service", return_value="gnn-1234")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.check_stateful_set_status", return_value="pending")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_stateful_set")
@patch("flytekitplugins.k8sdataservice.agent.K8sManager.delete_service")
def test_gnn_agent_status_failed(mock_delete_service, mock_delete_stateful_set, mock_check_status, mock_create_data_service):
ctx = MagicMock(spec=grpc.ServicerContext)
# mock_load.return_value = {}
# Your test code here
agent = AgentRegistry.get_agent("dataservicetask")
task_id = Identifier(
resource_type=ResourceType.TASK, project="project", domain="domain", name="name", version="version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


@pytest.mark.asyncio
@patch("flytekitplugins.k8sdataservice.sensor.get_execution_namespace")
@patch("flytekitplugins.k8sdataservice.sensor.KubeConfig")
@patch("flytekitplugins.k8sdataservice.sensor.client.AppsV1Api")
@patch("flytekitplugins.k8sdataservice.sensor.client.CoreV1Api")
Expand All @@ -19,10 +18,8 @@ async def test_poke_no_cleanup(
mock_custom_api,
mock_core_v1_api,
mock_apps_v1_api,
mock_kube_config,
mock_get_namespace
mock_kube_config
):
mock_get_namespace.return_value = "test-namespace"
mock_kube_config.return_value = MagicMock()
sensor = CleanupSensor(name="test-sensor")
await asyncio.create_task(
Expand All @@ -36,13 +33,12 @@ async def test_poke_no_cleanup(
assert isinstance(sensor.custom_api, MagicMock)
assert sensor.release_name == "test-release"
assert sensor.cleanup_data_service is False
assert sensor.namespace == "test-namespace"
assert sensor.namespace == "flyte"
assert sensor.cluster == "test-cluster"


@pytest.mark.asyncio
@patch("flytekitplugins.k8sdataservice.sensor.CleanupSensor.delete_data_service")
@patch("flytekitplugins.k8sdataservice.sensor.get_execution_namespace")
@patch("flytekitplugins.k8sdataservice.sensor.KubeConfig")
@patch("flytekitplugins.k8sdataservice.sensor.client.AppsV1Api")
@patch("flytekitplugins.k8sdataservice.sensor.client.CoreV1Api")
Expand All @@ -54,11 +50,8 @@ async def test_poke_with_cleanup(
mock_core_v1_api,
mock_apps_v1_api,
mock_kube_config,
mock_get_namespace,
mock_delete_data_service
):
# Configure mocks
mock_get_namespace.return_value = "test-namespace"
mock_kube_config.return_value = MagicMock()

# Initialize CleanupSensor instance
Expand Down
8 changes: 0 additions & 8 deletions plugins/flytekit-k8sdataservice/utils/constants.py

This file was deleted.

0 comments on commit a0c5d8e

Please sign in to comment.