diff --git a/config/config.yaml b/config/config.yaml
index bdcc71bfb..161291f6c 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -49,6 +49,11 @@ adapters:
     inCluster: false
     masterUrl: "https://127.0.0.1:6443"
     kubeconfig: "./kubeconfig/kubeconfig.yaml"
+    topologySpreadConstraint:
+      enabled: true
+      maxSkew: 5
+      topologyKey: "topology.kubernetes.io/zone"
+      whenUnsatisfiableScheduleAnyway: false
   grpc:
     keepAlive:
       time: 30s
diff --git a/internal/adapters/runtime/kubernetes/game_room.go b/internal/adapters/runtime/kubernetes/game_room.go
index e30338952..858b4320c 100644
--- a/internal/adapters/runtime/kubernetes/game_room.go
+++ b/internal/adapters/runtime/kubernetes/game_room.go
@@ -40,7 +40,7 @@ const (
 )
 
 func (k *kubernetes) CreateGameRoomInstance(ctx context.Context, scheduler *entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*game_room.Instance, error) {
-	pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec)
+	pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec, k.config)
 	if err != nil {
 		return nil, errors.NewErrInvalidArgument("invalid game room spec: %s", err)
 	}
diff --git a/internal/adapters/runtime/kubernetes/game_room_convert.go b/internal/adapters/runtime/kubernetes/game_room_convert.go
index 9c981226a..6da28220b 100644
--- a/internal/adapters/runtime/kubernetes/game_room_convert.go
+++ b/internal/adapters/runtime/kubernetes/game_room_convert.go
@@ -67,14 +67,13 @@ var invalidPodWaitingStates = []string{
 	"RunContainerError",
 }
 
-func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*v1.Pod, error) {
+func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec, config KubernetesConfig) (*v1.Pod, error) {
 	defaultAnnotations := map[string]string{safeToEvictAnnotation: safeToEvictValue}
 	defaultLabels := map[string]string{
 		maestroLabelKey:   maestroLabelValue,
 		schedulerLabelKey: scheduler.Name,
 		versionLabelKey:   gameRoomSpec.Version,
 	}
-
 	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: gameRoomName,
@@ -87,25 +86,31 @@ func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, game
 			Containers:  []v1.Container{},
 			Tolerations: convertSpecTolerations(gameRoomSpec),
 			Affinity:    convertSpecAffinity(gameRoomSpec),
-			// TODO: make it configurable
-			// 1. Add to proto/API
-			// 2. Generate message
-			// 3. Read from it
-			// 4. Add to game_room.Spec
-			// 5. Add a conversion function
-			TopologySpreadConstraints: []v1.TopologySpreadConstraint{
-				{
-					MaxSkew:           1,
-					TopologyKey:       "topology.kubernetes.io/zone",
-					WhenUnsatisfiable: v1.ScheduleAnyway,
-					LabelSelector: &metav1.LabelSelector{
-						MatchLabels: map[string]string{
-							schedulerLabelKey: scheduler.Name,
-						},
+		},
+	}
+	if config.TopologySpreadConstraintConfig.Enabled {
+		whenUnsatisfiable := v1.DoNotSchedule
+		if config.TopologySpreadConstraintConfig.WhenUnsatisfiableScheduleAnyway {
+			whenUnsatisfiable = v1.ScheduleAnyway
+		}
+		// TODO: make it configurable on scheduler
+		// 1. Add to proto/API
+		// 2. Generate message
+		// 3. Read from it
+		// 4. Add to game_room.Spec
+		// 5. Add a conversion function
+		pod.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{
+			{
+				MaxSkew:           int32(config.TopologySpreadConstraintConfig.MaxSkew),
+				TopologyKey:       config.TopologySpreadConstraintConfig.TopologyKey,
+				WhenUnsatisfiable: whenUnsatisfiable,
+				LabelSelector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						schedulerLabelKey: scheduler.Name,
+					},
 				},
 			},
-		},
+		}
 	}
 	for _, container := range gameRoomSpec.Containers {
 		podContainer, err := convertContainer(container, scheduler.Name, pod.Name)
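Note for reviewers (not part of the diff): the constraint is now attached after the Pod literal is built, and only when `Enabled` is set; `WhenUnsatisfiableScheduleAnyway` maps `false` to `v1.DoNotSchedule` and `true` to `v1.ScheduleAnyway`. Below is a minimal in-package sketch of that mapping. The test name is hypothetical, the `entities`/`game_room` import paths are assumed to match the ones this package's tests already use, and it assumes a zero-valued `game_room.Spec` converts without error:

```go
package kubernetes

import (
	"testing"

	"github.com/stretchr/testify/require"
	v1 "k8s.io/api/core/v1"

	"github.com/topfreegames/maestro/internal/core/entities"
	"github.com/topfreegames/maestro/internal/core/entities/game_room"
)

func TestTopologySpreadConstraintMapping(t *testing.T) {
	cfg := KubernetesConfig{
		TopologySpreadConstraintConfig: TopologySpreadConstraintConfig{
			Enabled:     true,
			MaxSkew:     5,
			TopologyKey: "topology.kubernetes.io/zone",
			// false maps to v1.DoNotSchedule; true would map to v1.ScheduleAnyway.
			WhenUnsatisfiableScheduleAnyway: false,
		},
	}

	pod, err := convertGameRoomSpec(entities.Scheduler{Name: "match"}, "match-1", game_room.Spec{}, cfg)
	require.NoError(t, err)
	require.Len(t, pod.Spec.TopologySpreadConstraints, 1)

	constraint := pod.Spec.TopologySpreadConstraints[0]
	require.Equal(t, int32(5), constraint.MaxSkew)
	require.Equal(t, v1.DoNotSchedule, constraint.WhenUnsatisfiable)
	// Spreading is scoped per scheduler through the scheduler label.
	require.Equal(t, "match", constraint.LabelSelector.MatchLabels[schedulerLabelKey])

	// With the feature disabled, no constraint is emitted at all.
	cfg.TopologySpreadConstraintConfig.Enabled = false
	pod, err = convertGameRoomSpec(entities.Scheduler{Name: "match"}, "match-1", game_room.Spec{}, cfg)
	require.NoError(t, err)
	require.Empty(t, pod.Spec.TopologySpreadConstraints)
}
```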
diff --git a/internal/adapters/runtime/kubernetes/game_room_convert_test.go b/internal/adapters/runtime/kubernetes/game_room_convert_test.go
index 79e4bf688..aa45d86a1 100644
--- a/internal/adapters/runtime/kubernetes/game_room_convert_test.go
+++ b/internal/adapters/runtime/kubernetes/game_room_convert_test.go
@@ -675,7 +675,7 @@ func TestConvertGameSpec(t *testing.T) {
 			//	Name:        test.scheduler,
 			//	Annotations: test.expectedPod.ObjectMeta.Annotations,
 			//}
-			res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec)
+			res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec, KubernetesConfig{})
 			if test.withError {
 				require.Error(t, err)
 				return
diff --git a/internal/adapters/runtime/kubernetes/game_room_test.go b/internal/adapters/runtime/kubernetes/game_room_test.go
index 9e6901338..dff868e94 100644
--- a/internal/adapters/runtime/kubernetes/game_room_test.go
+++ b/internal/adapters/runtime/kubernetes/game_room_test.go
@@ -44,7 +44,7 @@ func TestGameRoomCreation(t *testing.T) {
 	ctx := context.Background()
 	gameRoomName := "some-game-room-name"
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{})
 
 	t.Run("successfully create a room", func(t *testing.T) {
 		t.Parallel()
@@ -98,7 +98,7 @@ func TestGameRoomDeletion(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
 	gameRoomName := "some-game-room"
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{})
 
 	t.Run("successfully delete a room", func(t *testing.T) {
 		t.Parallel()
@@ -155,7 +155,7 @@ func TestCreateGameRoomName(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{})
 
 	t.Run("When scheduler name is greater than max name length minus randomLength", func(t *testing.T) {
 		t.Run("return the name with randomLength", func(t *testing.T) {
diff --git a/internal/adapters/runtime/kubernetes/game_room_watcher_test.go b/internal/adapters/runtime/kubernetes/game_room_watcher_test.go
index 35a6f2aea..0071d4b2a 100644
--- a/internal/adapters/runtime/kubernetes/game_room_watcher_test.go
+++ b/internal/adapters/runtime/kubernetes/game_room_watcher_test.go
@@ -45,7 +45,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-addition"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
@@ -96,7 +96,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-ready"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
@@ -158,7 +158,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{TopologySpreadConstraintConfig: TopologySpreadConstraintConfig{MaxSkew: 1}})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-error"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
@@ -213,7 +213,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-delete"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
diff --git a/internal/adapters/runtime/kubernetes/kubernetes.go b/internal/adapters/runtime/kubernetes/kubernetes.go
index aeca5e3ba..89eaf047f 100644
--- a/internal/adapters/runtime/kubernetes/kubernetes.go
+++ b/internal/adapters/runtime/kubernetes/kubernetes.go
@@ -35,16 +35,40 @@ import (
 
 var _ ports.Runtime = (*kubernetes)(nil)
 
+const (
+	DefaultMaxSkew     = 5
+	DefaultTopologyKey = "topology.kubernetes.io/zone"
+)
+
+type TopologySpreadConstraintConfig struct {
+	Enabled                         bool
+	MaxSkew                         int
+	TopologyKey                     string
+	WhenUnsatisfiableScheduleAnyway bool
+}
+
+type KubernetesConfig struct {
+	TopologySpreadConstraintConfig TopologySpreadConstraintConfig
+}
+
 type kubernetes struct {
 	clientSet     kube.Interface
 	logger        *zap.Logger
 	eventRecorder record.EventRecorder
+	config        KubernetesConfig
 }
 
-func New(clientSet kube.Interface) *kubernetes {
+func New(clientSet kube.Interface, config KubernetesConfig) *kubernetes {
+	if config.TopologySpreadConstraintConfig.MaxSkew <= 0 {
+		config.TopologySpreadConstraintConfig.MaxSkew = DefaultMaxSkew
+	}
+	if config.TopologySpreadConstraintConfig.TopologyKey == "" {
+		config.TopologySpreadConstraintConfig.TopologyKey = DefaultTopologyKey
+	}
 	k := &kubernetes{
 		clientSet: clientSet,
 		logger:    zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")),
+		config:    config,
 	}
 
 	eventBroadcaster := record.NewBroadcaster()
diff --git a/internal/adapters/runtime/kubernetes/scheduler_test.go b/internal/adapters/runtime/kubernetes/scheduler_test.go
index c80fff3f1..ce4c30248 100644
--- a/internal/adapters/runtime/kubernetes/scheduler_test.go
+++ b/internal/adapters/runtime/kubernetes/scheduler_test.go
@@ -41,7 +41,7 @@ import (
 func TestSchedulerCreation(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{})
 
 	t.Run("create single scheduler", func(t *testing.T) {
 		scheduler := &entities.Scheduler{Name: "single-scheduler-test", PdbMaxUnavailable: "5%"}
@@ -66,7 +66,7 @@ func TestSchedulerCreation(t *testing.T) {
 func TestSchedulerDeletion(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{})
 
 	t.Run("delete scheduler", func(t *testing.T) {
 		scheduler := &entities.Scheduler{Name: "delete-scheduler-test", PdbMaxUnavailable: "5%"}
@@ -92,7 +92,7 @@ func TestSchedulerDeletion(t *testing.T) {
 func TestPDBCreationAndDeletion(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{})
 
 	t.Run("create pdb from scheduler without autoscaling", func(t *testing.T) {
 		if !kubernetesRuntime.isPDBSupported() {
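A side note on `New`: defaulting happens there rather than in `convertGameRoomSpec`, so `MaxSkew <= 0` becomes `DefaultMaxSkew` and an empty key becomes `DefaultTopologyKey` — which also means an explicit `maxSkew: 0` in config silently turns into 5. A quick in-package sketch (hypothetical test; `fake.NewSimpleClientset` from `k8s.io/client-go/kubernetes/fake` stands in for a real cluster, assuming `New` needs nothing more from the clientset at construction time):

```go
package kubernetes

import (
	"testing"

	"github.com/stretchr/testify/require"
	"k8s.io/client-go/kubernetes/fake"
)

// Defaulting lives in New, so a partially filled config comes out
// with DefaultMaxSkew and DefaultTopologyKey applied.
func TestNewAppliesTopologySpreadDefaults(t *testing.T) {
	rt := New(fake.NewSimpleClientset(), KubernetesConfig{
		TopologySpreadConstraintConfig: TopologySpreadConstraintConfig{Enabled: true},
	})

	got := rt.config.TopologySpreadConstraintConfig
	require.Equal(t, DefaultMaxSkew, got.MaxSkew)         // 0 -> 5
	require.Equal(t, DefaultTopologyKey, got.TopologyKey) // "" -> zone key
	require.True(t, got.Enabled)
}
```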
diff --git a/internal/service/adapters.go b/internal/service/adapters.go
index ae3f8bd5f..85126edaf 100644
--- a/internal/service/adapters.go
+++ b/internal/service/adapters.go
@@ -69,11 +69,15 @@ const (
 	grpcKeepAliveTimePath    = "adapters.grpc.keepalive.time"
 	grpcKeepAliveTimeoutPath = "adapters.grpc.keepalive.timeout"
 	// Kubernetes runtime
-	runtimeKubernetesMasterURLPath  = "adapters.runtime.kubernetes.masterUrl"
-	runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
-	runtimeKubernetesInClusterPath  = "adapters.runtime.kubernetes.inCluster"
-	runtimeKubernetesQPS            = "adapters.runtime.kubernetes.qps"
-	runtimeKubernetesBurst          = "adapters.runtime.kubernetes.burst"
+	runtimeKubernetesMasterURLPath                                 = "adapters.runtime.kubernetes.masterUrl"
+	runtimeKubernetesKubeconfigPath                                = "adapters.runtime.kubernetes.kubeconfig"
+	runtimeKubernetesInClusterPath                                 = "adapters.runtime.kubernetes.inCluster"
+	runtimeKubernetesQPS                                           = "adapters.runtime.kubernetes.qps"
+	runtimeKubernetesBurst                                         = "adapters.runtime.kubernetes.burst"
+	runtimeKubernetesTopologySpreadEnabled                         = "adapters.runtime.kubernetes.topologySpreadConstraint.enabled"
+	runtimeKubernetesTopologySpreadMaxSkew                         = "adapters.runtime.kubernetes.topologySpreadConstraint.maxSkew"
+	runtimeKubernetesTopologySpreadTopologyKey                     = "adapters.runtime.kubernetes.topologySpreadConstraint.topologyKey"
+	runtimeKubernetesTopologySpreadWhenUnsatisfiableScheduleAnyway = "adapters.runtime.kubernetes.topologySpreadConstraint.whenUnsatisfiableScheduleAnyway"
 	// Redis operation storage
 	operationStorageRedisURLPath      = "adapters.operationStorage.redis.url"
 	operationLeaseStorageRedisURLPath = "adapters.operationLeaseStorage.redis.url"
@@ -146,7 +150,14 @@ func NewRuntimeKubernetes(c config.Config) (ports.Runtime, error) {
 		return nil, fmt.Errorf("failed to initialize Kubernetes runtime: %w", err)
 	}
 
-	return kubernetesRuntime.New(clientSet), nil
+	return kubernetesRuntime.New(clientSet, kubernetesRuntime.KubernetesConfig{
+		TopologySpreadConstraintConfig: kubernetesRuntime.TopologySpreadConstraintConfig{
+			Enabled:                         c.GetBool(runtimeKubernetesTopologySpreadEnabled),
+			MaxSkew:                         c.GetInt(runtimeKubernetesTopologySpreadMaxSkew),
+			TopologyKey:                     c.GetString(runtimeKubernetesTopologySpreadTopologyKey),
+			WhenUnsatisfiableScheduleAnyway: c.GetBool(runtimeKubernetesTopologySpreadWhenUnsatisfiableScheduleAnyway),
+		},
+	}), nil
 }
 
 // NewOperationStorageRedis instantiates redis as operation storage.
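For reference, with the values shipped in `config.yaml` (`enabled: true`, `maxSkew: 5`, zone key, `whenUnsatisfiableScheduleAnyway: false`), every game room pod should end up carrying a constraint equivalent to the sketch below. The `"maestro-scheduler"` label key is a placeholder for whatever `schedulerLabelKey` resolves to in the adapter:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/yaml"
)

func main() {
	// Constraint equivalent to the shipped defaults: maxSkew=5 tolerates a
	// spread of up to five rooms between the most and least loaded zones,
	// and DoNotSchedule makes the scheduler hold pods rather than skew further.
	constraint := v1.TopologySpreadConstraint{
		MaxSkew:           5,
		TopologyKey:       "topology.kubernetes.io/zone",
		WhenUnsatisfiable: v1.DoNotSchedule,
		LabelSelector: &metav1.LabelSelector{
			// Placeholder label key; the adapter uses its schedulerLabelKey constant.
			MatchLabels: map[string]string{"maestro-scheduler": "my-scheduler"},
		},
	}

	out, err := yaml.Marshal(constraint)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Relative to the removed hardcoded constraint (maxSkew 1, `ScheduleAnyway`), the shipped defaults are looser but hard: once zones skew by more than five rooms, new pods stay Pending instead of scheduling anyway, which seems worth calling out as a behavior change.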