From 80f94baa6a5fcb8de8b41b5725f120523a7f3ca3 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 9 Nov 2021 15:33:35 +0100 Subject: [PATCH] Retry on Webhook EOF Errors Mitigation for https://github.com/knative/pkg/issues/1509. Same fix was used in eventing core to mitigate webhook EOF errors. Signed-off-by: Pierangelo Di Pilato --- test/lib/creation.go | 82 ++++++++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/test/lib/creation.go b/test/lib/creation.go index d667b36861..bb796517d6 100644 --- a/test/lib/creation.go +++ b/test/lib/creation.go @@ -31,75 +31,91 @@ import ( ) func CreateKafkaChannelV1Beta1OrFail(c *testlib.Client, kafkaChannel *channelsv1beta1.KafkaChannel) { - kafkaChannelClientSet, err := kafkaclientset.NewForConfig(c.Config) + client, err := kafkaclientset.NewForConfig(c.Config) if err != nil { c.T.Fatalf("Failed to create v1beta1 KafkaChannel client: %v", err) } - kChannels := kafkaChannelClientSet.MessagingV1beta1().KafkaChannels(c.Namespace) - if createdKafkaChannel, err := kChannels.Create(context.Background(), kafkaChannel, metav1.CreateOptions{}); err != nil { - c.T.Fatalf("Failed to create v1beta1 KafkaChannel %q: %v", kafkaChannel.Name, err) - } else { + err = c.RetryWebhookErrors(func(i int) error { + createdKafkaChannel, err := client.MessagingV1beta1().KafkaChannels(c.Namespace).Create(context.Background(), kafkaChannel, metav1.CreateOptions{}) + if err != nil { + return err + } c.Tracker.AddObj(createdKafkaChannel) + return nil + }) + if err != nil { + c.T.Fatalf("Failed to create v1beta1 KafkaChannel %q: %v", kafkaChannel.Name, err) } } func GetKafkaChannelV1Beta1OrFail(c *testlib.Client, kafkaChannel string) *channelsv1beta1.KafkaChannel { - kafkaChannelClientSet, err := kafkaclientset.NewForConfig(c.Config) + client, err := kafkaclientset.NewForConfig(c.Config) if err != nil { c.T.Fatalf("Failed to create v1beta1 KafkaChannel client: %v", err) } - kChannels := kafkaChannelClientSet.MessagingV1beta1().KafkaChannels(c.Namespace) - if kcObj, err := kChannels.Get(context.Background(), kafkaChannel, metav1.GetOptions{}); err != nil { + var kcObj *channelsv1beta1.KafkaChannel + err = c.RetryWebhookErrors(func(i int) error { + kcObj, err = client.MessagingV1beta1().KafkaChannels(c.Namespace).Get(context.Background(), kafkaChannel, metav1.GetOptions{}) + return err + }) + if err != nil { c.T.Fatalf("Failed to get v1beta1 KafkaChannel %q: %v", kafkaChannel, err) - } else { - return kcObj } - return nil + return kcObj } func CreateKafkaSourceV1Beta1OrFail(c *testlib.Client, kafkaSource *sourcesv1beta1.KafkaSource) { - kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config) + client, err := kafkaclientset.NewForConfig(c.Config) if err != nil { c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err) } - kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace) - if createdKafkaSource, err := kSources.Create(context.Background(), kafkaSource, metav1.CreateOptions{}); err != nil { - c.T.Fatalf("Failed to create v1beta1 KafkaSource %q: %v", kafkaSource.Name, err) - } else { + err = c.RetryWebhookErrors(func(i int) error { + createdKafkaSource, err := client.SourcesV1beta1().KafkaSources(c.Namespace).Create(context.Background(), kafkaSource, metav1.CreateOptions{}) + if err != nil { + return err + } c.Tracker.AddObj(createdKafkaSource) + return nil + }) + if err != nil { + c.T.Fatalf("Failed to create v1beta1 KafkaSource %q: %v", kafkaSource.Name, err) } } func GetKafkaSourceV1Beta1OrFail(c *testlib.Client, kafkaSource string) *sourcesv1beta1.KafkaSource { - kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config) + client, err := kafkaclientset.NewForConfig(c.Config) if err != nil { c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err) } - kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace) - if ksObj, err := kSources.Get(context.Background(), kafkaSource, metav1.GetOptions{}); err != nil { + var ksObj *sourcesv1beta1.KafkaSource + err = c.RetryWebhookErrors(func(i int) error { + ksObj, err = client.SourcesV1beta1().KafkaSources(c.Namespace).Get(context.Background(), kafkaSource, metav1.GetOptions{}) + return err + }) + if err != nil { c.T.Fatalf("Failed to get v1beta1 KafkaSource %q: %v", kafkaSource, err) - } else { - return ksObj } - return nil + return ksObj } func UpdateKafkaSourceV1Beta1OrFail(c *testlib.Client, kafkaSource *sourcesv1beta1.KafkaSource) { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestKafkaSource := GetKafkaSourceV1Beta1OrFail(c, kafkaSource.Name) - kafkaSource.Spec.DeepCopyInto(&latestKafkaSource.Spec) - kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config) - if err != nil { - c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err) - } - - kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace) - _, err = kSources.Update(context.Background(), latestKafkaSource, metav1.UpdateOptions{}) - return err + err := c.RetryWebhookErrors(func(i int) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + latestKafkaSource := GetKafkaSourceV1Beta1OrFail(c, kafkaSource.Name) + kafkaSource.Spec.DeepCopyInto(&latestKafkaSource.Spec) + kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config) + if err != nil { + c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err) + } + + kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace) + _, err = kSources.Update(context.Background(), latestKafkaSource, metav1.UpdateOptions{}) + return err + }) }) if err != nil { c.T.Fatalf("Failed to update v1beta1 KafkaSource %q: %v", kafkaSource.Name, err)