diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go index 48a06fa71..cde9a64f2 100644 --- a/pkg/common/events/recorder.go +++ b/pkg/common/events/recorder.go @@ -19,43 +19,22 @@ package events import ( - "sync" + "sync/atomic" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/events" - - "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common/constants" - "github.com/apache/yunikorn-k8shim/pkg/conf" - "github.com/apache/yunikorn-k8shim/pkg/locking" ) -var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024) -var once sync.Once -var lock locking.RWMutex +var eventRecorder atomic.Pointer[events.EventRecorder] + +func init() { + r := events.EventRecorder(NewMockedRecorder()) + eventRecorder.Store(&r) +} func GetRecorder() events.EventRecorder { - lock.Lock() - defer lock.Unlock() - once.Do(func() { - // note, the initiation of the event recorder requires on a workable Kubernetes client, - // in test mode we should skip this and just use a fake recorder instead. - configs := conf.GetSchedulerConf() - if !configs.IsTestMode() { - k8sClient := client.NewKubeClient(configs.KubeConfig) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: k8sClient.GetClientSet().EventsV1()}) - eventBroadcaster.StartRecordingToSink(make(<-chan struct{})) - eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName) - } - }) - - return eventRecorder + return *eventRecorder.Load() } func SetRecorder(recorder events.EventRecorder) { - lock.Lock() - defer lock.Unlock() - eventRecorder = recorder - once.Do(func() {}) // make sure Do() doesn't fire elsewhere + eventRecorder.Store(&recorder) } diff --git a/pkg/common/events/recorder_test.go b/pkg/common/events/recorder_test.go index 7068ad262..b10e1f585 100644 --- a/pkg/common/events/recorder_test.go +++ b/pkg/common/events/recorder_test.go @@ -23,15 +23,10 @@ import ( "testing" "gotest.tools/v3/assert" - - "github.com/apache/yunikorn-k8shim/pkg/conf" ) func TestInit(t *testing.T) { // simply test the get won't fail - // which means the get function honors the testMode and - // skips initiating a real event recorder - conf.GetSchedulerConf().SetTestMode(true) recorder := GetRecorder() - assert.Equal(t, reflect.TypeOf(recorder).String(), "*events.FakeRecorder") + assert.Equal(t, reflect.TypeOf(recorder).String(), "*events.MockedRecorder") } diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index ebf3fb118..291083f4f 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -19,15 +19,20 @@ package shim import ( + ctx "context" "time" "go.uber.org/zap" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/scheme" + k8events "k8s.io/client-go/tools/events" "github.com/apache/yunikorn-k8shim/pkg/cache" "github.com/apache/yunikorn-k8shim/pkg/client" + "github.com/apache/yunikorn-k8shim/pkg/common/constants" + "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/dispatcher" @@ -67,6 +72,18 @@ func NewShimScheduler(scheduler api.SchedulerAPI, configs *conf.SchedulerConf, b apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, false) context := cache.NewContextWithBootstrapConfigMaps(apiFactory, bootstrapConfigMaps) rmCallback := cache.NewAsyncRMCallback(context) + + eventBroadcaster := k8events.NewBroadcaster(&k8events.EventSinkImpl{ + Interface: kubeClient.GetClientSet().EventsV1()}) + err := eventBroadcaster.StartRecordingToSinkWithContext(ctx.Background()) + if err != nil { + log.Log(log.Shim).Error("Could not create event broadcaster", + zap.Error(err)) + } else { + eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName) + events.SetRecorder(eventRecorder) + } + return newShimSchedulerInternal(context, apiFactory, rmCallback) }