Skip to content

Commit

Permalink
Automated cherry pick of #2574: Start Controller and Webhook on new C…
Browse files Browse the repository at this point in the history
…RDs availability (#2991)

* Start Controller and Webhook on new CRDs availability

* Log errors from JobFramework controller and webhook

* Add test case for delayed JobFramework API becoming available

* Wait for API integration to be enabled

* Implement synchronization for safe concurrent access

* Implement exponential backoff for waitForAPI() and add function type to streamline REST mapping checks

* Add Job Framework API to mapper directly

* Cast mgr.GetRESTMapper() to *TestRESTMapper and ensure proper locking for tests
  • Loading branch information
ChristianZaccaria authored Sep 5, 2024
1 parent 06559f2 commit ee26f96
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 26 deletions.
6 changes: 3 additions & 3 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func main() {
// Cert won't be ready until manager starts, so start a goroutine here which
// will block until the cert is ready before setting up the controllers.
// Controllers that register after the manager starts will start directly.
go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)
go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)

go queues.CleanUpOnContext(ctx)
go cCache.CleanUpOnContext(ctx)
Expand Down Expand Up @@ -229,7 +229,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
cert.WaitForCertsReady(setupLog, certsReady)
Expand Down Expand Up @@ -282,7 +282,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
jobframework.WithCache(cCache),
jobframework.WithQueues(queues),
}
if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
if err := jobframework.SetupControllers(ctx, mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/controller/jobframework/integrationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sort"
"sync"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -76,6 +77,7 @@ type integrationManager struct {
integrations map[string]IntegrationCallbacks
enabledIntegrations set.Set[string]
externalIntegrations map[string]runtime.Object
mu sync.RWMutex
}

// manager is the package-level integrationManager instance; package helpers
// such as SetupControllers and EnableIntegrationsForTest operate on it.
var manager integrationManager
Expand Down Expand Up @@ -147,7 +149,15 @@ func (m *integrationManager) getExternal(kindArg string) (runtime.Object, bool)
return jt, f
}

// getEnabledIntegrations returns a snapshot of the names of the enabled
// integrations. The set is cloned while the read lock is held, so the caller
// can iterate over the result without holding the manager's lock.
func (m *integrationManager) getEnabledIntegrations() set.Set[string] {
	m.mu.RLock()
	defer m.mu.RUnlock()
	snapshot := m.enabledIntegrations.Clone()
	return snapshot
}

func (m *integrationManager) enableIntegration(name string) {
m.mu.Lock()
defer m.mu.Unlock()
if m.enabledIntegrations == nil {
m.enabledIntegrations = set.New(name)
} else {
Expand All @@ -163,7 +173,7 @@ func (m *integrationManager) getList() []string {
}

func (m *integrationManager) getJobTypeForOwner(ownerRef *metav1.OwnerReference) runtime.Object {
for jobKey := range m.enabledIntegrations {
for jobKey := range m.getEnabledIntegrations() {
cbs, found := m.integrations[jobKey]
if found && cbs.IsManagingObjectsOwner != nil && cbs.IsManagingObjectsOwner(ownerRef) {
return cbs.JobType
Expand Down Expand Up @@ -207,12 +217,14 @@ func EnableIntegration(name string) {
// Mark the frameworks identified by names and return a revert function.
func EnableIntegrationsForTest(tb testing.TB, names ...string) func() {
tb.Helper()
old := manager.enabledIntegrations.Clone()
old := manager.getEnabledIntegrations()
for _, name := range names {
manager.enableIntegration(name)
}
return func() {
manager.mu.Lock()
manager.enabledIntegrations = old
manager.mu.Unlock()
}
}

Expand Down
80 changes: 64 additions & 16 deletions pkg/controller/jobframework/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
)

// Bounds handed to the exponential failure rate limiter that waitForAPI uses
// while polling for a job framework API to become available.
const (
	// baseBackoffWaitForIntegration is the base delay passed to the rate limiter.
	baseBackoffWaitForIntegration = time.Second
	// maxBackoffWaitForIntegration is the maximum delay passed to the rate limiter.
	maxBackoffWaitForIntegration = 2 * time.Minute
)

var (
// errFailedMappingResource is a sentinel error that setupControllers wraps
// (with %w) into the error it returns, so callers can match it with errors.Is.
errFailedMappingResource = errors.New("restMapper failed mapping resource")
)
Expand All @@ -44,11 +52,11 @@ var (
// this function needs to be called after the certs get ready because the controllers won't work
// until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
return manager.setupControllers(mgr, log, opts...)
func SetupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
return manager.setupControllers(ctx, mgr, log, opts...)
}

func (m *integrationManager) setupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
func (m *integrationManager) setupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
options := ProcessOptions(opts...)

for fwkName := range options.EnabledExternalFrameworks {
Expand All @@ -71,25 +79,21 @@ func (m *integrationManager) setupControllers(mgr ctrl.Manager, log logr.Logger,
if err != nil {
return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if err := restMappingExists(mgr, gvk); err != nil {
if !meta.IsNoMatchError(err) {
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
}
logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
go waitForAPI(ctx, mgr, log, gvk, func() {
log.Info("API now available, starting controller and webhook", "gvk", gvk)
if err := m.setupControllerAndWebhook(mgr, name, fwkNamePrefix, cb, options, opts...); err != nil {
log.Error(err, "Failed to setup controller and webhook for job framework")
}
})
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
opts...,
).SetupWithManager(mgr); err != nil {
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
if err := m.setupControllerAndWebhook(mgr, name, fwkNamePrefix, cb, options, opts...); err != nil {
return err
}
m.enableIntegration(name)
logger.Info("Set up controller and webhook for job framework")
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
Expand All @@ -99,6 +103,50 @@ func (m *integrationManager) setupControllers(mgr ctrl.Manager, log logr.Logger,
})
}

// setupControllerAndWebhook registers the reconciler and then the webhook of
// a single integration with mgr and, only when both succeed, marks the
// integration as enabled.
//
// name is the integration name; it is used in the event recorder name and is
// the key recorded as enabled. fwkNamePrefix is prepended to returned errors.
// cb supplies the reconciler and webhook constructors, options provides the
// manager name, and opts are forwarded unchanged to those constructors.
func (m *integrationManager) setupControllerAndWebhook(mgr ctrl.Manager, name string, fwkNamePrefix string, cb IntegrationCallbacks, options Options, opts ...Option) error {
	recorderName := fmt.Sprintf("%s-%s-controller", name, options.ManagerName)
	reconciler := cb.NewReconciler(mgr.GetClient(), mgr.GetEventRecorderFor(recorderName), opts...)
	if setupErr := reconciler.SetupWithManager(mgr); setupErr != nil {
		return fmt.Errorf("%s: %w", fwkNamePrefix, setupErr)
	}

	if webhookErr := cb.SetupWebhook(mgr, opts...); webhookErr != nil {
		return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, webhookErr)
	}

	// Only reached when both the controller and the webhook were registered.
	m.enableIntegration(name)
	return nil
}

// waitForAPI polls the manager's REST mapper until gvk can be mapped, then
// runs action once and returns. Between attempts it waits for the delay given
// by an exponential failure rate limiter bounded by
// baseBackoffWaitForIntegration and maxBackoffWaitForIntegration.
//
// Errors other than "no match" are logged and the polling continues. The
// function returns without running action when ctx is done.
func waitForAPI(ctx context.Context, mgr ctrl.Manager, log logr.Logger, gvk schema.GroupVersionKind, action func()) {
	backoff := workqueue.NewItemExponentialFailureRateLimiter(baseBackoffWaitForIntegration, maxBackoffWaitForIntegration)
	key := gvk.String()
	for {
		switch err := restMappingExists(mgr, gvk); {
		case err == nil:
			// The API is available: drop the backoff state and hand over.
			backoff.Forget(key)
			action()
			return
		case !meta.IsNoMatchError(err):
			// Unexpected failure; report it but keep waiting for the API.
			log.Error(err, "Failed to get REST mapping for gvk", "gvk", gvk)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff.When(key)):
			// Backoff elapsed; try again.
		}
	}
}

// restMappingExists reports whether the manager's REST mapper can resolve
// gvk. It returns nil when a mapping is found; otherwise it returns the
// mapper's error wrapped with the gvk for context.
func restMappingExists(mgr ctrl.Manager, gvk schema.GroupVersionKind) error {
	if _, mapErr := mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); mapErr != nil {
		return fmt.Errorf("failed to get REST mapping for %v: %w", gvk, mapErr)
	}
	return nil
}

// SetupIndexes sets up the indexers for integrations.
// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs,
// they can easily set up indexers for the in-house custom jobs.
Expand Down
80 changes: 75 additions & 5 deletions pkg/controller/jobframework/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package jobframework
import (
"context"
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -68,11 +71,20 @@ func TestSetupControllers(t *testing.T) {
AddToScheme: testAddToScheme,
CanSupportIntegration: testCanSupportIntegration,
},
"ray.io/raycluster": {
NewReconciler: testNewReconciler,
SetupWebhook: testSetupWebhook,
JobType: &rayv1.RayCluster{},
SetupIndexes: testSetupIndexes,
AddToScheme: testAddToScheme,
CanSupportIntegration: testCanSupportIntegration,
},
}

cases := map[string]struct {
opts []Option
mapperGVKs []schema.GroupVersionKind
delayedGVKs []*schema.GroupVersionKind
wantError error
wantEnabledIntegrations []string
}{
Expand All @@ -99,6 +111,20 @@ func TestSetupControllers(t *testing.T) {
},
wantEnabledIntegrations: []string{"batch/job"},
},
"mapper doesn't have ray.io/raycluster when Controllers have been setup, but eventually does": {
opts: []Option{
WithEnabledFrameworks([]string{"batch/job", "kubeflow.org/mpijob", "ray.io/raycluster"}),
},
mapperGVKs: []schema.GroupVersionKind{
batchv1.SchemeGroupVersion.WithKind("Job"),
kubeflow.SchemeGroupVersionKind,
// Not including RayCluster
},
delayedGVKs: []*schema.GroupVersionKind{
{Group: "ray.io", Version: "v1", Kind: "RayCluster"},
},
wantEnabledIntegrations: []string{"batch/job", "kubeflow.org/mpijob", "ray.io/raycluster"},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand All @@ -110,7 +136,7 @@ func TestSetupControllers(t *testing.T) {
}
}

_, logger := utiltesting.ContextWithLog(t)
ctx, logger := utiltesting.ContextWithLog(t)
k8sClient := utiltesting.NewClientBuilder(jobset.AddToScheme, kubeflow.AddToScheme, kftraining.AddToScheme, rayv1.AddToScheme).Build()

mgrOpts := ctrlmgr.Options{
Expand All @@ -124,29 +150,73 @@ func TestSetupControllers(t *testing.T) {
gvs = append(gvs, gvk.GroupVersion())
}
mapper := apimeta.NewDefaultRESTMapper(gvs)
testMapper := &TestRESTMapper{
DefaultRESTMapper: mapper,
lock: sync.RWMutex{},
}
for _, gvk := range tc.mapperGVKs {
mapper.Add(gvk, apimeta.RESTScopeNamespace)
testMapper.Add(gvk, apimeta.RESTScopeNamespace)
}
return mapper, nil
return testMapper, nil
},
}
mgr, err := ctrlmgr.New(&rest.Config{}, mgrOpts)
if err != nil {
t.Fatalf("Failed to setup manager: %v", err)
}

gotError := manager.setupControllers(mgr, logger, tc.opts...)
gotError := manager.setupControllers(ctx, mgr, logger, tc.opts...)
if diff := cmp.Diff(tc.wantError, gotError, cmpopts.EquateErrors()); len(diff) != 0 {
t.Errorf("Unexpected error from SetupControllers (-want,+got):\n%s", diff)
}

if diff := cmp.Diff(tc.wantEnabledIntegrations, manager.enabledIntegrations.SortedList()); len(diff) != 0 {
if len(tc.delayedGVKs) > 0 {
simulateDelayedIntegration(mgr, tc.delayedGVKs)
for _, gvk := range tc.delayedGVKs {
testDelayedIntegration(&manager, gvk.Group+"/"+strings.ToLower(gvk.Kind))
}
}

diff := cmp.Diff(tc.wantEnabledIntegrations, manager.getEnabledIntegrations().SortedList())
if len(diff) != 0 {
t.Errorf("Unexpected enabled integrations (-want,+got):\n%s", diff)
}
})
}
}

// TestRESTMapper embeds a DefaultRESTMapper and pairs it with a lock so that
// tests can add mappings while RESTMapping is being called from another
// goroutine.
type TestRESTMapper struct {
*apimeta.DefaultRESTMapper
// lock is taken by RESTMapping and by the test helper that adds mappings.
lock sync.RWMutex
}

// RESTMapping delegates to the embedded DefaultRESTMapper while holding the
// mapper's lock, so lookups do not overlap with mappings being added under
// the same lock.
func (m *TestRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*apimeta.RESTMapping, error) {
	m.lock.Lock()
	defer m.lock.Unlock()
	mapping, err := m.DefaultRESTMapper.RESTMapping(gk, versions...)
	return mapping, err
}

// simulateDelayedIntegration simulates GVKs becoming available late by adding
// each of delayedGVKs, namespace-scoped, to the manager's REST mapper.
// The manager's mapper must be a *TestRESTMapper (the type assertion panics
// otherwise); its lock is held for the duration of the additions.
func simulateDelayedIntegration(mgr ctrlmgr.Manager, delayedGVKs []*schema.GroupVersionKind) {
	testMapper := mgr.GetRESTMapper().(*TestRESTMapper)
	testMapper.lock.Lock()
	defer testMapper.lock.Unlock()

	for i := range delayedGVKs {
		testMapper.Add(*delayedGVKs[i], apimeta.RESTScopeNamespace)
	}
}

// testDelayedIntegration blocks until the integration named crdName shows up
// in the manager's enabled integrations, polling every 10ms.
//
// The wait is bounded: if the integration is still not enabled after the
// timeout, the function returns anyway so that the caller's assertion on the
// enabled integrations fails with a readable diff instead of the test hanging
// until the global test timeout.
func testDelayedIntegration(manager *integrationManager, crdName string) {
	const (
		pollInterval = 10 * time.Millisecond
		// Generous compared to the backoff used when waiting for the API,
		// which starts at one second.
		pollTimeout = 30 * time.Second
	)

	deadline := time.Now().Add(pollTimeout)
	for time.Now().Before(deadline) {
		if _, ok := manager.getEnabledIntegrations()[crdName]; ok {
			return
		}
		time.Sleep(pollInterval)
	}
}

func TestSetupIndexes(t *testing.T) {
testNamespace := "test"

Expand Down

0 comments on commit ee26f96

Please sign in to comment.