Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove streams delete and extend unit tests #2737

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,6 @@ func (c *Cluster) syncStream(appId string) error {
if stream.Spec.ApplicationId != appId {
continue
}
if streamExists {
c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId)
if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err)
} else {
c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId)
}
continue
}
streamExists = true
desiredStreams := c.generateFabricEventStream(appId)
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
Expand All @@ -482,6 +473,7 @@ func (c *Cluster) syncStream(appId string) error {
c.Streams[appId] = updatedStream
c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId)
}
break
}

if !streamExists {
Expand Down
158 changes: 104 additions & 54 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ var (
Namespace: namespace,
Labels: map[string]string{
"application": "spilo",
"cluster-name": fmt.Sprintf("%s-2", clusterName),
"cluster-name": clusterName,
"team": "acid",
},
OwnerReferences: []metav1.OwnerReference{
Expand Down Expand Up @@ -494,14 +494,13 @@ func TestSyncStreams(t *testing.T) {
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
EnableOwnerReferences: util.True(),
PodRoleLabel: "spilo-role",
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)
Expand All @@ -514,33 +513,17 @@ func TestSyncStreams(t *testing.T) {
err = cluster.syncStream(appId)
assert.NoError(t, err)

// create a second stream with same spec but with different name
createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create(
context.TODO(), fes, metav1.CreateOptions{})
// sync the stream again
err = cluster.syncStream(appId)
assert.NoError(t, err)
assert.Equal(t, createdStream.Spec.ApplicationId, appId)

// check that two streams exist
// check that only one stream remains after sync
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items))

// sync the stream which should remove the redundant stream
err = cluster.syncStream(appId)
assert.NoError(t, err)

// check that only one stream remains after sync
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items))

// check owner references
if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) {
t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences)
}
}

func TestSameStreams(t *testing.T) {
Expand Down Expand Up @@ -663,13 +646,14 @@ func TestUpdateStreams(t *testing.T) {
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
EnableOwnerReferences: util.True(),
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)
Expand All @@ -678,10 +662,31 @@ func TestUpdateStreams(t *testing.T) {
context.TODO(), &pg, metav1.CreateOptions{})
assert.NoError(t, err)

// create the stream
// create stream with different owner reference
fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name)
fes.ObjectMeta.Labels["cluster-name"] = pg.Name
createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create(
context.TODO(), fes, metav1.CreateOptions{})
assert.NoError(t, err)
assert.Equal(t, createdStream.Spec.ApplicationId, appId)

// sync the stream which should update the owner reference
err = cluster.syncStream(appId)
assert.NoError(t, err)

// check that only one stream exists after sync
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items))

// compare owner references
if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) {
t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences)
}

// change specs of streams and patch CRD
for i, stream := range pg.Spec.Streams {
if stream.ApplicationId == appId {
Expand All @@ -694,10 +699,7 @@ func TestUpdateStreams(t *testing.T) {
}

// compare stream returned from API with expected stream
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions)
result := cluster.generateFabricEventStream(appId)
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result)
Expand All @@ -716,9 +718,51 @@ func TestUpdateStreams(t *testing.T) {
if match, _ := cluster.compareStreams(&streams.Items[0], result); !match {
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
}
}

mockClient := k8sutil.NewMockKubernetesClient()
cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter
func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) {
patchData, err := specPatch(pgSpec)
assert.NoError(t, err)

pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
err = cluster.syncStream(appId)
assert.NoError(t, err)

streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)

return streams
}

func TestDeleteStreams(t *testing.T) {
pg.Name = fmt.Sprintf("%s-4", pg.Name)
var cluster = New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)

_, err := cluster.KubeClient.Postgresqls(namespace).Create(
context.TODO(), &pg, metav1.CreateOptions{})
assert.NoError(t, err)

// create the stream
err = cluster.syncStream(appId)
assert.NoError(t, err)

// remove streams from manifest
pg.Spec.Streams = nil
Expand All @@ -729,26 +773,32 @@ func TestUpdateStreams(t *testing.T) {
appIds := getDistinctApplicationIds(pgUpdated.Spec.Streams)
cluster.cleanupRemovedStreams(appIds)

streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streams.Items) > 0 || err != nil {
t.Errorf("stream resource has not been removed or unexpected error %v", err)
// check that streams have been deleted
listOptions := metav1.ListOptions{
LabelSelector: cluster.labelsSet(true).String(),
}
}

func patchPostgresqlStreams(t *testing.T, cluster *Cluster, pgSpec *acidv1.PostgresSpec, listOptions metav1.ListOptions) (streams *zalandov1.FabricEventStreamList) {
patchData, err := specPatch(pgSpec)
streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))

pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch(
context.TODO(), cluster.Name, types.MergePatchType, patchData, metav1.PatchOptions{}, "spec")
// create stream to test deleteStreams code
fes.ObjectMeta.Name = fmt.Sprintf("%s-12345", pg.Name)
fes.ObjectMeta.Labels["cluster-name"] = pg.Name
_, err = cluster.KubeClient.FabricEventStreams(namespace).Create(
context.TODO(), fes, metav1.CreateOptions{})
assert.NoError(t, err)

cluster.Postgresql.Spec = pgPatched.Spec
// sync it once to cluster struct
err = cluster.syncStream(appId)
assert.NoError(t, err)

// we need a mock client because deleteStreams checks for CRD existance
mockClient := k8sutil.NewMockKubernetesClient()
cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter
cluster.deleteStreams()

// check that streams have been deleted
streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
assert.NoError(t, err)

return streams
assert.Equalf(t, 0, len(streams.Items), "unexpected number of streams found: got %d, but expected none", len(streams.Items))
}
Loading