Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Retry on Webhook EOF Errors (#978)
Browse files Browse the repository at this point in the history
Mitigation for knative/pkg#1509.

Same fix was used in eventing core to mitigate webhook EOF
errors.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Nov 9, 2021
1 parent 6f97941 commit 86d36b6
Showing 1 changed file with 49 additions and 33 deletions.
82 changes: 49 additions & 33 deletions test/lib/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,75 +31,91 @@ import (
)

func CreateKafkaChannelV1Beta1OrFail(c *testlib.Client, kafkaChannel *channelsv1beta1.KafkaChannel) {
kafkaChannelClientSet, err := kafkaclientset.NewForConfig(c.Config)
client, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel client: %v", err)
}

kChannels := kafkaChannelClientSet.MessagingV1beta1().KafkaChannels(c.Namespace)
if createdKafkaChannel, err := kChannels.Create(context.Background(), kafkaChannel, metav1.CreateOptions{}); err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel %q: %v", kafkaChannel.Name, err)
} else {
err = c.RetryWebhookErrors(func(i int) error {
createdKafkaChannel, err := client.MessagingV1beta1().KafkaChannels(c.Namespace).Create(context.Background(), kafkaChannel, metav1.CreateOptions{})
if err != nil {
return err
}
c.Tracker.AddObj(createdKafkaChannel)
return nil
})
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel %q: %v", kafkaChannel.Name, err)
}
}

func GetKafkaChannelV1Beta1OrFail(c *testlib.Client, kafkaChannel string) *channelsv1beta1.KafkaChannel {
kafkaChannelClientSet, err := kafkaclientset.NewForConfig(c.Config)
client, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel client: %v", err)
}

kChannels := kafkaChannelClientSet.MessagingV1beta1().KafkaChannels(c.Namespace)
if kcObj, err := kChannels.Get(context.Background(), kafkaChannel, metav1.GetOptions{}); err != nil {
var kcObj *channelsv1beta1.KafkaChannel
err = c.RetryWebhookErrors(func(i int) error {
kcObj, err = client.MessagingV1beta1().KafkaChannels(c.Namespace).Get(context.Background(), kafkaChannel, metav1.GetOptions{})
return err
})
if err != nil {
c.T.Fatalf("Failed to get v1beta1 KafkaChannel %q: %v", kafkaChannel, err)
} else {
return kcObj
}
return nil
return kcObj
}

func CreateKafkaSourceV1Beta1OrFail(c *testlib.Client, kafkaSource *sourcesv1beta1.KafkaSource) {
kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config)
client, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err)
}

kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace)
if createdKafkaSource, err := kSources.Create(context.Background(), kafkaSource, metav1.CreateOptions{}); err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaSource %q: %v", kafkaSource.Name, err)
} else {
err = c.RetryWebhookErrors(func(i int) error {
createdKafkaSource, err := client.SourcesV1beta1().KafkaSources(c.Namespace).Create(context.Background(), kafkaSource, metav1.CreateOptions{})
if err != nil {
return err
}
c.Tracker.AddObj(createdKafkaSource)
return nil
})
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaSource %q: %v", kafkaSource.Name, err)
}
}

func GetKafkaSourceV1Beta1OrFail(c *testlib.Client, kafkaSource string) *sourcesv1beta1.KafkaSource {
kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config)
client, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err)
}

kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace)
if ksObj, err := kSources.Get(context.Background(), kafkaSource, metav1.GetOptions{}); err != nil {
var ksObj *sourcesv1beta1.KafkaSource
err = c.RetryWebhookErrors(func(i int) error {
ksObj, err = client.SourcesV1beta1().KafkaSources(c.Namespace).Get(context.Background(), kafkaSource, metav1.GetOptions{})
return err
})
if err != nil {
c.T.Fatalf("Failed to get v1beta1 KafkaSource %q: %v", kafkaSource, err)
} else {
return ksObj
}
return nil
return ksObj
}

func UpdateKafkaSourceV1Beta1OrFail(c *testlib.Client, kafkaSource *sourcesv1beta1.KafkaSource) {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestKafkaSource := GetKafkaSourceV1Beta1OrFail(c, kafkaSource.Name)
kafkaSource.Spec.DeepCopyInto(&latestKafkaSource.Spec)
kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err)
}

kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace)
_, err = kSources.Update(context.Background(), latestKafkaSource, metav1.UpdateOptions{})
return err
err := c.RetryWebhookErrors(func(i int) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
latestKafkaSource := GetKafkaSourceV1Beta1OrFail(c, kafkaSource.Name)
kafkaSource.Spec.DeepCopyInto(&latestKafkaSource.Spec)
kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaSource client: %v", err)
}

kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace)
_, err = kSources.Update(context.Background(), latestKafkaSource, metav1.UpdateOptions{})
return err
})
})
if err != nil {
c.T.Fatalf("Failed to update v1beta1 KafkaSource %q: %v", kafkaSource.Name, err)
Expand Down

0 comments on commit 86d36b6

Please sign in to comment.