Skip to content

Commit

Permalink
refactor(runtime): make maxSkew configurable
Browse files Browse the repository at this point in the history
Read the TopologySpreadConstraints maxSkew from the YAML config
so we can tune based on cluster/game. By default it will be set to 5.
Using a low value lead to underutilized nodes which impacts in cost
  • Loading branch information
hspedro committed Oct 15, 2024
1 parent bde7835 commit 871a93b
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 23 deletions.
4 changes: 4 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ adapters:
inCluster: false
masterUrl: "https://127.0.0.1:6443"
kubeconfig: "./kubeconfig/kubeconfig.yaml"
topologySpreadConstraint:
maxSkew: 5
topologyKey: "topology.kubernetes.io/zone"
whenUnsatisfiableScheduleAnyway: false
grpc:
keepAlive:
time: 30s
Expand Down
2 changes: 1 addition & 1 deletion internal/adapters/runtime/kubernetes/game_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
)

func (k *kubernetes) CreateGameRoomInstance(ctx context.Context, scheduler *entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*game_room.Instance, error) {
pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec)
pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec, k.config)
if err != nil {
return nil, errors.NewErrInvalidArgument("invalid game room spec: %s", err)
}
Expand Down
12 changes: 8 additions & 4 deletions internal/adapters/runtime/kubernetes/game_room_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,18 @@ var invalidPodWaitingStates = []string{
"RunContainerError",
}

func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*v1.Pod, error) {
func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec, config KubernetesConfig) (*v1.Pod, error) {
defaultAnnotations := map[string]string{safeToEvictAnnotation: safeToEvictValue}
defaultLabels := map[string]string{
maestroLabelKey: maestroLabelValue,
schedulerLabelKey: scheduler.Name,
versionLabelKey: gameRoomSpec.Version,
}

whenUnsatisfiable := v1.DoNotSchedule
if config.WhenUnsatisfiableScheduleAnyway {
whenUnsatisfiable = v1.ScheduleAnyway
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: gameRoomName,
Expand All @@ -95,9 +99,9 @@ func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, game
// 5. Add a conversion function
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.ScheduleAnyway,
MaxSkew: int32(config.MaxSkew),
TopologyKey: config.TopologyKey,
WhenUnsatisfiable: whenUnsatisfiable,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
schedulerLabelKey: scheduler.Name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func TestConvertGameSpec(t *testing.T) {
// Name: test.scheduler,
// Annotations: test.expectedPod.ObjectMeta.Annotations,
//}
res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec)
res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})
if test.withError {
require.Error(t, err)
return
Expand Down
6 changes: 3 additions & 3 deletions internal/adapters/runtime/kubernetes/game_room_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestGameRoomCreation(t *testing.T) {
ctx := context.Background()
gameRoomName := "some-game-room-name"
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

t.Run("successfully create a room", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestGameRoomDeletion(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
gameRoomName := "some-game-room"
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

t.Run("successfully delete a room", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestCreateGameRoomName(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

t.Run("When scheduler name is greater than max name length minus randomLength", func(t *testing.T) {
t.Run("return the name with randomLength", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

scheduler := &entities.Scheduler{Name: "watch-room-addition"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

scheduler := &entities.Scheduler{Name: "watch-room-ready"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})

scheduler := &entities.Scheduler{Name: "watch-room-error"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

scheduler := &entities.Scheduler{Name: "watch-room-delete"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down
21 changes: 20 additions & 1 deletion internal/adapters/runtime/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,35 @@ import (

var _ ports.Runtime = (*kubernetes)(nil)

const (
DefaultMaxSkew = 5
DefaultTopologyKey = "topology.kubernetes.io/zone"
)

type KubernetesConfig struct {
MaxSkew int
TopologyKey string
WhenUnsatisfiableScheduleAnyway bool
}

type kubernetes struct {
clientSet kube.Interface
logger *zap.Logger
eventRecorder record.EventRecorder
config KubernetesConfig
}

func New(clientSet kube.Interface) *kubernetes {
func New(clientSet kube.Interface, config KubernetesConfig) *kubernetes {
if config.MaxSkew <= 0 {
config.MaxSkew = DefaultMaxSkew
}
if config.TopologyKey == "" {
config.TopologyKey = DefaultTopologyKey
}
k := &kubernetes{
clientSet: clientSet,
logger: zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")),
config: config,
}

eventBroadcaster := record.NewBroadcaster()
Expand Down
6 changes: 3 additions & 3 deletions internal/adapters/runtime/kubernetes/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
func TestSchedulerCreation(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

t.Run("create single scheduler", func(t *testing.T) {
scheduler := &entities.Scheduler{Name: "single-scheduler-test", PdbMaxUnavailable: "5%"}
Expand All @@ -66,7 +66,7 @@ func TestSchedulerCreation(t *testing.T) {
func TestSchedulerDeletion(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

t.Run("delete scheduler", func(t *testing.T) {
scheduler := &entities.Scheduler{Name: "delete-scheduler-test", PdbMaxUnavailable: "5%"}
Expand All @@ -92,7 +92,7 @@ func TestSchedulerDeletion(t *testing.T) {
func TestPDBCreationAndDeletion(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1, WhenUnsatisfiableScheduleAnyway: true})

t.Run("create pdb from scheduler without autoscaling", func(t *testing.T) {
if !kubernetesRuntime.isPDBSupported() {
Expand Down
19 changes: 13 additions & 6 deletions internal/service/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ const (
grpcKeepAliveTimePath = "adapters.grpc.keepalive.time"
grpcKeepAliveTimeoutPath = "adapters.grpc.keepalive.timeout"
// Kubernetes runtime
runtimeKubernetesMasterURLPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
runtimeKubernetesInClusterPath = "adapters.runtime.kubernetes.inCluster"
runtimeKubernetesQPS = "adapters.runtime.kubernetes.qps"
runtimeKubernetesBurst = "adapters.runtime.kubernetes.burst"
runtimeKubernetesMasterURLPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
runtimeKubernetesInClusterPath = "adapters.runtime.kubernetes.inCluster"
runtimeKubernetesQPS = "adapters.runtime.kubernetes.qps"
runtimeKubernetesBurst = "adapters.runtime.kubernetes.burst"
runtimeKubernetesMaxSkew = "adapters.runtime.kubernetes.topologySpreadConstraint.maxSkew"
runtimeKubernetesTopologyKey = "adapters.runtime.kubernetes.topologySpreadConstraint.topologyKey"
runtimeKubernetesWhenUnsatisfiableScheduleAnyway = "adapters.runtime.kubernetes.topologySpreadConstraint.whenUnsatisfiableScheduleAnyway"
// Redis operation storage
operationStorageRedisURLPath = "adapters.operationStorage.redis.url"
operationLeaseStorageRedisURLPath = "adapters.operationLeaseStorage.redis.url"
Expand Down Expand Up @@ -146,7 +149,11 @@ func NewRuntimeKubernetes(c config.Config) (ports.Runtime, error) {
return nil, fmt.Errorf("failed to initialize Kubernetes runtime: %w", err)
}

return kubernetesRuntime.New(clientSet), nil
return kubernetesRuntime.New(clientSet, kubernetesRuntime.KubernetesConfig{
MaxSkew: c.GetInt(runtimeKubernetesMaxSkew),
TopologyKey: c.GetString(runtimeKubernetesTopologyKey),
WhenUnsatisfiableScheduleAnyway: c.GetBool(runtimeKubernetesWhenUnsatisfiableScheduleAnyway),
}), nil
}

// NewOperationStorageRedis instantiates redis as operation storage.
Expand Down

0 comments on commit 871a93b

Please sign in to comment.