diff --git a/knative-operator/pkg/controller/knativekafka/knativekafka_controller.go b/knative-operator/pkg/controller/knativekafka/knativekafka_controller.go index 2c60d39220..4a00394997 100644 --- a/knative-operator/pkg/controller/knativekafka/knativekafka_controller.go +++ b/knative-operator/pkg/controller/knativekafka/knativekafka_controller.go @@ -362,6 +362,7 @@ func (r *ReconcileKnativeKafka) transform(manifest *mf.Manifest, instance *serve socommon.ApplyCABundlesTransform(), operatorcommon.OverridesTransform(instance.Spec.Workloads, logging.FromContext(context.TODO())), socommon.ConfigMapVolumeChecksumTransform(context.Background(), r.client, dependentConfigMaps), + socommon.JobsRemoveTTLSecondsAfterFinished(), injectNamespacedBrokerMonitoring(r.client)), socommon.DeprecatedAPIsTranformersFromConfig()...) tfs = append(tfs, rbacProxyTranforms...) diff --git a/openshift-knative-operator/pkg/common/job.go b/openshift-knative-operator/pkg/common/job.go index a39ee3b21c..ca141459bd 100644 --- a/openshift-knative-operator/pkg/common/job.go +++ b/openshift-knative-operator/pkg/common/job.go @@ -31,3 +31,19 @@ func VersionedJobNameTransform() mf.Transformer { return nil } } + +func JobsRemoveTTLSecondsAfterFinished() mf.Transformer { + return func(u *unstructured.Unstructured) error { + if u.GetKind() == "Job" { + job := &batchv1.Job{} + if err := scheme.Scheme.Convert(u, job, nil); err != nil { + return err + } + if job.Spec.TTLSecondsAfterFinished != nil { + job.Spec.TTLSecondsAfterFinished = nil + } + return scheme.Scheme.Convert(job, u, nil) + } + return nil + } +} diff --git a/openshift-knative-operator/pkg/common/job_test.go b/openshift-knative-operator/pkg/common/job_test.go index ae529ce784..e7d7220ccd 100644 --- a/openshift-knative-operator/pkg/common/job_test.go +++ b/openshift-knative-operator/pkg/common/job_test.go @@ -7,6 +7,7 @@ import ( "github.com/google/go-cmp/cmp" batchv1 "k8s.io/api/batch/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" util "knative.dev/operator/pkg/reconciler/common/testing" ) @@ -48,6 +49,24 @@ func TestJobGeneratedNameTransform(t *testing.T) { } +func TestJobsRemoveTTLSecondsAfterFinished(t *testing.T) { + got := createJob("", "gen") + got.Spec.TTLSecondsAfterFinished = pointer.Int32(32) + + expected := createJob("", "gen") + expected.Spec.TTLSecondsAfterFinished = nil + + u := util.MakeUnstructured(t, &got) + if err := JobsRemoveTTLSecondsAfterFinished()(&u); err != nil { + t.Fatal("Unexpected error from transformer", err) + } + + expectedU := util.MakeUnstructured(t, &expected) + if diff := cmp.Diff(u, expectedU); diff != "" { + t.Errorf("Got = %#v, want = %#v\n%s", u, expectedU, diff) + } +} + func createJob(name, gen string) batchv1.Job { return batchv1.Job{ TypeMeta: metav1.TypeMeta{ diff --git a/openshift-knative-operator/pkg/eventing/extension.go b/openshift-knative-operator/pkg/eventing/extension.go index 6c0d1d6ecc..59cc49bd1e 100644 --- a/openshift-knative-operator/pkg/eventing/extension.go +++ b/openshift-knative-operator/pkg/eventing/extension.go @@ -93,6 +93,7 @@ func (e *extension) Transformers(ke base.KComponent) []mf.Transformer { common.VersionedJobNameTransform(), common.InjectCommonEnvironment(), common.ApplyCABundlesTransform(), + common.JobsRemoveTTLSecondsAfterFinished(), } tf = append(tf, monitoring.GetEventingTransformers(ke)...) return append(tf, common.DeprecatedAPIsTranformers(e.kubeclient.Discovery())...) diff --git a/test/e2e/knative_eventing_test.go b/test/e2e/knative_eventing_test.go index dd5f2fa547..1f5f3981fe 100644 --- a/test/e2e/knative_eventing_test.go +++ b/test/e2e/knative_eventing_test.go @@ -2,17 +2,20 @@ package e2e import ( "context" + "fmt" "testing" - "github.com/openshift-knative/serverless-operator/test" - "github.com/openshift-knative/serverless-operator/test/monitoringe2e" - "github.com/openshift-knative/serverless-operator/test/upgrade" - "github.com/openshift-knative/serverless-operator/test/v1beta1" + batchv1 "k8s.io/api/batch/v1" "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/ptr" + + "github.com/openshift-knative/serverless-operator/test" + "github.com/openshift-knative/serverless-operator/test/monitoringe2e" + "github.com/openshift-knative/serverless-operator/test/upgrade" + "github.com/openshift-knative/serverless-operator/test/v1beta1" ) const ( @@ -71,6 +74,12 @@ func TestKnativeEventing(t *testing.T) { upgrade.VerifyPostInstallJobs(caCtx, upgrade.VerifyPostJobsConfig{ Namespace: eventingNamespace, FailOnNoJobs: true, + ValidateJob: func(j batchv1.Job) error { + if j.Spec.TTLSecondsAfterFinished != nil { + return fmt.Errorf("job %s/%s has TTLSecondsAfterFinished", eventingNamespace, j.Name) + } + return nil + }, }) }) diff --git a/test/e2ekafka/knative_kafka_test.go b/test/e2ekafka/knative_kafka_test.go index 2358727842..301a030d7d 100644 --- a/test/e2ekafka/knative_kafka_test.go +++ b/test/e2ekafka/knative_kafka_test.go @@ -2,9 +2,10 @@ package e2ekafka import ( "context" + "fmt" "testing" - "github.com/openshift-knative/serverless-operator/test/v1alpha1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -14,6 +15,8 @@ import ( logtesting "knative.dev/pkg/logging/testing" "knative.dev/pkg/ptr" + "github.com/openshift-knative/serverless-operator/test/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" @@ -193,6 +196,12 @@ func TestKnativeKafka(t *testing.T) { upgrade.VerifyPostInstallJobs(caCtx, upgrade.VerifyPostJobsConfig{ Namespace: knativeKafkaNamespace, FailOnNoJobs: true, + ValidateJob: func(j batchv1.Job) error { + if j.Spec.TTLSecondsAfterFinished != nil { + return fmt.Errorf("job %s/%s has TTLSecondsAfterFinished", knativeKafkaNamespace, j.Name) + } + return nil + }, }) }) } diff --git a/test/upgrade/verify_jobs.go b/test/upgrade/verify_jobs.go index 32c1a0727b..c0c0b13f86 100644 --- a/test/upgrade/verify_jobs.go +++ b/test/upgrade/verify_jobs.go @@ -5,17 +5,20 @@ import ( "fmt" "time" - "github.com/openshift-knative/serverless-operator/test" "golang.org/x/sync/errgroup" + batchv1 "k8s.io/api/batch/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "knative.dev/pkg/test/upgrade" + + "github.com/openshift-knative/serverless-operator/test" ) type VerifyPostJobsConfig struct { Namespace string FailOnNoJobs bool + ValidateJob func(j batchv1.Job) error } func VerifyPostInstallJobs(ctx *test.Context, cfg VerifyPostJobsConfig) upgrade.Operation { @@ -44,6 +47,12 @@ func verifyPostInstallJobs(ctx context.Context, testCtx *test.Context, c upgrade for _, j := range jobs.Items { j := j + if cfg.ValidateJob != nil { + if err := cfg.ValidateJob(j); err != nil { + return fmt.Errorf("failed to validate job %s: %w", j.Name, err) + } + } + if j.Status.Succeeded > 0 { // We don't need to wait for a job that is already succeeded. // In addition, an already succeeded job might go away due to the job's TTL.