diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index deeea5e0d..d31fe25cd 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -49,7 +49,7 @@ jobs: matrix: k8s: [ - v1.31.0, + v1.31.1, v1.30.4, v1.29.8, v1.28.13, diff --git a/Makefile b/Makefile index 11eb865e1..127ed9bff 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ export PATH := $(BASE_DIR)/$(TOOLS_DIR):$(GO_EXE_PATH):$(PATH) # Default values for dev cluster ifeq ($(K8S_VERSION),) -K8S_VERSION := v1.29.4 +K8S_VERSION := v1.31.1 endif ifeq ($(CLUSTER_NAME),) CLUSTER_NAME := yk8s @@ -705,14 +705,23 @@ arch: @echo EXEC_ARCH=$(EXEC_ARCH) # Start dev cluster +.PHONY: start-cluster start-cluster: $(KIND_BIN) @"$(KIND_BIN)" delete cluster --name="$(CLUSTER_NAME)" || : @./scripts/run-e2e-tests.sh -a install -n "$(CLUSTER_NAME)" -v "kindest/node:$(K8S_VERSION)" $(PLUGIN_OPTS) # Stop dev cluster +.PHONY: stop-cluster stop-cluster: $(KIND_BIN) @"$(KIND_BIN)" delete cluster --name="$(CLUSTER_NAME)" +# Start dev cluster, run e2e tests, stop dev cluster +.PHONY: kind-e2e +kind-e2e: $(KIND_BIN) + @"$(KIND_BIN)" delete cluster --name="$(CLUSTER_NAME)" || : ; \ + ./scripts/run-e2e-tests.sh -a test -n "$(CLUSTER_NAME)" -v "kindest/node:$(K8S_VERSION)" $(PLUGIN_OPTS) ; STATUS=$$? ; \ + "$(KIND_BIN)" delete cluster --name="$(CLUSTER_NAME)" || : ; exit $$STATUS + # Run the e2e tests, this assumes yunikorn is running under yunikorn namespace .PHONY: e2e_test e2e_test: tools diff --git a/deployments/grafana-dashboard/assets/yunikorn-metrics.png b/deployments/grafana-dashboard/assets/yunikorn-metrics.png index b097ee376..9b3c3a27e 100644 Binary files a/deployments/grafana-dashboard/assets/yunikorn-metrics.png and b/deployments/grafana-dashboard/assets/yunikorn-metrics.png differ diff --git a/deployments/grafana-dashboard/yunikorn-metrics.json b/deployments/grafana-dashboard/yunikorn-metrics.json index 03d32f5a4..a5ee4c2d1 100644 --- a/deployments/grafana-dashboard/yunikorn-metrics.json +++ b/deployments/grafana-dashboard/yunikorn-metrics.json @@ -1,4 +1,35 @@ { + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__elements": {}, + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "11.1.0" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], "annotations": { "list": [ { @@ -18,7 +49,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": 33, + "id": null, "links": [], "liveNow": false, "panels": [ @@ -38,7 +69,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -46,36 +77,77 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineStyle": { + "fill": "solid" + }, + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + 
"mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "mappings": [] + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "apps" }, "overrides": [] }, "gridPos": { "h": 8, - "w": 6, + "w": 12, "x": 0, "y": 1 }, "id": 17, "options": { "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "pieType": "pie", - "reduceOptions": { "calcs": [ - "lastNotNull" + "count", + "lastNotNull", + "max" ], - "fields": "", - "values": false + "displayMode": "table", + "placement": "right", + "showLegend": true, + "sortBy": "Count", + "sortDesc": true }, "tooltip": { "mode": "single", @@ -86,7 +158,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", "expr": "yunikorn_scheduler_application_total", @@ -96,12 +168,12 @@ } ], "title": "Total Applications", - "type": "piechart" + "type": "timeseries" }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -109,6 +181,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -122,6 +195,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -151,22 +225,27 @@ "value": 80 } ] - } + }, + "unit": "apps" }, "overrides": [] }, "gridPos": { "h": 8, - "w": 10, - "x": 6, + "w": 12, + "x": 12, "y": 1 }, "id": 7, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", + "calcs": [ + "count", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "right", "showLegend": true }, "tooltip": { @@ -178,7 +257,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", "expr": "yunikorn_scheduler_application_submission_total", @@ -193,7 +272,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -201,6 +280,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -214,6 +294,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -243,22 +324,27 @@ "value": 80 } ] - } + }, + "unit": "pods" }, "overrides": [] }, "gridPos": { - "h": 8, - "w": 8, - "x": 16, - "y": 1 + "h": 7, + "w": 12, + "x": 0, + "y": 9 }, "id": 8, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", + "calcs": [ + "count", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "right", "showLegend": true }, "tooltip": { @@ -270,7 +356,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", "expr": "yunikorn_scheduler_container_allocation_attempt_total", @@ -285,7 +371,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -293,26 +379,28 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", + "fillOpacity": 30, + 
"gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, "scaleDistribution": { "type": "linear" }, - "showPoints": "auto", + "showPoints": "never", "spanNulls": false, "stacking": { "group": "A", @@ -335,22 +423,27 @@ "value": 80 } ] - } + }, + "unit": "reqps" }, "overrides": [] }, "gridPos": { - "h": 5, - "w": 8, - "x": 0, + "h": 7, + "w": 12, + "x": 12, "y": 9 }, "id": 18, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", + "calcs": [ + "max", + "mean", + "min" + ], + "displayMode": "table", + "placement": "right", "showLegend": true }, "tooltip": { @@ -362,22 +455,61 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", "expr": "histogram_quantile(0.95, sum(irate(yunikorn_scheduler_scheduling_latency_milliseconds_bucket[60s])) by (le))", - "legendFormat": "milliseconds", + "legendFormat": "Scheduling latency", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(irate(yunikorn_scheduler_trynode_latency_milliseconds_bucket[60s])) by (le))", + "hide": false, + "instant": false, + "legendFormat": "Try Node latency", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by(le) (yunikorn_scheduler_trypreemption_latency_milliseconds_bucket))", + "hide": false, + "instant": false, + "legendFormat": "Try Preemption latency", + "range": true, + "refId": "C" } ], "title": "P95 Scheduling latency", "type": "timeseries" }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 16 + }, + "id": 22, + "panels": [], + "title": "Resource Usage", + "type": "row" + }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -385,26 +517,28 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, "scaleDistribution": { "type": "linear" }, - "showPoints": "auto", + "showPoints": "never", "spanNulls": false, "stacking": { "group": "A", @@ -427,22 +561,25 @@ "value": 80 } ] - } + }, + "unit": "nodes" }, "overrides": [] }, "gridPos": { - "h": 5, - "w": 8, - "x": 8, - "y": 9 + "h": 9, + "w": 12, + "x": 0, + "y": 17 }, - "id": 19, + "id": 15, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "right", "showLegend": true }, "tooltip": { @@ -450,26 +587,27 @@ "sort": "none" } }, + "pluginVersion": "11.1.0", "targets": [ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(irate(yunikorn_scheduler_trynode_latency_milliseconds_bucket[60s])) by (le))", - "legendFormat": "milliseconds", + "expr": "yunikorn_scheduler_vcore_node_usage_total", + "legendFormat": 
"{{range}}", "range": true, "refId": "A" } ], - "title": "P95 Try Node latency", + "title": "vcore usage of nodes", "type": "timeseries" }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -477,26 +615,28 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, "scaleDistribution": { "type": "linear" }, - "showPoints": "auto", + "showPoints": "never", "spanNulls": false, "stacking": { "group": "A", @@ -519,22 +659,25 @@ "value": 80 } ] - } + }, + "unit": "nodes" }, "overrides": [] }, "gridPos": { - "h": 5, - "w": 8, - "x": 16, - "y": 9 + "h": 9, + "w": 12, + "x": 12, + "y": 17 }, - "id": 20, + "id": 14, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "right", "showLegend": true }, "tooltip": { @@ -546,22 +689,22 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(irate(yunikorn_scheduler_trypreemption_latency_milliseconds_bucket[30s])) by (le))", - "legendFormat": "milliseconds", + "expr": "yunikorn_scheduler_memory_node_usage_total", + "legendFormat": "{{range}}", "range": true, "refId": "A" } ], - "title": "P95 Try Preemption latency", + "title": "memory usage of nodes", "type": "timeseries" }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -569,62 +712,97 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "mappings": [] + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "nodes" }, "overrides": [] }, "gridPos": { - "h": 7, - "w": 5, + "h": 9, + "w": 12, "x": 0, - "y": 14 + "y": 26 }, - "id": 15, + "id": 16, "options": { "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "pieType": "pie", - "reduceOptions": { "calcs": [ "lastNotNull" ], - "fields": "", - "values": false + "displayMode": "table", + "placement": "right", + "showLegend": true }, "tooltip": { "mode": "single", "sort": "none" } }, + "pluginVersion": "11.1.0", "targets": [ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "yunikorn_scheduler_vcore_node_usage_total", + "expr": "yunikorn_scheduler_pods_node_usage_total", "legendFormat": "{{range}}", "range": true, "refId": 
"A" } ], - "title": "vcore_node_usage_total", - "type": "piechart" + "title": "pods number ratio of nodes", + "type": "timeseries" }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -632,36 +810,70 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "mappings": [] + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "nodes" }, "overrides": [] }, "gridPos": { - "h": 7, - "w": 6, - "x": 5, - "y": 14 + "h": 9, + "w": 12, + "x": 12, + "y": 26 }, "id": 10, "options": { "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "pieType": "pie", - "reduceOptions": { "calcs": [ "lastNotNull" ], - "fields": "", - "values": false + "displayMode": "table", + "placement": "right", + "showLegend": true }, "tooltip": { "mode": "single", @@ -672,7 +884,7 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", "expr": "yunikorn_scheduler_ephemeral_storage_node_usage_total", @@ -681,13 +893,28 @@ "refId": "A" } ], - "title": "ephemeral_storage_node_usage_total", - "type": "piechart" + "title": "ephemeral storage usage of nodes", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 35 + }, + "id": 11, + "panels": [], + "repeat": "Queue", + "repeatDirection": "h", + "title": "Queue", + "type": "row" }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -695,36 +922,71 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "mappings": [] - }, - "overrides": [] - }, - "gridPos": { - "h": 7, - "w": 6, - "x": 11, - "y": 14 - }, - "id": 14, - "options": { - "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "apps" }, - "pieType": "pie", - "reduceOptions": { + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 36 + }, + "id": 21, + "options": { + "legend": { "calcs": [ - "lastNotNull" + "lastNotNull", + "max" ], - "fields": "", - "values": false + 
"displayMode": "table", + "placement": "right", + "showLegend": true }, "tooltip": { "mode": "single", @@ -735,22 +997,22 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "yunikorn_scheduler_memory_node_usage_total", - "legendFormat": "{{range}}", + "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_app\"}", + "legendFormat": "{{state}}", "range": true, "refId": "A" } ], - "title": "memory_node_usage_total", - "type": "piechart" + "title": "Application Status", + "type": "timeseries" }, { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "fieldConfig": { "defaults": { @@ -758,36 +1020,71 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", "hideFrom": { "legend": false, "tooltip": false, "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "mappings": [] + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "pods" }, "overrides": [] }, "gridPos": { "h": 7, - "w": 7, - "x": 17, - "y": 14 + "w": 12, + "x": 12, + "y": 36 }, - "id": 16, + "id": 6, "options": { "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "pieType": "pie", - "reduceOptions": { "calcs": [ - "lastNotNull" + "lastNotNull", + "max" ], - "fields": "", - "values": false + "displayMode": "table", + "placement": "right", + "showLegend": true }, "tooltip": { "mode": "single", @@ -798,543 +1095,240 @@ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "yunikorn_scheduler_pods_node_usage_total", - "legendFormat": "{{range}}", + "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"pods\"}", + "legendFormat": "{{state}}", "range": true, "refId": "A" } ], - "title": "pods_node_usage_total", - "type": "piechart" + "title": "Pods", + "type": "timeseries" }, { - "collapsed": true, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 21 + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" }, - "id": 11, - "panels": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - } - }, - "mappings": [] + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 4, - "x": 0, - "y": 22 - }, - "id": 21, - "options": { - "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "insertNulls": false, + "lineInterpolation": 
"linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - "pieType": "pie", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "editorMode": "code", - "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_app\"}", - "legendFormat": "{{state}}", - "range": true, - "refId": "A" + "thresholdsStyle": { + "mode": "off" } - ], - "title": "Application Status", - "type": "piechart" - }, - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 35, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineStyle": { - "fill": "solid" - }, - "lineWidth": 1, - "pointSize": 8, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] + { + "color": "red", + "value": 80 } - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 4, - "y": 22 + ] }, - "id": 2, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "unit": " m" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "allocated" }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "editorMode": "code", - "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_app\"}", - "legendFormat": "{{state}}", - "range": true, - "refId": "A" - } + "properties": [ + { + "id": "custom.axisLabel", + "value": "" + } + ] + } + ] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 43 + }, + "id": 4, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max", + "p95" ], - "title": "Application Status", - "type": "timeseries" + "displayMode": "table", + "placement": "right", + "showLegend": true }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ { "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - } - }, - "mappings": [] - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 4, - "x": 12, - "y": 22 + "editorMode": "code", + "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"vcore\"}", + "legendFormat": "{{state}}", + "range": true, + "refId": "A" + } + ], + "title": "Vcore", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" }, 
- "id": 22, - "options": { - "legend": { - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "pieType": "pie", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 30, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "editorMode": "code", - "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"pods\"}", - "legendFormat": "{{state}}", - "range": true, - "refId": "A" - } - ], - "title": "Pods", - "type": "piechart" - }, - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 30, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 8, - "x": 16, - "y": 22 - }, - "id": 6, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "editorMode": "code", - "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"pods\"}", - "legendFormat": "{{state}}", - "range": true, - "refId": "A" + "thresholdsStyle": { + "mode": "off" } - ], - "title": "Pods", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 30, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": 
"red", - "value": 80 - } - ] + { + "color": "red", + "value": 80 } - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 12, - "x": 0, - "y": 28 - }, - "id": 4, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } + ] }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "editorMode": "code", - "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"vcore\"}", - "legendFormat": "{{state}}", - "range": true, - "refId": "A" - } + "unit": "decbytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 43 + }, + "id": 5, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max", + "p95" ], - "title": "Vcore", - "type": "timeseries" + "displayMode": "table", + "placement": "right", + "showLegend": true }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ { "datasource": { "type": "prometheus", - "uid": "prometheus" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 30, - "gradientMode": "opacity", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] + "uid": "${DS_PROMETHEUS}" }, - "gridPos": { - "h": 6, - "w": 12, - "x": 12, - "y": 28 - }, - "id": 5, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "editorMode": "code", - "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"memory\"}", - "legendFormat": "{{state}}", - "range": true, - "refId": "A" - } - ], - "title": "Memory", - "type": "timeseries" + "editorMode": "code", + "expr": "{__name__=~\"yunikorn_root_${Queue}_queue_resource\", resource=\"memory\"}", + "legendFormat": "{{state}}", + "range": true, + "refId": "A" } ], - "repeat": "Queue", - "repeatDirection": "h", - "title": "Queue", - "type": "row" + "title": "Memory", + "type": "timeseries" } ], "refresh": "", - "schemaVersion": 38, - "style": "dark", + "schemaVersion": 39, "tags": [], "templating": { "list": [ { - "current": { - "selected": false, - "text": "default", - "value": "default" - }, + "current": {}, "datasource": { "type": "prometheus", - "uid": "prometheus" + "uid": "${DS_PROMETHEUS}" }, "definition": "metrics(yunikorn.+resource$)", "hide": 0, @@ -1362,6 +1356,6 @@ "timezone": "", "title": "Yunikorn", "uid": "d77c8ddf-b832-4e38-bf3f-2d363c7261ea", - "version": 9, + "version": 22, "weekStart": "" -} +} \ No newline at end of file diff --git a/deployments/grafana-dashboard/yunikorn-pprof.json b/deployments/grafana-dashboard/yunikorn-pprof.json index 8b8414be6..33b067001 100644 --- a/deployments/grafana-dashboard/yunikorn-pprof.json 
+++ b/deployments/grafana-dashboard/yunikorn-pprof.json @@ -80,7 +80,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_mspan_inuse_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_mspan_inuse_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -93,7 +93,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_mspan_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_mspan_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -105,7 +105,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_mcache_inuse_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_mcache_inuse_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -116,7 +116,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_mcache_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_mcache_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -127,7 +127,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_buck_hash_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_buck_hash_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -138,7 +138,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_gc_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_gc_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -149,7 +149,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_next_gc_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_next_gc_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -242,7 +242,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_heap_alloc_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_heap_alloc_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -253,7 +253,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_heap_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_heap_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -264,7 +264,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_heap_idle_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_heap_idle_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -276,7 +276,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_heap_inuse_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_heap_inuse_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -288,7 +288,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_heap_released_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_heap_released_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -386,7 +386,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_alloc_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_alloc_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{pod}} - bytes allocated", @@ -401,7 +401,7 @@ "uid": 
"prometheus" }, "editorMode": "code", - "expr": "go_memstats_alloc_bytes_total{job=~\"yunikorn\"}", + "expr": "go_memstats_alloc_bytes_total{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{pod}} - alloc rate", @@ -415,7 +415,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_stack_inuse_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_stack_inuse_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{pod}} - stack inuse", @@ -428,7 +428,7 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "go_memstats_heap_inuse_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_heap_inuse_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "hide": false, "intervalFactor": 2, @@ -526,7 +526,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -621,7 +621,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_stack_inuse_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_stack_inuse_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -634,7 +634,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_stack_sys_bytes{job=~\"yunikorn\"}", + "expr": "go_memstats_stack_sys_bytes{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -729,7 +729,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_goroutines{job=~\"yunikorn\"}", + "expr": "go_goroutines{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{__name__}}", @@ -826,7 +826,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_memstats_mallocs_total{job=~\"yunikorn\"} - go_memstats_frees_total{job=~\"yunikorn\"}", + "expr": "go_memstats_mallocs_total{job=~\"yunikorn.*\"} - go_memstats_frees_total{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "# of live objects", @@ -922,7 +922,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "go_gc_duration_seconds{job=~\"yunikorn\"}", + "expr": "go_gc_duration_seconds{job=~\"yunikorn.*\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{quantile}}", @@ -1018,7 +1018,7 @@ "uid": "prometheus" }, "editorMode": "code", - "expr": "rate(go_memstats_lookups_total{job=~\"yunikorn\"}[30s])", + "expr": "rate(go_memstats_lookups_total{job=~\"yunikorn.*\"}[30s])", "format": "time_series", "intervalFactor": 1, "range": true, diff --git a/go.mod b/go.mod index fc0ffd5d3..195697f90 100644 --- a/go.mod +++ b/go.mod @@ -23,8 +23,8 @@ go 1.22.0 toolchain go1.22.5 require ( - github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d - github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 + github.com/apache/yunikorn-core v0.0.0-20241017135039-079a02dbdfa7 + github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/looplab/fsm v1.0.1 diff --git a/go.sum b/go.sum index c9f1c1bd2..5cbb64964 100644 --- a/go.sum +++ b/go.sum @@ -8,10 +8,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/antlr4-go/antlr/v4 v4.13.0 
h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= -github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d h1:awo2goBrw25P1aFNZgYJ0q7V+5ycMqMhvI60B75OzQg= -github.com/apache/yunikorn-core v0.0.0-20241002095736-a2d3d43a145d/go.mod h1:q6OXYpCTGvMJxsEorpIF6icKM/IioMmU6KcsclV1kI0= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= +github.com/apache/yunikorn-core v0.0.0-20241017135039-079a02dbdfa7 h1:PY3kIiQYxsNcs42DK+8b7NxfTvMF0Z6eIuK+aJNWl18= +github.com/apache/yunikorn-core v0.0.0-20241017135039-079a02dbdfa7/go.mod h1:JA8Uee+D+T9v3p+YznGiGM9cLk5tzX+EM+YYr1TdFYo= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146 h1:CZ4U7y19YSxNJVBNox3DahhuoxDL++naBl/kj+kqVFc= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20241016105739-f0e241aa0146/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go index da6f085dc..8f52f33de 100644 --- a/pkg/cache/application_test.go +++ b/pkg/cache/application_test.go @@ -38,7 +38,6 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/common/utils" - "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/dispatcher" "github.com/apache/yunikorn-k8shim/pkg/locking" "github.com/apache/yunikorn-scheduler-interface/lib/go/api" @@ -132,8 +131,6 @@ func TestFailApplication(t *testing.T) { lock: &locking.RWMutex{}, } ms := &mockSchedulerAPI{} - // set test mode - conf.GetSchedulerConf().SetTestMode(true) // set Recorder to mocked type mr := events.NewMockedRecorder() mr.OnEventf = func() { @@ -228,8 +225,6 @@ func TestSetUnallocatedPodsToFailedWhenFailApplication(t *testing.T) { context.apiProvider.GetAPIs().KubeClient = mockClient ms := &mockSchedulerAPI{} - // set test mode - conf.GetSchedulerConf().SetTestMode(true) // set Recorder to mocked type mr := events.NewMockedRecorder() events.SetRecorder(mr) @@ -336,8 +331,6 @@ func TestSetUnallocatedPodsToFailedWhenRejectApplication(t *testing.T) { defer mgr.Stop() ms := &mockSchedulerAPI{} - // set test mode - conf.GetSchedulerConf().SetTestMode(true) // set Recorder to mocked type mr := events.NewMockedRecorder() events.SetRecorder(mr) diff --git a/pkg/cache/context.go b/pkg/cache/context.go index aaea690aa..28cac93f5 100644 --- a/pkg/cache/context.go +++ b/pkg/cache/context.go @@ -72,7 +72,7 @@ type Context struct { pluginMode bool // true if we are configured as a scheduler plugin namespace string // yunikorn namespace configMaps []*v1.ConfigMap // cached yunikorn configmaps - lock *locking.RWMutex // lock + lock *locking.RWMutex // lock - used not only for context data but also to ensure that multiple event types are not executed concurrently txnID atomic.Uint64 // transaction ID counter klogger klog.Logger } @@ -166,6 
+166,8 @@ func (ctx *Context) addNode(obj interface{}) { } func (ctx *Context) updateNode(_, obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() node, err := convertToNode(obj) if err != nil { log.Log(log.ShimContext).Error("node conversion failed", zap.Error(err)) @@ -215,10 +217,8 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) { if !common.Equals(prevCapacity, newCapacity) { // update capacity - if capacity, occupied, ok := ctx.schedulerCache.UpdateCapacity(node.Name, newCapacity); ok { - if err := ctx.updateNodeResources(node, capacity, occupied); err != nil { - log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err)) - } + if err := ctx.updateNodeResources(node, newCapacity); err != nil { + log.Log(log.ShimContext).Warn("Failed to update node capacity", zap.Error(err)) } else { log.Log(log.ShimContext).Warn("Failed to update cached node capacity", zap.String("nodeName", node.Name)) } @@ -227,6 +227,8 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) { } func (ctx *Context) deleteNode(obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() var node *v1.Node switch t := obj.(type) { case *v1.Node: @@ -246,6 +248,8 @@ func (ctx *Context) deleteNode(obj interface{}) { } func (ctx *Context) addNodesWithoutRegistering(nodes []*v1.Node) { + ctx.lock.Lock() + defer ctx.lock.Unlock() for _, node := range nodes { ctx.updateNodeInternal(node, false) } @@ -281,6 +285,8 @@ func (ctx *Context) AddPod(obj interface{}) { } func (ctx *Context) UpdatePod(_, newObj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() pod, err := utils.Convert2Pod(newObj) if err != nil { log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err)) @@ -328,7 +334,7 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod, app *Application) { zap.String("name", pod.Name)) return } - app = ctx.AddApplication(&AddApplicationRequest{ + app = ctx.addApplication(&AddApplicationRequest{ Metadata: appMeta, }) } @@ -357,23 +363,26 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) { podStatusBefore = string(oldPod.Status.Phase) } - // conditions for allocate: - // 1. pod was previously assigned - // 2. pod is now assigned - // 3. pod is not in terminated state - // 4. pod references a known node - if oldPod == nil && utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) { + // conditions for allocate/update: + // 1. pod is now assigned + // 2. pod is not in terminated state + // 3. 
pod references a known node + if utils.IsAssignedPod(pod) && !utils.IsPodTerminated(pod) { if ctx.schedulerCache.UpdatePod(pod) { // pod was accepted by a real node - log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger occupied resource update", + log.Log(log.ShimContext).Debug("pod is assigned to a node, trigger foreign resource update", zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name), zap.String("podStatusBefore", podStatusBefore), zap.String("podStatusCurrent", string(pod.Status.Phase))) - ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.AddOccupiedResource) + allocReq := common.CreateAllocationForForeignPod(pod) + if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(allocReq); err != nil { + log.Log(log.ShimContext).Error("failed to add foreign allocation to the core", + zap.Error(err)) + } } else { // pod is orphaned (references an unknown node) - log.Log(log.ShimContext).Info("skipping occupied resource update for assigned orphaned pod", + log.Log(log.ShimContext).Info("skipping updating allocation for assigned orphaned pod", zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name), zap.String("nodeName", pod.Spec.NodeName)) @@ -387,18 +396,22 @@ func (ctx *Context) updateForeignPod(pod *v1.Pod) { // 3. pod references a known node if oldPod != nil && utils.IsPodTerminated(pod) { if !ctx.schedulerCache.IsPodOrphaned(string(pod.UID)) { - log.Log(log.ShimContext).Debug("pod terminated, trigger occupied resource update", + log.Log(log.ShimContext).Debug("pod terminated, trigger foreign resource update", zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name), zap.String("podStatusBefore", podStatusBefore), zap.String("podStatusCurrent", string(pod.Status.Phase))) // this means pod is terminated - // we need sub the occupied resource and re-sync with the scheduler-core - ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource) + // remove from the scheduler cache and create release request to remove foreign allocation from the core ctx.schedulerCache.RemovePod(pod) + releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition) + if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil { + log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core", + zap.Error(err)) + } } else { // pod is orphaned (references an unknown node) - log.Log(log.ShimContext).Info("skipping occupied resource update for terminated orphaned pod", + log.Log(log.ShimContext).Info("skipping foreign resource update for terminated orphaned pod", zap.String("namespace", pod.Namespace), zap.String("podName", pod.Name), zap.String("nodeName", pod.Spec.NodeName)) @@ -432,8 +445,10 @@ func (ctx *Context) DeletePod(obj interface{}) { } func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { + ctx.lock.Lock() + defer ctx.lock.Unlock() if taskMeta, ok := getTaskMetadata(pod); ok { - ctx.notifyTaskComplete(ctx.GetApplication(taskMeta.ApplicationID), taskMeta.TaskID) + ctx.notifyTaskComplete(ctx.getApplication(taskMeta.ApplicationID), taskMeta.TaskID) } log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name)) @@ -441,51 +456,16 @@ func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) { } func (ctx *Context) deleteForeignPod(pod *v1.Pod) { - oldPod := 
ctx.schedulerCache.GetPod(string(pod.UID)) - if oldPod == nil { - // if pod is not in scheduler cache, no node updates are needed - log.Log(log.ShimContext).Debug("unknown foreign pod deleted, no resource updated needed", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name)) - return - } - - // conditions for release: - // 1. pod is already assigned to a node - // 2. pod was not in a terminal state before - // 3. pod references a known node - if !utils.IsPodTerminated(oldPod) { - if !ctx.schedulerCache.IsPodOrphaned(string(oldPod.UID)) { - log.Log(log.ShimContext).Debug("foreign pod deleted, triggering occupied resource update", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name), - zap.String("podStatusBefore", string(oldPod.Status.Phase)), - zap.String("podStatusCurrent", string(pod.Status.Phase))) - // this means pod is terminated - // we need sub the occupied resource and re-sync with the scheduler-core - ctx.updateNodeOccupiedResources(pod.Spec.NodeName, pod.Namespace, pod.Name, common.GetPodResource(pod), schedulercache.SubOccupiedResource) - } else { - // pod is orphaned (references an unknown node) - log.Log(log.ShimContext).Info("skipping occupied resource update for removed orphaned pod", - zap.String("namespace", pod.Namespace), - zap.String("podName", pod.Name), - zap.String("nodeName", pod.Spec.NodeName)) - } - ctx.schedulerCache.RemovePod(pod) + ctx.lock.Lock() + defer ctx.lock.Unlock() + releaseReq := common.CreateReleaseRequestForForeignPod(string(pod.UID), constants.DefaultPartition) + if err := ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseReq); err != nil { + log.Log(log.ShimContext).Error("failed to remove foreign allocation from the core", + zap.Error(err)) } -} -func (ctx *Context) updateNodeOccupiedResources(nodeName string, namespace string, podName string, resource *si.Resource, opt schedulercache.UpdateType) { - if common.IsZero(resource) { - return - } - if node, capacity, occupied, ok := ctx.schedulerCache.UpdateOccupiedResource(nodeName, namespace, podName, resource, opt); ok { - if err := ctx.updateNodeResources(node, capacity, occupied); err != nil { - log.Log(log.ShimContext).Warn("scheduler rejected update to node occupied resources", zap.Error(err)) - } - } else { - log.Log(log.ShimContext).Warn("unable to update occupied resources for node", zap.String("nodeName", nodeName)) - } + log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name)) + ctx.schedulerCache.RemovePod(pod) } // filter configMap for the scheduler @@ -571,6 +551,8 @@ func (ctx *Context) addPriorityClass(obj interface{}) { } func (ctx *Context) updatePriorityClass(_, newObj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() if priorityClass := utils.Convert2PriorityClass(newObj); priorityClass != nil { ctx.updatePriorityClassInternal(priorityClass) } @@ -581,6 +563,8 @@ func (ctx *Context) updatePriorityClassInternal(priorityClass *schedulingv1.Prio } func (ctx *Context) deletePriorityClass(obj interface{}) { + ctx.lock.Lock() + defer ctx.lock.Unlock() log.Log(log.ShimContext).Debug("priorityClass deleted") var priorityClass *schedulingv1.PriorityClass switch t := obj.(type) { @@ -646,6 +630,8 @@ func (ctx *Context) EventsToRegister(queueingHintFn framework.QueueingHintFn) [] // IsPodFitNode evaluates given predicates based on current context func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error { + ctx.lock.RLock() + defer ctx.lock.RUnlock() pod := 
ctx.schedulerCache.GetPod(name) if pod == nil { return ErrorPodNotFound @@ -666,6 +652,8 @@ func (ctx *Context) IsPodFitNode(name, node string, allocate bool) error { } func (ctx *Context) IsPodFitNodeViaPreemption(name, node string, allocations []string, startIndex int) (int, bool) { + ctx.lock.RLock() + defer ctx.lock.RUnlock() if pod := ctx.schedulerCache.GetPod(name); pod != nil { // if pod exists in cache, try to run predicates if targetNode := ctx.schedulerCache.GetNode(node); targetNode != nil { @@ -774,6 +762,8 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error { // this way, the core can make allocation decisions with consideration of // other assumed pods before they are actually bound to the node (bound is slow). func (ctx *Context) AssumePod(name, node string) error { + ctx.lock.Lock() + defer ctx.lock.Unlock() if pod := ctx.schedulerCache.GetPod(name); pod != nil { // when add assumed pod, we make a copy of the pod to avoid // modifying its original reference. otherwise, it may have @@ -833,6 +823,8 @@ func (ctx *Context) AssumePod(name, node string) error { // forget pod must be called when a pod is assumed to be running on a node, // but then for some reason it is failed to bind or released. func (ctx *Context) ForgetPod(name string) { + ctx.lock.Lock() + defer ctx.lock.Unlock() if pod := ctx.schedulerCache.GetPod(name); pod != nil { log.Log(log.ShimContext).Debug("forget pod", zap.String("pod", pod.Name)) ctx.schedulerCache.ForgetPod(pod) @@ -949,6 +941,10 @@ func (ctx *Context) AddApplication(request *AddApplicationRequest) *Application ctx.lock.Lock() defer ctx.lock.Unlock() + return ctx.addApplication(request) +} + +func (ctx *Context) addApplication(request *AddApplicationRequest) *Application { log.Log(log.ShimContext).Debug("AddApplication", zap.Any("Request", request)) if app := ctx.getApplication(request.Metadata.ApplicationID); app != nil { return app @@ -1026,6 +1022,8 @@ func (ctx *Context) RemoveApplication(appID string) { // this implements ApplicationManagementProtocol func (ctx *Context) AddTask(request *AddTaskRequest) *Task { + ctx.lock.Lock() + defer ctx.lock.Unlock() return ctx.addTask(request) } @@ -1074,8 +1072,8 @@ func (ctx *Context) addTask(request *AddTaskRequest) *Task { } func (ctx *Context) RemoveTask(appID, taskID string) { - ctx.lock.RLock() - defer ctx.lock.RUnlock() + ctx.lock.Lock() + defer ctx.lock.Unlock() app, ok := ctx.applications[appID] if !ok { log.Log(log.ShimContext).Debug("Attempted to remove task from non-existent application", zap.String("appID", appID)) @@ -1085,7 +1083,9 @@ func (ctx *Context) RemoveTask(appID, taskID string) { } func (ctx *Context) getTask(appID string, taskID string) *Task { - app := ctx.GetApplication(appID) + ctx.lock.RLock() + defer ctx.lock.RUnlock() + app := ctx.getApplication(appID) if app == nil { log.Log(log.ShimContext).Debug("application is not found in the context", zap.String("appID", appID)) @@ -1354,7 +1354,7 @@ func (ctx *Context) InitializeState() error { log.Log(log.ShimContext).Error("failed to load nodes", zap.Error(err)) return err } - acceptedNodes, err := ctx.registerNodes(nodes) + acceptedNodes, err := ctx.RegisterNodes(nodes) if err != nil { log.Log(log.ShimContext).Error("failed to register nodes", zap.Error(err)) return err @@ -1474,11 +1474,17 @@ func (ctx *Context) registerNode(node *v1.Node) error { return nil } +func (ctx *Context) RegisterNodes(nodes []*v1.Node) ([]*v1.Node, error) { + ctx.lock.Lock() + defer ctx.lock.Unlock() + return ctx.registerNodes(nodes) +} + +// 
registerNodes registers the nodes to the scheduler core. +// This method must be called while holding the Context write lock. func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) { nodesToRegister := make([]*si.NodeInfo, 0) pendingNodes := make(map[string]*v1.Node) - acceptedNodes := make([]*v1.Node, 0) - rejectedNodes := make([]*v1.Node, 0) // Generate a NodeInfo object for each node and add to the registration request for _, node := range nodes { @@ -1492,17 +1498,38 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) { constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName, }, SchedulableResource: common.GetNodeResource(&nodeStatus), - OccupiedResource: common.NewResourceBuilder().Build(), }) pendingNodes[node.Name] = node } - var wg sync.WaitGroup + acceptedNodes, rejectedNodes, err := ctx.registerNodesInternal(nodesToRegister, pendingNodes) + if err != nil { + log.Log(log.ShimContext).Error("Failed to register nodes", zap.Error(err)) + return nil, err + } + for _, node := range acceptedNodes { + // post a successful event to the node + events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted", + fmt.Sprintf("node %s is accepted by the scheduler", node.Name)) + } + for _, node := range rejectedNodes { + // post a failure event to the node + events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected", + fmt.Sprintf("node %s is rejected by the scheduler", node.Name)) + } + + return acceptedNodes, nil +} + +func (ctx *Context) registerNodesInternal(nodesToRegister []*si.NodeInfo, pendingNodes map[string]*v1.Node) ([]*v1.Node, []*v1.Node, error) { + acceptedNodes := make([]*v1.Node, 0) + rejectedNodes := make([]*v1.Node, 0) + + var wg sync.WaitGroup // initialize wait group with the number of responses we expect wg.Add(len(pendingNodes)) - // register with the dispatcher so that we can track our response handlerID := fmt.Sprintf("%s-%d", registerNodeContextHandler, ctx.txnID.Add(1)) dispatcher.RegisterEventHandler(handlerID, dispatcher.EventTypeNode, func(event interface{}) { nodeEvent, ok := event.(CachedSchedulerNodeEvent) @@ -1534,24 +1561,17 @@ func (ctx *Context) registerNodes(nodes []*v1.Node) ([]*v1.Node, error) { RmID: schedulerconf.GetSchedulerConf().ClusterID, }); err != nil { log.Log(log.ShimContext).Error("Failed to register nodes", zap.Error(err)) - return nil, err + return nil, nil, err } + // write lock must always be held at this point, releasing it while waiting to avoid any potential deadlocks + ctx.lock.Unlock() + defer ctx.lock.Lock() + // wait for all responses to accumulate wg.Wait() - for _, node := range acceptedNodes { - // post a successful event to the node - events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, "NodeAccepted", "NodeAccepted", - fmt.Sprintf("node %s is accepted by the scheduler", node.Name)) - } - for _, node := range rejectedNodes { - // post a failure event to the node - events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeWarning, "NodeRejected", "NodeRejected", - fmt.Sprintf("node %s is rejected by the scheduler", node.Name)) - } - - return acceptedNodes, nil + return acceptedNodes, rejectedNodes, nil } func (ctx *Context) decommissionNode(node *v1.Node) error { @@ -1559,8 +1579,8 @@ func (ctx *Context) decommissionNode(node *v1.Node) error { return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request) } -func (ctx *Context) updateNodeResources(node *v1.Node, capacity 
*si.Resource, occupied *si.Resource) error { - request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity, occupied) +func (ctx *Context) updateNodeResources(node *v1.Node, capacity *si.Resource) error { + request := common.CreateUpdateRequestForUpdatedNode(node.Name, capacity) return ctx.apiProvider.GetAPIs().SchedulerAPI.UpdateNode(request) } diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go index 6d7bfafa4..efeeffd78 100644 --- a/pkg/cache/context_test.go +++ b/pkg/cache/context_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "strings" + "sync/atomic" "testing" "time" @@ -35,14 +36,11 @@ import ( k8sEvents "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" - schedulercache "github.com/apache/yunikorn-k8shim/pkg/cache/external" "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common" "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/common/test" "github.com/apache/yunikorn-k8shim/pkg/common/utils" - "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/dispatcher" "github.com/apache/yunikorn-k8shim/pkg/log" siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" @@ -75,11 +73,13 @@ const ( taskUID4 = "task00004" taskUnknown = "non_existing_taskID" - podName1 = "pod1" - podName2 = "pod2" - podName3 = "pod3" - podName4 = "pod4" - podNamespace = "yk" + podName1 = "pod1" + podName2 = "pod2" + podName3 = "pod3" + podName4 = "pod4" + podForeignName = "foreign-1" + podForeignUID = "UUID-foreign-1" + podNamespace = "yk" nodeName1 = "node1" nodeName2 = "node2" @@ -103,7 +103,6 @@ func initContextForTest() *Context { } func initContextAndAPIProviderForTest() (*Context, *client.MockedAPIProvider) { - conf.GetSchedulerConf().SetTestMode(true) apis := client.NewMockedAPIProvider(false) context := NewContext(apis) return context, apis @@ -182,8 +181,8 @@ func TestUpdateNodes(t *testing.T) { }) oldNodeResource := make(map[v1.ResourceName]resource.Quantity) - oldNodeResource[v1.ResourceName("memory")] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI) - oldNodeResource[v1.ResourceName("cpu")] = *resource.NewQuantity(2, resource.DecimalSI) + oldNodeResource["memory"] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI) + oldNodeResource["cpu"] = *resource.NewQuantity(2, resource.DecimalSI) oldNode := v1.Node{ ObjectMeta: apis.ObjectMeta{ Name: Host1, @@ -196,8 +195,8 @@ func TestUpdateNodes(t *testing.T) { } newNodeResource := make(map[v1.ResourceName]resource.Quantity) - newNodeResource[v1.ResourceName("memory")] = *resource.NewQuantity(2048*1000*1000, resource.DecimalSI) - newNodeResource[v1.ResourceName("cpu")] = *resource.NewQuantity(4, resource.DecimalSI) + newNodeResource["memory"] = *resource.NewQuantity(2048*1000*1000, resource.DecimalSI) + newNodeResource["cpu"] = *resource.NewQuantity(4, resource.DecimalSI) newNode := v1.Node{ ObjectMeta: apis.ObjectMeta{ Name: Host1, @@ -211,12 +210,6 @@ func TestUpdateNodes(t *testing.T) { ctx.addNode(&oldNode) ctx.updateNode(&oldNode, &newNode) - - _, capacity, _, ok := ctx.schedulerCache.UpdateOccupiedResource( - Host1, "n/a", "n/a", nil, schedulercache.AddOccupiedResource) - assert.Assert(t, ok, "unable to retrieve node capacity") - assert.Equal(t, int64(2048*1000*1000), capacity.Resources[siCommon.Memory].Value) - assert.Equal(t, int64(4000), 
capacity.Resources[siCommon.CPU].Value) } func TestDeleteNodes(t *testing.T) { @@ -529,39 +522,11 @@ func TestAddUpdatePodForeign(t *testing.T) { defer dispatcher.UnregisterAllEventHandlers() defer dispatcher.Stop() - executed := false - expectAdd := false - expectRemove := false - tc := "" - - validatorFunc := func(request *si.NodeRequest) error { - assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", tc) - updatedNode := request.Nodes[0] - assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", tc) - switch updatedNode.Action { - case si.NodeInfo_CREATE_DRAIN: - return nil - case si.NodeInfo_DRAIN_TO_SCHEDULABLE: - return nil - case si.NodeInfo_UPDATE: - executed = true - default: - assert.Equal(t, false, "Unexpected action: %d", updatedNode.Action) - return nil - } - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000), "%s: wrong schedulable memory", tc) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), "%s: wrong schedulable cpu", tc) - if expectAdd { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: wrong occupied cpu (add)", tc) - } - if expectRemove { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: wrong occupied memory (remove)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: wrong occupied cpu (remove)", tc) - } + var allocRequest *si.AllocationRequest + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { + allocRequest = request return nil - } - + }) apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { for _, node := range request.Nodes { if node.Action == si.NodeInfo_CREATE_DRAIN { @@ -571,10 +536,10 @@ func TestAddUpdatePodForeign(t *testing.T) { }) } } - return validatorFunc(request) + return nil }) - host1 := nodeForTest(Host1, "10G", "10") + host1 := nodeForTest(Host1, "10G", "10") // add existing foreign pod context.updateNode(nil, host1) // pod is not assigned to any node @@ -582,22 +547,18 @@ func TestAddUpdatePodForeign(t *testing.T) { pod1.Status.Phase = v1.PodPending pod1.Spec.NodeName = "" - // validate add - tc = "add-pod1" - executed = false - expectAdd = false - expectRemove = false + // validate add (pending, no node assigned) + allocRequest = nil context.AddPod(pod1) - assert.Assert(t, !executed, "unexpected update") + assert.Assert(t, allocRequest == nil, "unexpected update") pod := context.schedulerCache.GetPod(string(pod1.UID)) assert.Assert(t, pod == nil, "unassigned pod found in cache") - // validate update - tc = "update-pod1" - executed = false - expectRemove = false + // validate update (no change) + allocRequest = nil context.UpdatePod(nil, pod1) - assert.Assert(t, !executed, "unexpected update") + assert.Assert(t, allocRequest == nil, "unexpected update") + pod = context.schedulerCache.GetPod(string(pod1.UID)) assert.Assert(t, pod == nil, "unassigned pod found in cache") // pod is assigned to a node but still in pending state, should update @@ -606,155 +567,91 @@ func TestAddUpdatePodForeign(t *testing.T) { pod2.Spec.NodeName = Host1 // validate add - tc = "add-pod2" - executed = false - expectAdd = true - expectRemove = false context.AddPod(pod2) - assert.Assert(t, executed, "updated expected") 
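+	// foreign (non-YuniKorn) pod changes are now reported to the core as
+	// allocation updates: the mocked UpdateAllocation call above captures the
+	// si.AllocationRequest, replacing the old occupied-resource validation
+	// that was driven through si.NodeRequest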
+	assert.Assert(t, allocRequest != nil, "update expected")
+	assertAddForeignPod(t, podName2, Host1, allocRequest)
 	pod = context.schedulerCache.GetPod(string(pod2.UID))
 	assert.Assert(t, pod != nil, "pod not found in cache")

-	// validate update
-	tc = "update-pod2"
-	executed = false
-	expectAdd = false
-	expectRemove = false
+	// validate update (an update is expected even though the pod is unchanged)
+	allocRequest = nil
 	context.UpdatePod(nil, pod2)
-	assert.Assert(t, !executed, "unexpected update")
+	assert.Assert(t, allocRequest != nil, "update expected")
 	pod = context.schedulerCache.GetPod(string(pod2.UID))
 	assert.Assert(t, pod != nil, "pod not found in cache")

 	// validate update when not already in cache
-	tc = "update-pod2-nocache-pre"
-	executed = false
-	expectAdd = false
-	expectRemove = true
+	allocRequest = nil
 	context.DeletePod(pod2)
-	assert.Assert(t, executed, "expected update")
-	tc = "update-pod2-nocache"
-	executed = false
-	expectAdd = true
-	expectRemove = false
+	assertReleaseForeignPod(t, podName2, allocRequest)
+
+	allocRequest = nil
 	context.UpdatePod(nil, pod2)
-	assert.Assert(t, executed, "expected update")
+	assert.Assert(t, allocRequest != nil, "expected update")
 	pod = context.schedulerCache.GetPod(string(pod2.UID))
 	assert.Assert(t, pod != nil, "pod not found in cache")
+	assertAddForeignPod(t, podName2, Host1, allocRequest)

 	// pod is failed, should trigger update if already in cache
 	pod3 := pod2.DeepCopy()
 	pod3.Status.Phase = v1.PodFailed

 	// validate add
-	tc = "add-pod3"
-	executed = false
-	expectAdd = false
-	expectRemove = true
+	allocRequest = nil
 	context.AddPod(pod3)
-	assert.Assert(t, executed, "expected update")
+	assert.Assert(t, allocRequest != nil, "expected update")
 	pod = context.schedulerCache.GetPod(string(pod3.UID))
 	assert.Assert(t, pod == nil, "failed pod found in cache")
+	assert.Assert(t, allocRequest.Releases != nil) // expecting a release due to pod status
+	assertReleaseForeignPod(t, podName2, allocRequest)
+}
-
-	// validate update when not already in cache
-	tc = "update-pod3-pre"
-	executed = false
-	expectAdd = true
-	expectRemove = false
-	context.AddPod(pod2)
-	tc = "update-pod3"
-	executed = false
-	expectAdd = false
-	expectRemove = true
-	context.UpdatePod(nil, pod3)
-	assert.Assert(t, executed, "expected update")
-	pod = context.schedulerCache.GetPod(string(pod3.UID))
-	assert.Assert(t, pod == nil, "failed pod found in cache")
+func assertAddForeignPod(t *testing.T, podName, host string, allocRequest *si.AllocationRequest) {
+	t.Helper()
+	assert.Equal(t, 1, len(allocRequest.Allocations))
+	tags := allocRequest.Allocations[0].AllocationTags
+	assert.Equal(t, 2, len(tags))
+	assert.Equal(t, siCommon.AllocTypeDefault, tags[siCommon.Foreign])
+	assert.Equal(t, podName, allocRequest.Allocations[0].AllocationKey)
+	assert.Equal(t, host, allocRequest.Allocations[0].NodeID)
+}
+
+func assertReleaseForeignPod(t *testing.T, podName string, allocRequest *si.AllocationRequest) {
+	t.Helper()
+	assert.Assert(t, allocRequest.Releases != nil)
+	assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease))
+	assert.Equal(t, podName, allocRequest.Releases.AllocationsToRelease[0].AllocationKey)
+	assert.Equal(t, constants.DefaultPartition, allocRequest.Releases.AllocationsToRelease[0].PartitionName)
+	assert.Equal(t, "", allocRequest.Releases.AllocationsToRelease[0].ApplicationID)
+	assert.Equal(t, si.TerminationType_STOPPED_BY_RM, allocRequest.Releases.AllocationsToRelease[0].TerminationType)
 }

 func TestDeletePodForeign(t *testing.T) {
 	context, apiProvider := 
initContextAndAPIProviderForTest() - dispatcher.Start() - defer dispatcher.UnregisterAllEventHandlers() - defer dispatcher.Stop() - executed := false - expectAdd := false - expectRemove := false - tc := "" - - validatorFunc := func(request *si.NodeRequest) error { - executed = true - assert.Equal(t, len(request.Nodes), 1, "%s: wrong node count", tc) - updatedNode := request.Nodes[0] - switch updatedNode.Action { - case si.NodeInfo_CREATE_DRAIN: - return nil - case si.NodeInfo_DRAIN_TO_SCHEDULABLE: - return nil - case si.NodeInfo_UPDATE: - executed = true - default: - assert.Equal(t, false, "Unexpected action: %d", updatedNode.Action) - return nil - } - assert.Equal(t, updatedNode.NodeID, Host1, "%s: wrong nodeID", tc) - assert.Equal(t, updatedNode.Action, si.NodeInfo_UPDATE, "%s: wrong action", tc) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.Memory].Value, int64(10000*1000*1000), "%s: wrong schedulable memory", tc) - assert.Equal(t, updatedNode.SchedulableResource.Resources[siCommon.CPU].Value, int64(10000), "%s: wrong schedulable cpu", tc) - if expectAdd { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(1000*1000*1000), "%s: wrong occupied memory (add)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(500), "%s: wrong occupied cpu (add)", tc) - } - if expectRemove { - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.Memory].Value, int64(0), "%s: wrong occupied memory (remove)", tc) - assert.Equal(t, updatedNode.OccupiedResource.Resources[siCommon.CPU].Value, int64(0), "%s: wrong occupied cpu (remove)", tc) - } + var allocRequest *si.AllocationRequest + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { + allocRequest = request return nil - } - - apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { - for _, node := range request.Nodes { - if node.Action == si.NodeInfo_CREATE_DRAIN { - dispatcher.Dispatch(CachedSchedulerNodeEvent{ - NodeID: node.NodeID, - Event: NodeAccepted, - }) - } - } - return validatorFunc(request) }) - host1 := nodeForTest(Host1, "10G", "10") - context.updateNode(nil, host1) - - // add existing pod + // add existing foreign pod pod1 := foreignPod(podName1, "1G", "500m") pod1.Status.Phase = v1.PodRunning pod1.Spec.NodeName = Host1 - - // validate deletion of existing assigned pod - tc = "delete-pod1-pre" - executed = false - expectAdd = true - expectRemove = false context.AddPod(pod1) - tc = "delete-pod1" - executed = false - expectAdd = false - expectRemove = true + allocRequest = nil context.DeletePod(pod1) - assert.Assert(t, executed, "update not executed") - pod := context.schedulerCache.GetPod(string(pod1.UID)) - assert.Assert(t, pod == nil, "deleted pod found in cache") - // validate delete when not already found - tc = "delete-pod1-again" - executed = false - expectAdd = false - expectRemove = false - context.DeletePod(pod1) - assert.Assert(t, !executed, "unexpected update") - pod = context.schedulerCache.GetPod(string(pod1.UID)) + assert.Assert(t, allocRequest != nil, "update not executed") + assert.Equal(t, 0, len(allocRequest.Allocations)) + assert.Assert(t, allocRequest.Releases != nil) + assert.Equal(t, 1, len(allocRequest.Releases.AllocationsToRelease)) + assert.Equal(t, podName1, allocRequest.Releases.AllocationsToRelease[0].AllocationKey) + assert.Equal(t, constants.DefaultPartition, allocRequest.Releases.AllocationsToRelease[0].PartitionName) + assert.Equal(t, "", 
allocRequest.Releases.AllocationsToRelease[0].ApplicationID) + assert.Equal(t, si.TerminationType_STOPPED_BY_RM, allocRequest.Releases.AllocationsToRelease[0].TerminationType) + pod := context.schedulerCache.GetPod(string(pod1.UID)) assert.Assert(t, pod == nil, "deleted pod found in cache") } @@ -1160,7 +1057,6 @@ func TestGetTask(t *testing.T) { } func TestNodeEventFailsPublishingWithoutNode(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) if !ok { t.Fatal("the EventRecorder is expected to be of type FakeRecorder") @@ -1189,7 +1085,6 @@ func TestNodeEventFailsPublishingWithoutNode(t *testing.T) { } func TestNodeEventPublishedCorrectly(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) if !ok { t.Fatal("the EventRecorder is expected to be of type FakeRecorder") @@ -1250,7 +1145,6 @@ func TestNodeEventPublishedCorrectly(t *testing.T) { } func TestFilteredEventsNotPublished(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) if !ok { t.Fatal("the EventRecorder is expected to be of type FakeRecorder") @@ -1335,7 +1229,6 @@ func TestFilteredEventsNotPublished(t *testing.T) { } func TestPublishEventsWithNotExistingAsk(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) if !ok { t.Fatal("the EventRecorder is expected to be of type FakeRecorder") @@ -1376,7 +1269,6 @@ func TestPublishEventsWithNotExistingAsk(t *testing.T) { } func TestPublishEventsCorrectly(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) if !ok { t.Fatal("the EventRecorder is expected to be of type FakeRecorder") @@ -2000,6 +1892,10 @@ func TestInitializeState(t *testing.T) { }, }} podLister.AddPod(orphaned) + // add an orphan foreign pod + orphanForeign := newPodHelper(podForeignName, "default", podForeignUID, nodeName2, "", v1.PodRunning) + orphanForeign.Spec.SchedulerName = "" + podLister.AddPod(orphanForeign) err := context.InitializeState() assert.NilError(t, err, "InitializeState failed") @@ -2011,20 +1907,14 @@ func TestInitializeState(t *testing.T) { assert.Equal(t, *pc.PreemptionPolicy, policy, "wrong preemption policy") assert.Equal(t, pc.Annotations[constants.AnnotationAllowPreemption], constants.True, "wrong allow-preemption value") - // verify occupied / capacity on node - capacity, occupied, ok := context.schedulerCache.SnapshotResources(nodeName1) - assert.Assert(t, ok, "Unable to retrieve node resources") - expectedCapacity := common.ParseResource("4", "10G") - assert.Equal(t, expectedCapacity.Resources["vcore"].Value, capacity.Resources["vcore"].Value, "wrong capacity vcore") - assert.Equal(t, expectedCapacity.Resources["memory"].Value, capacity.Resources["memory"].Value, "wrong capacity memory") - expectedOccupied := common.ParseResource("1500m", "2G") - assert.Equal(t, expectedOccupied.Resources["vcore"].Value, occupied.Resources["vcore"].Value, "wrong occupied vcore") - assert.Equal(t, expectedOccupied.Resources["memory"].Value, occupied.Resources["memory"].Value, "wrong occupied memory") - // check that pod orphan status is correct assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName1), "pod1 should not be orphaned") assert.Check(t, !context.schedulerCache.IsPodOrphaned(podName2), "pod2 should not be orphaned") assert.Check(t, 
context.schedulerCache.IsPodOrphaned(podName3), "pod3 should be orphaned") + assert.Check(t, context.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod should be orphaned") + assert.Check(t, context.schedulerCache.GetPod("foreignRunning") != nil, "foreign running pod is not in the cache") + assert.Check(t, context.schedulerCache.GetPod("foreignPending") == nil, "foreign pending pod should not be in the cache") + assert.Check(t, !context.schedulerCache.IsPodOrphaned("foreignRunning"), "foreign running pod should not be orphaned") // pod1 is pending task1 := context.getTask(appID1, podName1) @@ -2041,6 +1931,145 @@ func TestInitializeState(t *testing.T) { assert.Assert(t, task3 == nil, "pod3 was found") } +func TestPodAdoption(t *testing.T) { + ctx, apiProvider := initContextAndAPIProviderForTest() + dispatcher.Start() + defer dispatcher.UnregisterAllEventHandlers() + defer dispatcher.Stop() + apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { + for _, node := range request.Nodes { + dispatcher.Dispatch(CachedSchedulerNodeEvent{ + NodeID: node.NodeID, + Event: NodeAccepted, + }) + } + return nil + }) + + // add pods w/o node & check orphan status + pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, v1.PodRunning) + pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, "", v1.PodRunning) + pod2.Spec.SchedulerName = "" + ctx.AddPod(pod1) + ctx.AddPod(pod2) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan") + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan") + + // add node + node := v1.Node{ + ObjectMeta: apis.ObjectMeta{ + Name: Host1, + Namespace: "default", + UID: uid1, + }, + } + ctx.addNode(&node) + + // check that node has adopted the pods + assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod has not been adopted") + assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod has not been adopted") +} + +func TestOrphanPodUpdate(t *testing.T) { + ctx, apiProvider := initContextAndAPIProviderForTest() + dispatcher.Start() + defer dispatcher.UnregisterAllEventHandlers() + defer dispatcher.Stop() + var update atomic.Bool + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { + update.Store(true) + return nil + }) + + // add pods w/o node & check orphan status + pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, v1.PodPending) + pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, "", v1.PodPending) + pod2.Spec.SchedulerName = "" + ctx.AddPod(pod1) + ctx.AddPod(pod2) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan") + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan") + assert.Assert(t, ctx.getApplication(appID) == nil) + + // update orphan pods + pod1Upd := pod1.DeepCopy() + pod1Upd.Status.Phase = v1.PodRunning + pod2Upd := pod2.DeepCopy() + pod2Upd.Status.Phase = v1.PodRunning + + ctx.UpdatePod(pod1, pod1Upd) + assert.Assert(t, ctx.getApplication(appID) == nil) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan after update") + assert.Equal(t, v1.PodRunning, ctx.schedulerCache.GetPod(pod1UID).Status.Phase, "pod has not been updated in the cache") + assert.Assert(t, !update.Load(), "allocation update has been triggered for Yunikorn orphan pod") + + ctx.UpdatePod(pod2, pod2Upd) + assert.Assert(t, 
ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan after update") + assert.Equal(t, v1.PodRunning, ctx.schedulerCache.GetPod(podForeignUID).Status.Phase, "foreign pod has not been updated in the cache") + assert.Assert(t, !update.Load(), "allocation update has been triggered for foreign orphan pod") +} + +func TestOrphanPodDelete(t *testing.T) { + ctx, apiProvider := initContextAndAPIProviderForTest() + dispatcher.Start() + defer dispatcher.UnregisterAllEventHandlers() + defer dispatcher.Stop() + var taskEventSent atomic.Bool + dispatcher.RegisterEventHandler("TestTaskHandler", dispatcher.EventTypeTask, func(obj interface{}) { + taskEventSent.Store(true) + }) + var request *si.AllocationRequest + apiProvider.MockSchedulerAPIUpdateAllocationFn(func(r *si.AllocationRequest) error { + request = r + return nil + }) + apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error { + for _, node := range request.Nodes { + dispatcher.Dispatch(CachedSchedulerNodeEvent{ + NodeID: node.NodeID, + Event: NodeAccepted, + }) + } + return nil + }) + + // add pods w/o node & check orphan status + pod1 := newPodHelper(podName1, namespace, pod1UID, Host1, appID, v1.PodPending) + pod2 := newPodHelper(podForeignName, namespace, podForeignUID, Host1, "", v1.PodPending) + pod2.Spec.SchedulerName = "" + ctx.AddPod(pod1) + ctx.AddPod(pod2) + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(pod1UID), "Yunikorn pod is not orphan") + assert.Assert(t, ctx.schedulerCache.IsPodOrphaned(podForeignUID), "foreign pod is not orphan") + assert.Assert(t, ctx.getApplication(appID) == nil) + + // add a node with pod - this creates the application object + node := v1.Node{ + ObjectMeta: apis.ObjectMeta{ + Name: Host2, + Namespace: "default", + UID: uid1, + }, + } + ctx.addNode(&node) + pod3 := newPodHelper(podName2, namespace, pod2UID, Host2, appID, v1.PodPending) + ctx.AddPod(pod3) + assert.Assert(t, !ctx.schedulerCache.IsPodOrphaned(pod2UID), "Yunikorn pod is orphan") + assert.Assert(t, ctx.getApplication(appID) != nil) + + // delete orphan YK pod + ctx.DeletePod(pod1) + err := utils.WaitForCondition(taskEventSent.Load, 100*time.Millisecond, time.Second) + assert.NilError(t, err) + + // delete foreign pod + ctx.DeletePod(pod2) + assert.Assert(t, request != nil) + assert.Assert(t, request.Releases != nil) + assert.Equal(t, 1, len(request.Releases.AllocationsToRelease)) + assert.Equal(t, podForeignUID, request.Releases.AllocationsToRelease[0].AllocationKey) +} + func TestTaskRemoveOnCompletion(t *testing.T) { context := initContextForTest() dispatcher.Start() diff --git a/pkg/cache/external/scheduler_cache.go b/pkg/cache/external/scheduler_cache.go index a7ef737f2..fa4d86751 100644 --- a/pkg/cache/external/scheduler_cache.go +++ b/pkg/cache/external/scheduler_cache.go @@ -32,18 +32,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common" "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/locking" "github.com/apache/yunikorn-k8shim/pkg/log" - "github.com/apache/yunikorn-scheduler-interface/lib/go/si" -) - -type UpdateType int - -const ( - AddOccupiedResource UpdateType = iota - SubOccupiedResource ) // SchedulerCache maintains some critical information about nodes and pods used for scheduling. @@ -59,8 +50,6 @@ const ( // is called in the plugin to signify completion of the allocation, it is removed. 
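+// With this change the cache no longer tracks per-node capacity or occupied
+// resources; foreign pods are reported to the scheduler core as allocations
+// instead (see CreateAllocationForForeignPod in pkg/common/si_helper.go).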
type SchedulerCache struct { nodesMap map[string]*framework.NodeInfo // node name to NodeInfo map - nodeCapacity map[string]*si.Resource // node name to node resource capacity - nodeOccupied map[string]*si.Resource // node name to node occupied resources podsMap map[string]*v1.Pod pcMap map[string]*schedulingv1.PriorityClass assignedPods map[string]string // map of pods to the node they are currently assigned to @@ -90,8 +79,6 @@ type taskBloomFilter struct { func NewSchedulerCache(clients *client.Clients) *SchedulerCache { cache := &SchedulerCache{ nodesMap: make(map[string]*framework.NodeInfo), - nodeCapacity: make(map[string]*si.Resource), - nodeOccupied: make(map[string]*si.Resource), podsMap: make(map[string]*v1.Pod), pcMap: make(map[string]*schedulingv1.PriorityClass), assignedPods: make(map[string]string), @@ -197,8 +184,6 @@ func (cache *SchedulerCache) updateNode(node *v1.Node) (*v1.Node, []*v1.Pod) { log.Log(log.ShimCacheExternal).Debug("Adding node to cache", zap.String("nodeName", node.Name)) nodeInfo = framework.NewNodeInfo() cache.nodesMap[node.Name] = nodeInfo - cache.nodeCapacity[node.Name] = common.GetNodeResource(&node.Status) - cache.nodeOccupied[node.Name] = common.NewResourceBuilder().Build() cache.nodesInfo = nil nodeInfo.SetNode(node) @@ -253,8 +238,6 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) (*v1.Node, []*v1.Pod) { log.Log(log.ShimCacheExternal).Debug("Removing node from cache", zap.String("nodeName", node.Name)) delete(cache.nodesMap, node.Name) - delete(cache.nodeOccupied, node.Name) - delete(cache.nodeCapacity, node.Name) cache.nodesInfo = nil cache.nodesInfoPodsWithAffinity = nil cache.nodesInfoPodsWithReqAntiAffinity = nil @@ -263,72 +246,6 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) (*v1.Node, []*v1.Pod) { return result, orphans } -func (cache *SchedulerCache) SnapshotResources(nodeName string) (capacity *si.Resource, occupied *si.Resource, ok bool) { - cache.lock.RLock() - defer cache.lock.RUnlock() - - occupied, ok1 := cache.nodeOccupied[nodeName] - capacity, ok2 := cache.nodeCapacity[nodeName] - if !ok1 || !ok2 { - log.Log(log.ShimCacheExternal).Warn("Unable to snapshot resources for node", zap.String("nodeName", nodeName)) - return nil, nil, false - } - return capacity, occupied, true -} - -func (cache *SchedulerCache) UpdateCapacity(nodeName string, resource *si.Resource) (capacity *si.Resource, occupied *si.Resource, ok bool) { - cache.lock.Lock() - defer cache.lock.Unlock() - - occupied, ok1 := cache.nodeOccupied[nodeName] - _, ok2 := cache.nodeCapacity[nodeName] - if !ok1 || !ok2 { - log.Log(log.ShimCacheExternal).Warn("Unable to update capacity for node", zap.String("nodeName", nodeName)) - return nil, nil, false - } - cache.nodeCapacity[nodeName] = resource - return resource, occupied, true -} - -func (cache *SchedulerCache) UpdateOccupiedResource(nodeName string, namespace string, podName string, resource *si.Resource, opt UpdateType) (node *v1.Node, capacity *si.Resource, occupied *si.Resource, ok bool) { - cache.lock.Lock() - defer cache.lock.Unlock() - - nodeInfo, ok1 := cache.nodesMap[nodeName] - occupied, ok2 := cache.nodeOccupied[nodeName] - capacity, ok3 := cache.nodeCapacity[nodeName] - if !ok1 || !ok2 || !ok3 { - log.Log(log.ShimCacheExternal).Warn("Unable to update occupied resources for node", - zap.String("nodeName", nodeName), - zap.String("namespace", namespace), - zap.String("podName", podName)) - return nil, nil, nil, false - } - node = nodeInfo.Node() - - switch opt { - case AddOccupiedResource: - 
log.Log(log.ShimCacheExternal).Info("Adding occupied resources to node", - zap.String("nodeID", nodeName), - zap.String("namespace", namespace), - zap.String("podName", podName), - zap.Stringer("occupied", resource)) - occupied = common.Add(occupied, resource) - cache.nodeOccupied[nodeName] = occupied - case SubOccupiedResource: - log.Log(log.ShimCacheExternal).Info("Subtracting occupied resources from node", - zap.String("nodeID", nodeName), - zap.String("namespace", namespace), - zap.String("podName", podName), - zap.Stringer("occupied", resource)) - occupied = common.Sub(occupied, resource) - cache.nodeOccupied[nodeName] = occupied - default: - // noop - } - return node, capacity, occupied, true -} - func (cache *SchedulerCache) GetPriorityClass(name string) *schedulingv1.PriorityClass { cache.lock.RLock() defer cache.lock.RUnlock() diff --git a/pkg/cache/external/scheduler_cache_test.go b/pkg/cache/external/scheduler_cache_test.go index c443491bf..f87464b6b 100644 --- a/pkg/cache/external/scheduler_cache_test.go +++ b/pkg/cache/external/scheduler_cache_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common" ) const ( @@ -1097,90 +1096,6 @@ func TestUpdatePVCRefCounts(t *testing.T) { assert.Check(t, !cache.IsPVCUsedByPods(framework.GetNamespacedName(pod2.Namespace, pvcName2)), "pvc2 is in pvcRefCounts") } -func TestNodeResources(t *testing.T) { - cache := NewSchedulerCache(client.NewMockedAPIProvider(false).GetAPIs()) - resourceList := make(map[v1.ResourceName]resource.Quantity) - resourceList["memory"] = *resource.NewQuantity(1024*1000*1000, resource.DecimalSI) - resourceList["cpu"] = *resource.NewQuantity(10, resource.DecimalSI) - node := &v1.Node{ - ObjectMeta: apis.ObjectMeta{ - Name: host1, - Namespace: "default", - UID: nodeUID1, - }, - Status: v1.NodeStatus{ - Allocatable: resourceList, - }, - Spec: v1.NodeSpec{ - Unschedulable: false, - }, - } - cache.UpdateNode(node) - - // test snapshot with missing node - capacity, occupied, ok := cache.SnapshotResources("missing") - assert.Assert(t, !ok, "got result for missing host") - assert.Assert(t, capacity == nil, "got capacity for missing host") - assert.Assert(t, occupied == nil, "got occupied for missing host") - - // test snapshot with existing, unoccupied node - capacity, occupied, ok = cache.SnapshotResources(host1) - assert.Assert(t, ok, "no result for host1") - assert.Equal(t, int64(1024*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1") - assert.Equal(t, int64(10*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1") - assert.Equal(t, 0, len(occupied.Resources), "non-empty occupied resources") - - res1 := common.NewResourceBuilder().AddResource("memory", 2048*1000*1000).AddResource("vcore", 20000).Build() - res2 := common.NewResourceBuilder().AddResource("memory", 512*1000*1000).AddResource("vcore", 5000).Build() - - // update capacity with missing node - capacity, occupied, ok = cache.UpdateCapacity("missing", res1) - assert.Assert(t, !ok, "got result for missing host") - assert.Assert(t, capacity == nil, "got capacity for missing host") - assert.Assert(t, occupied == nil, "got occupied for missing host") - - // update capacity with real node - capacity, occupied, ok = cache.UpdateCapacity(host1, res1) - assert.Assert(t, ok, "no result for host1") - assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1") 
- assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1") - assert.Equal(t, 0, len(occupied.Resources), "non-empty occupied resources") - - // update occupied resources with missing node - node, capacity, occupied, ok = cache.UpdateOccupiedResource("missing", "default", "podName", res2, AddOccupiedResource) - assert.Assert(t, !ok, "got result for missing host") - assert.Assert(t, node == nil, "got node for missing host") - assert.Assert(t, capacity == nil, "got capacity for missing host") - assert.Assert(t, occupied == nil, "got occupied for missing host") - - // update occupied resources with real node - node, capacity, occupied, ok = cache.UpdateOccupiedResource(host1, "default", "podName", res2, AddOccupiedResource) - assert.Assert(t, ok, "no result for host1") - assert.Equal(t, host1, node.Name, "wrong host name") - assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1") - assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1") - assert.Equal(t, int64(512*1000*1000), occupied.Resources["memory"].Value, "wrong memory occupied for host1") - assert.Equal(t, int64(5*1000), occupied.Resources["vcore"].Value, "wrong vcore occupied for host1") - - // retrieve snapshot again - capacity, occupied, ok = cache.SnapshotResources(host1) - assert.Assert(t, ok, "no result for host1") - assert.Equal(t, host1, node.Name, "wrong host name") - assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1") - assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1") - assert.Equal(t, int64(512*1000*1000), occupied.Resources["memory"].Value, "wrong memory occupied for host1") - assert.Equal(t, int64(5*1000), occupied.Resources["vcore"].Value, "wrong vcore occupied for host1") - - // subtract occupied resources with real node - node, capacity, occupied, ok = cache.UpdateOccupiedResource(host1, "default", "podName", res2, SubOccupiedResource) - assert.Assert(t, ok, "no result for host1") - assert.Equal(t, host1, node.Name, "wrong host name") - assert.Equal(t, int64(2048*1000*1000), capacity.Resources["memory"].Value, "wrong memory capacity for host1") - assert.Equal(t, int64(20*1000), capacity.Resources["vcore"].Value, "wrong vcore capacity for host1") - assert.Equal(t, int64(0), occupied.Resources["memory"].Value, "wrong memory occupied for host1") - assert.Equal(t, int64(0), occupied.Resources["vcore"].Value, "wrong vcore occupied for host1") -} - func TestOrphanPods(t *testing.T) { cache := NewSchedulerCache(client.NewMockedAPIProvider(false).GetAPIs()) resourceList := make(map[v1.ResourceName]resource.Quantity) diff --git a/pkg/cache/metadata_test.go b/pkg/cache/metadata_test.go index d4760e2d1..b5d45e050 100644 --- a/pkg/cache/metadata_test.go +++ b/pkg/cache/metadata_test.go @@ -107,8 +107,6 @@ func TestGetTaskMetadata(t *testing.T) { } func TestGetAppMetadata(t *testing.T) { //nolint:funlen - conf.GetSchedulerConf().SetTestMode(true) - defer utils.SetPluginMode(false) defer func() { conf.GetSchedulerConf().GenerateUniqueAppIds = false }() utils.SetPluginMode(false) diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index d1fcc3369..864cd7e87 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -34,7 +34,6 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/events" 
"github.com/apache/yunikorn-k8shim/pkg/common/utils" - "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/locking" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" @@ -521,7 +520,6 @@ func TestHandleSubmitTaskEvent(t *testing.T) { time: int64(0), lock: &locking.RWMutex{}, } - conf.GetSchedulerConf().SetTestMode(true) mr := events.NewMockedRecorder() mr.OnEventf = func() { rt.lock.Lock() @@ -621,7 +619,6 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { mockedAPIProvider, ok := mockedContext.apiProvider.(*client.MockedAPIProvider) assert.Equal(t, ok, true) - conf.GetSchedulerConf().SetTestMode(true) resources := make(map[v1.ResourceName]resource.Quantity) containers := make([]v1.Container, 0) containers = append(containers, v1.Container{ diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index 5a848b4af..09d9ea139 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -66,6 +66,7 @@ const SchedulerName = "yunikorn" // OwnerReferences const DaemonSetType = "DaemonSet" +const NodeKind = "Node" // Gang scheduling const PlaceholderContainerImage = "registry.k8s.io/pause:3.7" diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go index 48a06fa71..cde9a64f2 100644 --- a/pkg/common/events/recorder.go +++ b/pkg/common/events/recorder.go @@ -19,43 +19,22 @@ package events import ( - "sync" + "sync/atomic" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/events" - - "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common/constants" - "github.com/apache/yunikorn-k8shim/pkg/conf" - "github.com/apache/yunikorn-k8shim/pkg/locking" ) -var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024) -var once sync.Once -var lock locking.RWMutex +var eventRecorder atomic.Pointer[events.EventRecorder] + +func init() { + r := events.EventRecorder(NewMockedRecorder()) + eventRecorder.Store(&r) +} func GetRecorder() events.EventRecorder { - lock.Lock() - defer lock.Unlock() - once.Do(func() { - // note, the initiation of the event recorder requires on a workable Kubernetes client, - // in test mode we should skip this and just use a fake recorder instead. 
- configs := conf.GetSchedulerConf() - if !configs.IsTestMode() { - k8sClient := client.NewKubeClient(configs.KubeConfig) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: k8sClient.GetClientSet().EventsV1()}) - eventBroadcaster.StartRecordingToSink(make(<-chan struct{})) - eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName) - } - }) - - return eventRecorder + return *eventRecorder.Load() } func SetRecorder(recorder events.EventRecorder) { - lock.Lock() - defer lock.Unlock() - eventRecorder = recorder - once.Do(func() {}) // make sure Do() doesn't fire elsewhere + eventRecorder.Store(&recorder) } diff --git a/pkg/common/events/recorder_test.go b/pkg/common/events/recorder_test.go index 7068ad262..b10e1f585 100644 --- a/pkg/common/events/recorder_test.go +++ b/pkg/common/events/recorder_test.go @@ -23,15 +23,10 @@ import ( "testing" "gotest.tools/v3/assert" - - "github.com/apache/yunikorn-k8shim/pkg/conf" ) func TestInit(t *testing.T) { // simply test the get won't fail - // which means the get function honors the testMode and - // skips initiating a real event recorder - conf.GetSchedulerConf().SetTestMode(true) recorder := GetRecorder() - assert.Equal(t, reflect.TypeOf(recorder).String(), "*events.FakeRecorder") + assert.Equal(t, reflect.TypeOf(recorder).String(), "*events.MockedRecorder") } diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index d94a708cf..272e18a18 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -114,6 +114,33 @@ func CreateAllocationForTask(appID, taskID, nodeID string, resource *si.Resource } } +func CreateAllocationForForeignPod(pod *v1.Pod) *si.AllocationRequest { + podType := common.AllocTypeDefault + for _, ref := range pod.OwnerReferences { + if ref.Kind == constants.NodeKind { + podType = common.AllocTypeStatic + break + } + } + + allocation := si.Allocation{ + AllocationTags: map[string]string{ + common.Foreign: podType, + }, + AllocationKey: string(pod.UID), + ResourcePerAlloc: GetPodResource(pod), + Priority: CreatePriorityForTask(pod), + NodeID: pod.Spec.NodeName, + } + + allocation.AllocationTags[common.CreationTime] = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10) + + return &si.AllocationRequest{ + Allocations: []*si.Allocation{&allocation}, + RmID: conf.GetSchedulerConf().ClusterID, + } +} + func GetTerminationTypeFromString(terminationTypeStr string) si.TerminationType { if v, ok := si.TerminationType_value[terminationTypeStr]; ok { return si.TerminationType(v) @@ -141,13 +168,31 @@ func CreateReleaseRequestForTask(appID, taskID, partition string, terminationTyp } } -// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity and occupied resource updates -func CreateUpdateRequestForUpdatedNode(nodeID string, capacity *si.Resource, occupied *si.Resource) *si.NodeRequest { +func CreateReleaseRequestForForeignPod(uid, partition string) *si.AllocationRequest { + allocToRelease := make([]*si.AllocationRelease, 1) + allocToRelease[0] = &si.AllocationRelease{ + AllocationKey: uid, + PartitionName: partition, + TerminationType: si.TerminationType_STOPPED_BY_RM, + Message: "pod terminated", + } + + releaseRequest := si.AllocationReleasesRequest{ + AllocationsToRelease: allocToRelease, + } + + return &si.AllocationRequest{ + Releases: &releaseRequest, + RmID: conf.GetSchedulerConf().ClusterID, + } +} + +// CreateUpdateRequestForUpdatedNode builds a NodeRequest for capacity updates +func CreateUpdateRequestForUpdatedNode(nodeID string, capacity 
*si.Resource) *si.NodeRequest { nodeInfo := &si.NodeInfo{ NodeID: nodeID, Attributes: map[string]string{}, SchedulableResource: capacity, - OccupiedResource: occupied, Action: si.NodeInfo_UPDATE, } diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go index 9ccf619a9..9943f55cd 100644 --- a/pkg/common/si_helper_test.go +++ b/pkg/common/si_helper_test.go @@ -19,9 +19,11 @@ package common import ( "testing" + "time" "gotest.tools/v3/assert" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" apis "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/apache/yunikorn-scheduler-interface/lib/go/common" @@ -209,12 +211,10 @@ func TestCreateTagsForTask(t *testing.T) { func TestCreateUpdateRequestForUpdatedNode(t *testing.T) { capacity := NewResourceBuilder().AddResource(common.Memory, 200).AddResource(common.CPU, 2).Build() - occupied := NewResourceBuilder().AddResource(common.Memory, 50).AddResource(common.CPU, 1).Build() - request := CreateUpdateRequestForUpdatedNode(nodeID, capacity, occupied) + request := CreateUpdateRequestForUpdatedNode(nodeID, capacity) assert.Equal(t, len(request.Nodes), 1) assert.Equal(t, request.Nodes[0].NodeID, nodeID) assert.Equal(t, request.Nodes[0].SchedulableResource, capacity) - assert.Equal(t, request.Nodes[0].OccupiedResource, occupied) assert.Equal(t, len(request.Nodes[0].Attributes), 0) } @@ -415,3 +415,79 @@ func TestGetTerminationTypeFromString(t *testing.T) { }) } } + +func TestCreateAllocationForForeignPod(t *testing.T) { + cResources := make(map[v1.ResourceName]resource.Quantity) + cResources[v1.ResourceMemory] = resource.MustParse("500M") + cResources[v1.ResourceCPU] = resource.MustParse("1") + var containers []v1.Container + containers = append(containers, v1.Container{ + Name: "container-01", + Resources: v1.ResourceRequirements{ + Requests: cResources, + }, + }) + + pod := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "test", + UID: "UID-00001", + CreationTimestamp: apis.Time{ + Time: time.Unix(1, 0), + }, + }, + Spec: v1.PodSpec{ + Containers: containers, + NodeName: nodeID, + }, + } + + allocReq := CreateAllocationForForeignPod(pod) + assert.Equal(t, 1, len(allocReq.Allocations)) + assert.Equal(t, "mycluster", allocReq.RmID) + assert.Assert(t, allocReq.Releases == nil) + alloc := allocReq.Allocations[0] + assert.Equal(t, nodeID, alloc.NodeID) + assert.Equal(t, "UID-00001", alloc.AllocationKey) + assert.Equal(t, int32(0), alloc.Priority) + res := alloc.ResourcePerAlloc + assert.Equal(t, 3, len(res.Resources)) + assert.Equal(t, int64(500000000), res.Resources["memory"].Value) + assert.Equal(t, int64(1000), res.Resources["vcore"].Value) + assert.Equal(t, int64(1), res.Resources["pods"].Value) + assert.Equal(t, 2, len(alloc.AllocationTags)) + assert.Equal(t, "1", alloc.AllocationTags[common.CreationTime]) + assert.Equal(t, common.AllocTypeDefault, alloc.AllocationTags[common.Foreign]) + + // set priority & change pod type to static + prio := int32(12) + pod.Spec.Priority = &prio + pod.OwnerReferences = []apis.OwnerReference{ + { + Kind: "Node", + }, + } + allocReq = CreateAllocationForForeignPod(pod) + assert.Equal(t, 2, len(alloc.AllocationTags)) + alloc = allocReq.Allocations[0] + assert.Equal(t, common.AllocTypeStatic, alloc.AllocationTags[common.Foreign]) + assert.Equal(t, int32(12), alloc.Priority) +} + +func TestCreateReleaseRequestForForeignPod(t *testing.T) { + allocReq := CreateReleaseRequestForForeignPod("UID-0001", "partition") + + 
assert.Assert(t, allocReq.Releases != nil) + assert.Equal(t, "mycluster", allocReq.RmID) + releaseReq := allocReq.Releases + assert.Equal(t, 1, len(releaseReq.AllocationsToRelease)) + release := releaseReq.AllocationsToRelease[0] + assert.Equal(t, "UID-0001", release.AllocationKey) + assert.Equal(t, "partition", release.PartitionName) + assert.Equal(t, si.TerminationType_STOPPED_BY_RM, release.TerminationType) + assert.Equal(t, "pod terminated", release.Message) +} diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index 2e941213c..54a500a89 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -117,7 +117,6 @@ type SchedulerConf struct { Interval time.Duration `json:"schedulingIntervalSecond"` KubeConfig string `json:"absoluteKubeConfigFilePath"` VolumeBindTimeout time.Duration `json:"volumeBindTimeout"` - TestMode bool `json:"testMode"` EventChannelCapacity int `json:"eventChannelCapacity"` DispatchTimeout time.Duration `json:"dispatchTimeout"` KubeQPS int `json:"kubeQPS"` @@ -145,7 +144,6 @@ func (conf *SchedulerConf) Clone() *SchedulerConf { Interval: conf.Interval, KubeConfig: conf.KubeConfig, VolumeBindTimeout: conf.VolumeBindTimeout, - TestMode: conf.TestMode, EventChannelCapacity: conf.EventChannelCapacity, DispatchTimeout: conf.DispatchTimeout, KubeQPS: conf.KubeQPS, @@ -257,18 +255,6 @@ func SetSchedulerConf(conf *SchedulerConf) { confHolder.Store(conf) } -func (conf *SchedulerConf) SetTestMode(testMode bool) { - conf.Lock() - defer conf.Unlock() - conf.TestMode = testMode -} - -func (conf *SchedulerConf) IsTestMode() bool { - conf.RLock() - defer conf.RUnlock() - return conf.TestMode -} - func (conf *SchedulerConf) IsConfigReloadable() bool { conf.RLock() defer conf.RUnlock() @@ -321,7 +307,6 @@ func CreateDefaultConfig() *SchedulerConf { Interval: DefaultSchedulingInterval, KubeConfig: GetDefaultKubeConfigPath(), VolumeBindTimeout: DefaultVolumeBindTimeout, - TestMode: false, EventChannelCapacity: DefaultEventChannelCapacity, DispatchTimeout: DefaultDispatchTimeout, KubeQPS: DefaultKubeQPS, diff --git a/pkg/plugin/predicates/predicate_manager_test.go b/pkg/plugin/predicates/predicate_manager_test.go index fe17db7a8..9e87c91d4 100644 --- a/pkg/plugin/predicates/predicate_manager_test.go +++ b/pkg/plugin/predicates/predicate_manager_test.go @@ -45,7 +45,6 @@ import ( "k8s.io/kubernetes/pkg/util/taints" "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/log" "github.com/apache/yunikorn-k8shim/pkg/plugin/support" ) @@ -56,7 +55,6 @@ var ( ) func TestPreemptionPredicatesEmpty(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) clientSet := clientSet() informerFactory := informerFactory(clientSet) lister := lister() @@ -74,7 +72,6 @@ func TestPreemptionPredicatesEmpty(t *testing.T) { } func TestPreemptionPredicates(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) clientSet := clientSet() informerFactory := informerFactory(clientSet) lister := lister() @@ -127,7 +124,6 @@ func TestPreemptionPredicates(t *testing.T) { } func TestEventsToRegister(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) clientSet := clientSet() informerFactory := informerFactory(clientSet) lister := lister() @@ -154,7 +150,6 @@ func TestEventsToRegister(t *testing.T) { } func TestPodFitsHost(t *testing.T) { - conf.GetSchedulerConf().SetTestMode(true) clientSet := clientSet() informerFactory := informerFactory(clientSet) lister := lister() diff --git 
a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index ebf3fb118..291083f4f 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -19,15 +19,20 @@ package shim import ( + ctx "context" "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/scheme" + k8events "k8s.io/client-go/tools/events" "github.com/apache/yunikorn-k8shim/pkg/cache" "github.com/apache/yunikorn-k8shim/pkg/client" + "github.com/apache/yunikorn-k8shim/pkg/common/constants" + "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/dispatcher" @@ -67,6 +72,18 @@ func NewShimScheduler(scheduler api.SchedulerAPI, configs *conf.SchedulerConf, b apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, false) context := cache.NewContextWithBootstrapConfigMaps(apiFactory, bootstrapConfigMaps) rmCallback := cache.NewAsyncRMCallback(context) + + eventBroadcaster := k8events.NewBroadcaster(&k8events.EventSinkImpl{ + Interface: kubeClient.GetClientSet().EventsV1()}) + err := eventBroadcaster.StartRecordingToSinkWithContext(ctx.Background()) + if err != nil { + log.Log(log.Shim).Error("Could not create event broadcaster", + zap.Error(err)) + } else { + eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName) + events.SetRecorder(eventRecorder) + } + return newShimSchedulerInternal(context, apiFactory, rmCallback) } diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go index b67746d72..7b8de8556 100644 --- a/pkg/shim/scheduler_mock_test.go +++ b/pkg/shim/scheduler_mock_test.go @@ -59,7 +59,6 @@ type MockScheduler struct { } func (fc *MockScheduler) init() { - conf.GetSchedulerConf().SetTestMode(true) fc.stopChan = make(chan struct{}) serviceContext := entrypoint.StartAllServices() fc.rmProxy = serviceContext.RMProxy @@ -342,7 +341,6 @@ func createUpdateRequestForNewNode(nodeID string, nodeLabels map[string]string, nodeInfo := &si.NodeInfo{ NodeID: nodeID, SchedulableResource: capacity, - OccupiedResource: occupied, Attributes: map[string]string{ constants.DefaultNodeAttributeHostNameKey: nodeID, constants.DefaultNodeAttributeRackNameKey: constants.DefaultRackName, diff --git a/scripts/run-e2e-tests.sh b/scripts/run-e2e-tests.sh index 53dfc8da8..ede07f976 100755 --- a/scripts/run-e2e-tests.sh +++ b/scripts/run-e2e-tests.sh @@ -198,10 +198,10 @@ Examples: ${NAME} -a test -n yk8s -v kindest/node:v1.28.13 ${NAME} -a test -n yk8s -v kindest/node:v1.29.8 ${NAME} -a test -n yk8s -v kindest/node:v1.30.4 - ${NAME} -a test -n yk8s -v kindest/node:v1.31.0 + ${NAME} -a test -n yk8s -v kindest/node:v1.31.1 Use a local helm chart path: - ${NAME} -a test -n yk8s -v kindest/node:v1.31.0 -p ../yunikorn-release/helm-charts/yunikorn + ${NAME} -a test -n yk8s -v kindest/node:v1.31.1 -p ../yunikorn-release/helm-charts/yunikorn EOF } diff --git a/test/e2e/foreign_pod/foreign_pod_suite_test.go b/test/e2e/foreign_pod/foreign_pod_suite_test.go new file mode 100644 index 000000000..e5a87cae7 --- /dev/null +++ b/test/e2e/foreign_pod/foreign_pod_suite_test.go @@ -0,0 +1,89 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package foreign_pod + +import ( + "path/filepath" + "runtime" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/ginkgo/v2/reporters" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + + "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" +) + +func init() { + configmanager.YuniKornTestConfig.ParseFlags() +} + +func TestForeignPodHandling(t *testing.T) { + ginkgo.ReportAfterSuite("TestForeignPodHandling", func(report ginkgo.Report) { + err := common.CreateJUnitReportDir() + Ω(err).NotTo(gomega.HaveOccurred()) + err = reporters.GenerateJUnitReportWithConfig( + report, + filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-foreign_pod_junit.xml"), + reporters.JunitReportConfig{OmitSpecLabels: true}, + ) + Ω(err).NotTo(HaveOccurred()) + }) + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "TestForeignPodHandling", ginkgo.Label("TestForeignPodHandling")) +} + +var suiteName string +var oldConfigMap = new(v1.ConfigMap) +var kClient = k8s.KubeCtl{} //nolint + +var _ = BeforeSuite(func() { + _, filename, _, _ := runtime.Caller(0) + suiteName = common.GetSuiteName(filename) + yunikorn.EnsureYuniKornConfigsPresent() + yunikorn.UpdateConfigMapWrapper(oldConfigMap, "fifo") +}) + +var _ = AfterSuite(func() { + yunikorn.RestoreConfigMapWrapper(oldConfigMap) +}) + +// Declarations for Ginkgo DSL +var Describe = ginkgo.Describe + +var It = ginkgo.It +var PIt = ginkgo.PIt +var By = ginkgo.By +var BeforeSuite = ginkgo.BeforeSuite +var AfterSuite = ginkgo.AfterSuite +var BeforeEach = ginkgo.BeforeEach +var AfterEach = ginkgo.AfterEach +var DescribeTable = ginkgo.Describe +var Entry = ginkgo.Entry + +// Declarations for Gomega Matchers +var Equal = gomega.Equal +var BeNumerically = gomega.BeNumerically +var Ω = gomega.Expect +var BeNil = gomega.BeNil +var HaveOccurred = gomega.HaveOccurred diff --git a/test/e2e/foreign_pod/foreign_pod_test.go b/test/e2e/foreign_pod/foreign_pod_test.go new file mode 100644 index 000000000..932afbfd5 --- /dev/null +++ b/test/e2e/foreign_pod/foreign_pod_test.go @@ -0,0 +1,78 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package foreign_pod + +import ( + "fmt" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + tests "github.com/apache/yunikorn-k8shim/test/e2e" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" +) + +const kubeSystem = "kube-system" + +var _ = Describe("", func() { + It("Verify foreign pod tracking", func() { + By("Retrieving foreign pods from kube-system") + kClient = k8s.KubeCtl{} + Ω(kClient.SetClient()).To(BeNil()) + podList, err := kClient.GetPods(kubeSystem) + Ω(err).NotTo(gomega.HaveOccurred()) + + kubeUIDs := make(map[string]bool) + kubeNodes := make(map[string]string) + for _, pod := range podList.Items { + kubeUIDs[string(pod.UID)] = true + kubeNodes[string(pod.UID)] = pod.Spec.NodeName + fmt.Fprintf(ginkgo.GinkgoWriter, "pod: %s, uid: %s, node: %s\n", pod.Name, pod.UID, pod.Spec.NodeName) + } + + // retrieve foreign pod info + By("Retrieving foreign allocations") + var restClient yunikorn.RClient + nodes, err := restClient.GetNodes("default") + Ω(err).NotTo(gomega.HaveOccurred()) + foreignAllocs := make(map[string]bool) + foreignNodes := make(map[string]string) + for _, n := range *nodes { + fmt.Fprintf(ginkgo.GinkgoWriter, "Checking node %s\n", n.NodeID) + if len(n.ForeignAllocations) > 0 { + for _, falloc := range n.ForeignAllocations { + fmt.Fprintf(ginkgo.GinkgoWriter, "Found allocation %s on node %s\n", falloc.AllocationKey, falloc.NodeID) + foreignAllocs[falloc.AllocationKey] = true + foreignNodes[falloc.AllocationKey] = falloc.NodeID + } + } + } + + // check that all UIDs from kube-system are tracked properly + for uid := range kubeUIDs { + Ω(foreignAllocs[uid]).To(Equal(true), "pod %s from kube-system is not tracked in Yunikorn", uid) + Ω(foreignNodes[uid]).To(Equal(kubeNodes[uid]), "pod %s is tracked under incorrect node", uid) + } + }) + + ginkgo.AfterEach(func() { + tests.DumpClusterInfoIfSpecFailed(suiteName, []string{kubeSystem}) + }) +}) diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go index 77a472f57..ef40becf4 100644 --- a/test/e2e/framework/helpers/k8s/k8s_utils.go +++ b/test/e2e/framework/helpers/k8s/k8s_utils.go @@ -17,20 +17,19 @@ package k8s import ( - "bytes" "context" "errors" "fmt" "net/http" "net/url" "os" - "os/exec" "path/filepath" "strings" "time" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -1091,14 +1090,6 @@ func (k *KubeCtl) CreateSecret(secret *v1.Secret, namespace string) (*v1.Secret, return k.clientSet.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) } -func GetSecretObj(yamlPath string) (*v1.Secret, error) { - o, err := common.Yaml2Obj(yamlPath) - if err != nil { - return nil, err - } - return o.(*v1.Secret), err -} - func (k *KubeCtl) CreateServiceAccount(accountName string, namespace string) (*v1.ServiceAccount, error) { return k.clientSet.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), &v1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{Name: accountName}, @@ -1383,21 +1374,6 @@ func (k *KubeCtl) isNumJobPodsInDesiredState(jobName string, namespace string, n } } -func ApplyYamlWithKubectl(path, namespace string) error { - cmd := exec.Command("kubectl", "apply", "-f", path, "-n", namespace) - var stderr bytes.Buffer - cmd.Stderr 
diff --git a/test/e2e/user_group_limit/user_group_limit_test.go b/test/e2e/user_group_limit/user_group_limit_test.go
index 7eb3d016b..45ba70c13 100644
--- a/test/e2e/user_group_limit/user_group_limit_test.go
+++ b/test/e2e/user_group_limit/user_group_limit_test.go
@@ -19,31 +19,45 @@ package user_group_limit_test
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/url"
+	"os"
+	"path/filepath"
 	"runtime"
+	"strings"
 	"time"
"github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/webservice/dao" + amCommon "github.com/apache/yunikorn-k8shim/pkg/admission/common" amconf "github.com/apache/yunikorn-k8shim/pkg/admission/conf" "github.com/apache/yunikorn-k8shim/pkg/common/constants" + tests "github.com/apache/yunikorn-k8shim/test/e2e" + "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/client-go/kubernetes" ) type TestType int @@ -103,8 +117,11 @@ var _ = ginkgo.BeforeEach(func() { var _ = ginkgo.AfterSuite(func() { ginkgo.By("Check Yunikorn's health") checks, err := yunikorn.GetFailedHealthChecks() - Ω(err).NotTo(gomega.HaveOccurred()) + Ω(err).NotTo(HaveOccurred()) Ω(checks).To(gomega.Equal(""), checks) + ginkgo.By("Tearing down namespace: " + dev) + err = kClient.TearDownNamespace(dev) + Ω(err).NotTo(HaveOccurred()) }) var _ = ginkgo.Describe("UserGroupLimit", func() { @@ -909,40 +926,229 @@ var _ = ginkgo.Describe("UserGroupLimit", func() { checkUsageWildcardGroups(groupTestType, group2, sandboxQueue1, []*v1.Pod{group2Sandbox1Pod1, group2Sandbox1Pod2, group2Sandbox1Pod3}) }) + ginkgo.It("Verify User info for the non kube admin user", func() { + var clientset *kubernetes.Clientset + var namespace = "default" + var serviceAccountName = "test-user-sa" + var podName = "test-pod" + var secretName = "test-user-sa-token" // #nosec G101 + + ginkgo.By("Update config") + // The wait wrapper still can't fully guarantee that the config in AdmissionController has been updated. 
+		admissionCustomConfig = map[string]string{
+			"log.core.scheduler.ugm.level":   "debug",
+			amconf.AMAccessControlBypassAuth: constants.False,
+		}
+		yunikorn.WaitForAdmissionControllerRefreshConfAfterAction(func() {
+			yunikorn.UpdateCustomConfigMapWrapperWithMap(oldConfigMap, "", admissionCustomConfig, func(sc *configs.SchedulerConfig) error {
+				// remove placement rules so we can control queue
+				sc.Partitions[0].PlacementRules = nil
+				err := common.AddQueue(sc, constants.DefaultPartition, constants.RootQueue, configs.QueueConfig{
+					Name: "default",
+					Limits: []configs.Limit{
+						{
+							Limit:           "user entry",
+							Users:           []string{user1},
+							MaxApplications: 1,
+							MaxResources: map[string]string{
+								siCommon.Memory: fmt.Sprintf("%dM", mediumMem),
+							},
+						},
+						{
+							Limit:           "user2 entry",
+							Users:           []string{user2},
+							MaxApplications: 2,
+							MaxResources: map[string]string{
+								siCommon.Memory: fmt.Sprintf("%dM", largeMem),
+							},
+						},
+					}})
+				if err != nil {
+					return err
+				}
+				return common.AddQueue(sc, constants.DefaultPartition, constants.RootQueue, configs.QueueConfig{Name: "sandbox2"})
+			})
+		})
+		defer func() {
+			// cleanup
+			ginkgo.By("Cleaning up resources...")
+			// guard against a nil clientset if the spec failed before it was built
+			if clientset != nil {
+				err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+				gomega.Ω(err).NotTo(HaveOccurred())
+				err = clientset.CoreV1().ServiceAccounts(namespace).Delete(context.TODO(), serviceAccountName, metav1.DeleteOptions{})
+				gomega.Ω(err).NotTo(HaveOccurred())
+			}
+			err := kClient.DeleteClusterRole("pod-creator-role")
+			gomega.Ω(err).NotTo(HaveOccurred())
+			err = kClient.DeleteClusterRoleBindings("pod-creator-role-binding")
+			gomega.Ω(err).NotTo(HaveOccurred())
+		}()
+		// Create Service Account
+		ginkgo.By("Creating Service Account...")
+		sa, err := kClient.CreateServiceAccount(serviceAccountName, namespace)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// Create a ClusterRole with the necessary permissions
+		ginkgo.By("Creating ClusterRole...")
+		clusterRole := &rbacv1.ClusterRole{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod-creator-role",
+			},
+			Rules: []rbacv1.PolicyRule{
+				{
+					APIGroups: []string{""},
+					Resources: []string{"pods", "serviceaccounts"},
+					Verbs:     []string{"create", "get", "list", "watch", "delete"},
+				},
+			},
+		}
+		_, err = kClient.CreateClusterRole(clusterRole)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// Create a ClusterRoleBinding to bind the ClusterRole to the service account
+		ginkgo.By("Creating ClusterRoleBinding...")
+		clusterRoleBinding := &rbacv1.ClusterRoleBinding{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod-creator-role-binding",
+			},
+			RoleRef: rbacv1.RoleRef{
+				APIGroup: "rbac.authorization.k8s.io",
+				Kind:     "ClusterRole",
+				Name:     "pod-creator-role",
+			},
+			Subjects: []rbacv1.Subject{
+				{
+					Kind:      "ServiceAccount",
+					Name:      sa.Name,
+					Namespace: namespace,
+				},
+			},
+		}
+		_, err = kClient.CreateClusterRoleBinding(clusterRoleBinding.ObjectMeta.Name, clusterRoleBinding.RoleRef.Name, clusterRoleBinding.Subjects[0].Namespace, clusterRoleBinding.Subjects[0].Name)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// Create a Secret for the Service Account
+		ginkgo.By("Creating Secret for the Service Account...")
+		// build the v1.Secret object; the service-account annotation tells the
+		// token controller which ServiceAccount to mint the token for
+		secret := &v1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: secretName,
+				Annotations: map[string]string{
+					"kubernetes.io/service-account.name": serviceAccountName,
+				},
+			},
+			Type: v1.SecretTypeServiceAccountToken,
+		}
+		_, err = kClient.CreateSecret(secret, namespace)
+		gomega.Ω(err).NotTo(HaveOccurred())
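+		// Note: the token controller populates a ServiceAccount token Secret
+		// asynchronously, so the Secret object can exist before its "token" key
+		// does. GetSecretValue first waits for the Secret via WaitForSecret and
+		// only then reads the key.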
ginkgo.By("Getting the token value from the Secret...") + userTokenValue, err := kClient.GetSecretValue(namespace, secretName, "token") + gomega.Ω(err).NotTo(HaveOccurred()) + // use deep copy not to hardcode the kubeconfig + config, err := kClient.GetKubeConfig() + gomega.Ω(err).NotTo(HaveOccurred()) + config.BearerToken = userTokenValue + newConf := rest.CopyConfig(config) // copy existing config + // Use token-based authentication instead of client certificates + newConf.CAFile = "" + newConf.CertFile = "" + newConf.KeyFile = "" + newConf.BearerToken = userTokenValue + kubeconfigPath := filepath.Join(os.TempDir(), "test-user-config") + err = k8s.WriteConfigToFile(newConf, kubeconfigPath) + gomega.Ω(err).NotTo(HaveOccurred()) + config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath) + gomega.Ω(err).NotTo(HaveOccurred()) + clientset, err = kubernetes.NewForConfig(config) + gomega.Ω(err).NotTo(HaveOccurred()) + ginkgo.By("Creating Pod...") + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Annotations: map[string]string{ + "created-by": fmt.Sprintf("system:serviceaccount:%s:%s", namespace, serviceAccountName), + "user-token": userTokenValue, // Log the token in the annotation + }, + Labels: map[string]string{"applicationId": "test-app"}, + }, + Spec: v1.PodSpec{ + ServiceAccountName: serviceAccountName, + Containers: []v1.Container{ + { + Name: "nginx", + Image: "nginx", + Ports: []v1.ContainerPort{{ContainerPort: 80}}, + }, + }, + }, + } + _, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + gomega.Ω(err).NotTo(HaveOccurred()) + createdPod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + gomega.Ω(err).NotTo(HaveOccurred()) + ginkgo.By("Verifying User Info...") + userInfo, err := GetUserInfoFromPodAnnotation(createdPod) + gomega.Ω(err).NotTo(HaveOccurred()) + // user info should contain the substring "system:serviceaccount:default:test-user-sa" + gomega.Ω(strings.Contains(fmt.Sprintf("%v", userInfo), "system:serviceaccount:default:test-user-sa")).To(gomega.BeTrue()) + queueName2 := "root_22" + yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", func(sc *configs.SchedulerConfig) error { + // remove placement rules so we can control queue + sc.Partitions[0].PlacementRules = nil + var err error + if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{ + Name: queueName2, + Resources: configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", 200)}}, + Properties: map[string]string{"preemption.delay": "1s"}, + }); err != nil { + return err + } + return nil + }) + }) ginkgo.AfterEach(func() { tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev}) ginkgo.By("Tearing down namespace: " + dev) err := kClient.TearDownNamespace(dev) - Ω(err).NotTo(gomega.HaveOccurred()) + Ω(err).NotTo(HaveOccurred()) // reset config ginkgo.By("Restoring YuniKorn configuration") yunikorn.RestoreConfigMapWrapper(oldConfigMap) }) }) +func GetUserInfoFromPodAnnotation(pod *v1.Pod) (*si.UserGroupInformation, error) { + userInfo, ok := pod.Annotations[amCommon.UserInfoAnnotation] + if !ok { + return nil, fmt.Errorf("user info not found in pod annotation") + } + var userInfoObj si.UserGroupInformation + err := json.Unmarshal([]byte(userInfo), &userInfoObj) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal user info from pod annotation") + } + return &userInfoObj, nil +} + func deploySleepPod(usergroup *si.UserGroupInformation, queuePath string, 
 func deploySleepPod(usergroup *si.UserGroupInformation, queuePath string, expectedRunning bool, reason string) *v1.Pod {
 	usergroupJsonBytes, err := json.Marshal(usergroup)
-	Ω(err).NotTo(gomega.HaveOccurred())
+	Ω(err).NotTo(HaveOccurred())
 	sleepPodConfig := k8s.SleepPodConfig{NS: dev, Mem: smallMem, Labels: map[string]string{constants.LabelQueueName: queuePath}}
 	sleepPodObj, err := k8s.InitSleepPod(sleepPodConfig)
-	Ω(err).NotTo(gomega.HaveOccurred())
+	Ω(err).NotTo(HaveOccurred())
 	sleepPodObj.Annotations[amCommon.UserInfoAnnotation] = string(usergroupJsonBytes)
 	ginkgo.By(fmt.Sprintf("%s deploys the sleep pod %s to queue %s", usergroup, sleepPodObj.Name, queuePath))
 	sleepPod, err := kClient.CreatePod(sleepPodObj, dev)
-	gomega.Ω(err).NotTo(gomega.HaveOccurred())
+	gomega.Ω(err).NotTo(HaveOccurred())
 	if expectedRunning {
 		ginkgo.By(fmt.Sprintf("The sleep pod %s can be scheduled %s", sleepPod.Name, reason))
 		err = kClient.WaitForPodRunning(dev, sleepPod.Name, 60*time.Second)
-		gomega.Ω(err).NotTo(gomega.HaveOccurred())
+		gomega.Ω(err).NotTo(HaveOccurred())
 	} else {
 		ginkgo.By(fmt.Sprintf("The sleep pod %s can't be scheduled %s", sleepPod.Name, reason))
 		// Since Pending is the initial state of PodPhase, sleep for 5 seconds, then check whether the pod is still in Pending state.
 		time.Sleep(5 * time.Second)
 		err = kClient.WaitForPodPending(sleepPod.Namespace, sleepPod.Name, 60*time.Second)
-		gomega.Ω(err).NotTo(gomega.HaveOccurred())
+		gomega.Ω(err).NotTo(HaveOccurred())
 	}
 	return sleepPod
 }
@@ -952,14 +1158,14 @@ func checkUsage(testType TestType, name string, queuePath string, expectedRunnin
 	if testType == userTestType {
 		ginkgo.By(fmt.Sprintf("Check user resource usage for %s in queue %s", name, queuePath))
 		userUsageDAOInfo, err := restClient.GetUserUsage(constants.DefaultPartition, name)
-		Ω(err).NotTo(gomega.HaveOccurred())
+		Ω(err).NotTo(HaveOccurred())
 		Ω(userUsageDAOInfo).NotTo(gomega.BeNil())
 
 		rootQueueResourceUsageDAO = userUsageDAOInfo.Queues
 	} else if testType == groupTestType {
 		ginkgo.By(fmt.Sprintf("Check group resource usage for %s in queue %s", name, queuePath))
 		groupUsageDAOInfo, err := restClient.GetGroupUsage(constants.DefaultPartition, name)
-		Ω(err).NotTo(gomega.HaveOccurred())
+		Ω(err).NotTo(HaveOccurred())
 		Ω(groupUsageDAOInfo).NotTo(gomega.BeNil())
 
 		rootQueueResourceUsageDAO = groupUsageDAOInfo.Queues