diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index 8265f29e2..ebaf2d1f8 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -226,7 +226,7 @@ spec: type: array items: type: string - pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$' + pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$' masterServiceAnnotations: type: object additionalProperties: diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index bd7dfef57..06e5c5231 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2131,130 +2131,136 @@ def test_stream_resources(self): verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"] ) cluster_role.rules.append(fes_cluster_role_rule) - k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) - # create a table in one of the database of acid-minimal-cluster - create_stream_table = """ - CREATE TABLE test_table (id int, payload jsonb); - """ - self.query_database(leader.metadata.name, "foo", create_stream_table) + try: + k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role) - # update the manifest with the streams section - patch_streaming_config = { - "spec": { - "patroni": { - "slots": { - "manual_slot": { - "type": "physical" - } - } - }, - "streams": [ - { - "applicationId": "test-app", - "batchSize": 100, - "database": "foo", - "enableRecovery": True, - "tables": { - "test_table": { - "eventType": "test-event", - "idColumn": "id", - "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" + # create a table in one of the database of acid-minimal-cluster + create_stream_table = """ + CREATE TABLE test_table (id int, payload jsonb); + """ + self.query_database(leader.metadata.name, "foo", create_stream_table) + + # update the manifest with the streams section + patch_streaming_config = { + "spec": { + "patroni": { + "slots": { + "manual_slot": { + "type": "physical" } } }, - { - "applicationId": "test-app2", - "batchSize": 100, - "database": "foo", - "enableRecovery": True, - "tables": { - "test_non_exist_table": { - "eventType": "test-event", - "idColumn": "id", - "payloadColumn": "payload", - "recoveryEventType": "test-event-dlq" + "streams": [ + { + "applicationId": "test-app", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } + } + }, + { + "applicationId": "test-app2", + "batchSize": 100, + "database": "foo", + "enableRecovery": True, + "tables": { + "test_non_exist_table": { + "eventType": "test-event", + "idColumn": "id", + "payloadColumn": "payload", + "recoveryEventType": "test-event-dlq" + } } } - } - ] + ] + } } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - # check if publication, slot, and fes resource are created - get_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; - """ - get_slot_query = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, - "Publication is not created", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, - "Replication slot is not created", 10, 5) - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + # check if publication, slot, and fes resource are created + get_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app'; + """ + get_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1, + "Publication is not created", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1, + "Replication slot is not created", 10, 5) + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1, "Could not find Fabric Event Stream resource", 10, 5) - # check if the non-existing table in the stream section does not create a publication and slot - get_publication_query_not_exist_table = """ - SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2'; - """ - get_slot_query_not_exist_table = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0, - "Publication is created for non-existing tables", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0, - "Replication slot is created for non-existing tables", 10, 5) - - # grant create and ownership of test_table to foo_user, reset search path to default - grant_permission_foo_user = """ - GRANT CREATE ON DATABASE foo TO foo_user; - ALTER TABLE test_table OWNER TO foo_user; - ALTER ROLE foo_user RESET search_path; - """ - self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) - # non-postgres user creates a publication - create_nonstream_publication = """ - CREATE PUBLICATION mypublication FOR TABLE test_table; - """ - self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") + # check if the non-existing table in the stream section does not create a publication and slot + get_publication_query_not_exist_table = """ + SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2'; + """ + get_slot_query_not_exist_table = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0, + "Publication is created for non-existing tables", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0, + "Replication slot is created for non-existing tables", 10, 5) + + # grant create and ownership of test_table to foo_user, reset search path to default + grant_permission_foo_user = """ + GRANT CREATE ON DATABASE foo TO foo_user; + ALTER TABLE test_table OWNER TO foo_user; + ALTER ROLE foo_user RESET search_path; + """ + self.query_database(leader.metadata.name, "foo", grant_permission_foo_user) + # non-postgres user creates a publication + create_nonstream_publication = """ + CREATE PUBLICATION mypublication FOR TABLE test_table; + """ + self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user") - # remove the streams section from the manifest - patch_streaming_config_removal = { - "spec": { - "streams": [] + # remove the streams section from the manifest + patch_streaming_config_removal = { + "spec": { + "streams": [] + } } - } - k8s.api.custom_objects_api.patch_namespaced_custom_object( - 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) - self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal) + self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync") - # check if publication, slot, and fes resource are removed - self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( + # check if publication, slot, and fes resource are removed + self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object( "zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0, 'Could not delete Fabric Event Stream resource', 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, - "Publication is not deleted", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, - "Replication slot is not deleted", 10, 5) - - # check the manual_slot and mypublication should not get deleted - get_manual_slot_query = """ - SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; - """ - get_nonstream_publication_query = """ - SELECT * FROM pg_publication WHERE pubname = 'mypublication'; - """ - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, - "Slot defined in patroni config is deleted", 10, 5) - self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, - "Publication defined not in stream section is deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0, + "Publication is not deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0, + "Replication slot is not deleted", 10, 5) + + # check the manual_slot and mypublication should not get deleted + get_manual_slot_query = """ + SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot'; + """ + get_nonstream_publication_query = """ + SELECT * FROM pg_publication WHERE pubname = 'mypublication'; + """ + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1, + "Slot defined in patroni config is deleted", 10, 5) + self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1, + "Publication defined not in stream section is deleted", 10, 5) + + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 75e8ab342..9207c83d4 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -224,7 +224,7 @@ spec: type: array items: type: string - pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$' + pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$' masterServiceAnnotations: type: object additionalProperties: diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 612cf7041..3d731743f 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -133,7 +133,7 @@ type Volume struct { Size string `json:"size"` StorageClass string `json:"storageClass,omitempty"` SubPath string `json:"subPath,omitempty"` - IsSubPathExpr *bool `json:"isSubPathExpr,omitemtpy"` + IsSubPathExpr *bool `json:"isSubPathExpr,omitempty"` Iops *int64 `json:"iops,omitempty"` Throughput *int64 `json:"throughput,omitempty"` VolumeType string `json:"type,omitempty"` @@ -144,7 +144,7 @@ type AdditionalVolume struct { Name string `json:"name"` MountPath string `json:"mountPath"` SubPath string `json:"subPath,omitempty"` - IsSubPathExpr *bool `json:"isSubPathExpr,omitemtpy"` + IsSubPathExpr *bool `json:"isSubPathExpr,omitempty"` TargetContainers []string `json:"targetContainers"` VolumeSource v1.VolumeSource `json:"volumeSource"` } diff --git a/pkg/apis/acid.zalan.do/v1/util_test.go b/pkg/apis/acid.zalan.do/v1/util_test.go index bb01816c0..bef6cc3ec 100644 --- a/pkg/apis/acid.zalan.do/v1/util_test.go +++ b/pkg/apis/acid.zalan.do/v1/util_test.go @@ -123,6 +123,8 @@ var maintenanceWindows = []struct { {"expect error as weekday is empty", []byte(`":00:00-10:00"`), MaintenanceWindow{}, errors.New(`could not parse weekday: incorrect weekday`)}, {"expect error as maintenance window set seconds", []byte(`"Mon:00:00:00-10:00:00"`), MaintenanceWindow{}, errors.New(`incorrect maintenance window format`)}, {"expect error as 'To' time set seconds", []byte(`"Mon:00:00-00:00:00"`), MaintenanceWindow{}, errors.New("could not parse end time: incorrect time format")}, + // ideally, should be implemented + {"expect error as 'To' has a weekday", []byte(`"Mon:00:00-Fri:00:00"`), MaintenanceWindow{}, errors.New("could not parse end time: incorrect time format")}, {"expect error as 'To' time is missing", []byte(`"Mon:00:00"`), MaintenanceWindow{}, errors.New("incorrect maintenance window format")}} var postgresStatus = []struct { diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f0f432753..d9997463a 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -668,7 +668,7 @@ func compareEnv(a, b []v1.EnvVar) bool { if len(a) != len(b) { return false } - equal := true + var equal bool for _, enva := range a { hasmatch := false for _, envb := range b { diff --git a/pkg/cluster/exec.go b/pkg/cluster/exec.go index 8b5089b4e..5605a70f6 100644 --- a/pkg/cluster/exec.go +++ b/pkg/cluster/exec.go @@ -15,7 +15,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/constants" ) -//ExecCommand executes arbitrary command inside the pod +// ExecCommand executes arbitrary command inside the pod func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) { c.setProcessName("executing command %q", strings.Join(command, " ")) @@ -59,7 +59,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) ( return "", fmt.Errorf("failed to init executor: %v", err) } - err = exec.Stream(remotecommand.StreamOptions{ + err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{ Stdout: &execOut, Stderr: &execErr, Tty: false, diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 89fb4b558..8934b6b49 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -47,11 +47,6 @@ const ( operatorPort = 8080 ) -type pgUser struct { - Password string `json:"password"` - Options []string `json:"options"` -} - type patroniDCS struct { TTL uint32 `json:"ttl,omitempty"` LoopWait uint32 `json:"loop_wait,omitempty"` @@ -2486,7 +2481,9 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar { } case "gcs": - envVars = append(envVars, v1.EnvVar{Name: "LOGICAL_BACKUP_GOOGLE_APPLICATION_CREDENTIALS", Value: c.OpConfig.LogicalBackup.LogicalBackupGoogleApplicationCredentials}) + if c.OpConfig.LogicalBackup.LogicalBackupGoogleApplicationCredentials != "" { + envVars = append(envVars, v1.EnvVar{Name: "LOGICAL_BACKUP_GOOGLE_APPLICATION_CREDENTIALS", Value: c.OpConfig.LogicalBackup.LogicalBackupGoogleApplicationCredentials}) + } case "az": envVars = appendEnvVars(envVars, []v1.EnvVar{ @@ -2497,11 +2494,11 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar { { Name: "LOGICAL_BACKUP_AZURE_STORAGE_CONTAINER", Value: c.OpConfig.LogicalBackup.LogicalBackupAzureStorageContainer, - }, - { - Name: "LOGICAL_BACKUP_AZURE_STORAGE_ACCOUNT_KEY", - Value: c.OpConfig.LogicalBackup.LogicalBackupAzureStorageAccountKey, }}...) + + if c.OpConfig.LogicalBackup.LogicalBackupAzureStorageAccountKey != "" { + envVars = append(envVars, v1.EnvVar{Name: "LOGICAL_BACKUP_AZURE_STORAGE_ACCOUNT_KEY", Value: c.OpConfig.LogicalBackup.LogicalBackupAzureStorageAccountKey}) + } } return envVars diff --git a/pkg/cluster/majorversionupgrade.go b/pkg/cluster/majorversionupgrade.go index 86c95b6a4..6bf4f167b 100644 --- a/pkg/cluster/majorversionupgrade.go +++ b/pkg/cluster/majorversionupgrade.go @@ -73,7 +73,7 @@ func (c *Cluster) majorVersionUpgrade() error { return nil } - if !c.isInMainternanceWindow() { + if !isInMainternanceWindow(c.Spec.MaintenanceWindows) { c.logger.Infof("skipping major version upgrade, not in maintenance window") return nil } diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 422055f5f..9a31edc28 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -128,6 +128,8 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za createPublications[slotName] = tableList } else if currentTables != tableList { alterPublications[slotName] = tableList + } else { + (*slotsToSync)[slotName] = slotAndPublication.Slot } } @@ -142,30 +144,34 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za return nil } - var errorMessage error = nil + errors := make([]string, 0) for publicationName, tables := range createPublications { if err = c.executeCreatePublication(publicationName, tables); err != nil { - errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err) + errors = append(errors, fmt.Sprintf("creation of publication %q failed: %v", publicationName, err)) continue } (*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot } for publicationName, tables := range alterPublications { if err = c.executeAlterPublication(publicationName, tables); err != nil { - errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err) + errors = append(errors, fmt.Sprintf("update of publication %q failed: %v", publicationName, err)) continue } (*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot } for _, publicationName := range deletePublications { if err = c.executeDropPublication(publicationName); err != nil { - errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err) + errors = append(errors, fmt.Sprintf("deletion of publication %q failed: %v", publicationName, err)) continue } (*slotsToSync)[publicationName] = nil } - return errorMessage + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + + return nil } func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { @@ -370,7 +376,7 @@ func (c *Cluster) syncStreams() error { for dbName, databaseSlotsList := range databaseSlots { err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync) if err != nil { - c.logger.Warningf("could not sync publications in database %q: %v", dbName, err) + c.logger.Warningf("could not sync all publications in database %q: %v", dbName, err) continue } } @@ -398,7 +404,7 @@ func (c *Cluster) syncStreams() error { c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err) } } else { - c.logger.Warningf("database replication slots for streams with applicationId %s not in sync, skipping event stream sync", appId) + c.logger.Warningf("database replication slots %#v for streams with applicationId %s not in sync, skipping event stream sync", slotsToSync, appId) } } @@ -415,8 +421,9 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1. for dbName, slots := range databaseSlots { for slotName := range slots { if slotName == getSlotName(dbName, appId) { - if _, exists := slotsToSync[slotName]; !exists { + if slot, exists := slotsToSync[slotName]; !exists || slot == nil { allSlotsInSync = false + continue } } } @@ -426,24 +433,55 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1. } func (c *Cluster) syncStream(appId string) error { + var ( + streams *zalandov1.FabricEventStreamList + err error + ) + c.setProcessName("syncing stream with applicationId %s", appId) + c.logger.Debugf("syncing stream with applicationId %s", appId) + + listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()} + streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) + if err != nil { + return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err) + } + streamExists := false - // update stream when it exists and EventStreams array differs - for _, stream := range c.Streams { - if appId == stream.Spec.ApplicationId { - streamExists = true - desiredStreams := c.generateFabricEventStream(appId) - if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match { - c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason) - desiredStreams.ObjectMeta = stream.ObjectMeta - updatedStream, err := c.updateStreams(desiredStreams) - if err != nil { - return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) - } - c.Streams[appId] = updatedStream - c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) + for _, stream := range streams.Items { + if stream.Spec.ApplicationId != appId { + continue + } + if streamExists { + c.logger.Warningf("more than one event stream with applicationId %s found, delete it", appId) + if err = c.KubeClient.FabricEventStreams(stream.ObjectMeta.Namespace).Delete(context.TODO(), stream.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil { + c.logger.Errorf("could not delete event stream %q with applicationId %s: %v", stream.ObjectMeta.Name, appId, err) + } else { + c.logger.Infof("redundant event stream %q with applicationId %s has been successfully deleted", stream.ObjectMeta.Name, appId) } continue } + streamExists = true + desiredStreams := c.generateFabricEventStream(appId) + if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) { + c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId) + stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences + c.setProcessName("updating event streams with applicationId %s", appId) + stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), &stream, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err) + } + c.Streams[appId] = stream + } + if match, reason := c.compareStreams(&stream, desiredStreams); !match { + c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason) + desiredStreams.ObjectMeta = stream.ObjectMeta + updatedStream, err := c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) + } + c.Streams[appId] = updatedStream + c.logger.Infof("event streams %q with applicationId %s have been successfully updated", updatedStream.Name, appId) + } } if !streamExists { @@ -459,7 +497,26 @@ func (c *Cluster) syncStream(appId string) error { return nil } -func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) { +func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) { + reasons := make([]string, 0) + match = true + + // stream operator can add extra annotations so incl. current annotations in desired annotations + desiredAnnotations := c.annotationsSet(curEventStreams.Annotations) + if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed { + match = false + reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) + } + + if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed { + match = false + reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason)) + } + + return match, strings.Join(reasons, ", ") +} + +func sameEventStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) { if len(newEventStreams) != len(curEventStreams) { return false, "number of defined streams is different" } diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 318bd8597..6091210b5 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -2,6 +2,7 @@ package cluster import ( "fmt" + "reflect" "strings" "context" @@ -18,29 +19,25 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/fake" ) -func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { - zalandoClientSet := fakezalandov1.NewSimpleClientset() - clientSet := fake.NewSimpleClientset() - - return k8sutil.KubernetesClient{ - FabricEventStreamsGetter: zalandoClientSet.ZalandoV1(), - PostgresqlsGetter: zalandoClientSet.AcidV1(), - PodsGetter: clientSet.CoreV1(), - StatefulSetsGetter: clientSet.AppsV1(), - }, clientSet -} - var ( - clusterName string = "acid-test-cluster" + clusterName string = "acid-stream-cluster" namespace string = "default" appId string = "test-app" dbName string = "foo" fesUser string = fmt.Sprintf("%s%s", constants.EventStreamSourceSlotPrefix, constants.UserRoleNameSuffix) slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) + zalandoClientSet = fakezalandov1.NewSimpleClientset() + + client = k8sutil.KubernetesClient{ + FabricEventStreamsGetter: zalandoClientSet.ZalandoV1(), + PostgresqlsGetter: zalandoClientSet.AcidV1(), + PodsGetter: clientSet.CoreV1(), + StatefulSetsGetter: clientSet.AppsV1(), + } + pg = acidv1.Postgresql{ TypeMeta: metav1.TypeMeta{ Kind: "Postgresql", @@ -91,6 +88,11 @@ var ( ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-12345", clusterName), Namespace: namespace, + Labels: map[string]string{ + "application": "spilo", + "cluster-name": fmt.Sprintf("%s-2", clusterName), + "team": "acid", + }, OwnerReferences: []metav1.OwnerReference{ metav1.OwnerReference{ APIVersion: "apps/v1", @@ -181,6 +183,25 @@ var ( }, }, } + + cluster = New( + Config{ + OpConfig: config.Config{ + Auth: config.Auth{ + SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}", + }, + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) ) func TestGatherApplicationIds(t *testing.T) { @@ -193,15 +214,24 @@ func TestGatherApplicationIds(t *testing.T) { } func TestHasSlotsInSync(t *testing.T) { + cluster.Name = clusterName + cluster.Namespace = namespace + + appId2 := fmt.Sprintf("%s-2", appId) + dbNotExists := "dbnotexists" + slotNotExists := fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbNotExists, strings.Replace(appId, "-", "_", -1)) + slotNotExistsAppId2 := fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbNotExists, strings.Replace(appId2, "-", "_", -1)) tests := []struct { subTest string + applicationId string expectedSlots map[string]map[string]zalandov1.Slot actualSlots map[string]map[string]string slotsInSync bool }{ { - subTest: "slots are in sync", + subTest: fmt.Sprintf("slots in sync for applicationId %s", appId), + applicationId: appId, expectedSlots: map[string]map[string]zalandov1.Slot{ dbName: { slotName: zalandov1.Slot{ @@ -227,7 +257,93 @@ func TestHasSlotsInSync(t *testing.T) { }, slotsInSync: true, }, { - subTest: "slots are not in sync", + subTest: fmt.Sprintf("slots empty for applicationId %s after create or update of publication failed", appId), + applicationId: appId, + expectedSlots: map[string]map[string]zalandov1.Slot{ + dbNotExists: { + slotNotExists: zalandov1.Slot{ + Slot: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test1": acidv1.StreamTable{ + EventType: "stream-type-a", + }, + }, + }, + }, + }, + actualSlots: map[string]map[string]string{}, + slotsInSync: false, + }, { + subTest: fmt.Sprintf("slot with empty definition for applicationId %s after publication git deleted", appId), + applicationId: appId, + expectedSlots: map[string]map[string]zalandov1.Slot{ + dbNotExists: { + slotNotExists: zalandov1.Slot{ + Slot: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test1": acidv1.StreamTable{ + EventType: "stream-type-a", + }, + }, + }, + }, + }, + actualSlots: map[string]map[string]string{ + slotName: nil, + }, + slotsInSync: false, + }, { + subTest: fmt.Sprintf("one slot not in sync for applicationId %s because database does not exist", appId), + applicationId: appId, + expectedSlots: map[string]map[string]zalandov1.Slot{ + dbName: { + slotName: zalandov1.Slot{ + Slot: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test1": acidv1.StreamTable{ + EventType: "stream-type-a", + }, + }, + }, + }, + dbNotExists: { + slotNotExists: zalandov1.Slot{ + Slot: map[string]string{ + "databases": "dbnotexists", + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test2": acidv1.StreamTable{ + EventType: "stream-type-b", + }, + }, + }, + }, + }, + actualSlots: map[string]map[string]string{ + slotName: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + }, + slotsInSync: false, + }, { + subTest: fmt.Sprintf("slots in sync for applicationId %s, but not for %s - checking %s should return true", appId, appId2, appId), + applicationId: appId, expectedSlots: map[string]map[string]zalandov1.Slot{ dbName: { slotName: zalandov1.Slot{ @@ -243,8 +359,49 @@ func TestHasSlotsInSync(t *testing.T) { }, }, }, - "dbnotexists": { + dbNotExists: { + slotNotExistsAppId2: zalandov1.Slot{ + Slot: map[string]string{ + "databases": "dbnotexists", + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test2": acidv1.StreamTable{ + EventType: "stream-type-b", + }, + }, + }, + }, + }, + actualSlots: map[string]map[string]string{ + slotName: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + }, + slotsInSync: true, + }, { + subTest: fmt.Sprintf("slots in sync for applicationId %s, but not for %s - checking %s should return false", appId, appId2, appId2), + applicationId: appId2, + expectedSlots: map[string]map[string]zalandov1.Slot{ + dbName: { slotName: zalandov1.Slot{ + Slot: map[string]string{ + "databases": dbName, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + }, + Publication: map[string]acidv1.StreamTable{ + "test1": acidv1.StreamTable{ + EventType: "stream-type-a", + }, + }, + }, + }, + dbNotExists: { + slotNotExistsAppId2: zalandov1.Slot{ Slot: map[string]string{ "databases": "dbnotexists", "plugin": constants.EventStreamSourcePluginType, @@ -270,49 +427,24 @@ func TestHasSlotsInSync(t *testing.T) { } for _, tt := range tests { - result := hasSlotsInSync(appId, tt.expectedSlots, tt.actualSlots) - if !result { - t.Errorf("slots are not in sync, expected %#v, got %#v", tt.expectedSlots, tt.actualSlots) + result := hasSlotsInSync(tt.applicationId, tt.expectedSlots, tt.actualSlots) + if result != tt.slotsInSync { + t.Errorf("%s: unexpected result for slot test of applicationId: %v, expected slots %#v, actual slots %#v", tt.subTest, tt.applicationId, tt.expectedSlots, tt.actualSlots) } } } func TestGenerateFabricEventStream(t *testing.T) { - client, _ := newFakeK8sStreamClient() - - var cluster = New( - Config{ - OpConfig: config.Config{ - Auth: config.Auth{ - SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}", - }, - PodManagementPolicy: "ordered_ready", - Resources: config.Resources{ - ClusterLabels: map[string]string{"application": "spilo"}, - ClusterNameLabel: "cluster-name", - DefaultCPURequest: "300m", - DefaultCPULimit: "300m", - DefaultMemoryRequest: "300Mi", - DefaultMemoryLimit: "300Mi", - PodRoleLabel: "spilo-role", - }, - }, - }, client, pg, logger, eventRecorder) - cluster.Name = clusterName cluster.Namespace = namespace - // create statefulset to have ownerReference for streams - _, err := cluster.createStatefulSet() - assert.NoError(t, err) - // create the streams - err = cluster.syncStream(appId) + err := cluster.syncStream(appId) assert.NoError(t, err) // compare generated stream with expected stream result := cluster.generateFabricEventStream(appId) - if match, _ := sameStreams(result.Spec.EventStreams, fes.Spec.EventStreams); !match { + if match, _ := cluster.compareStreams(result, fes); !match { t.Errorf("malformed FabricEventStream, expected %#v, got %#v", fes, result) } @@ -321,14 +453,10 @@ func TestGenerateFabricEventStream(t *testing.T) { } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - - // check if there is only one stream - if len(streams.Items) > 1 { - t.Errorf("too many stream CRDs found: got %d, but expected only one", len(streams.Items)) - } + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only one", len(streams.Items)) // compare stream returned from API with expected stream - if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, fes.Spec.EventStreams); !match { + if match, _ := cluster.compareStreams(&streams.Items[0], fes); !match { t.Errorf("malformed FabricEventStream returned from API, expected %#v, got %#v", fes, streams.Items[0]) } @@ -338,20 +466,87 @@ func TestGenerateFabricEventStream(t *testing.T) { streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) - - // check if there is still only one stream - if len(streams.Items) > 1 { - t.Errorf("too many stream CRDs found after sync: got %d, but expected only one", len(streams.Items)) - } + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only one", len(streams.Items)) // compare stream resturned from API with generated stream - if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { t.Errorf("returned FabricEventStream differs from generated one, expected %#v, got %#v", result, streams.Items[0]) } } +func newFabricEventStream(streams []zalandov1.EventStream, annotations map[string]string) *zalandov1.FabricEventStream { + return &zalandov1.FabricEventStream{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-12345", clusterName), + Annotations: annotations, + }, + Spec: zalandov1.FabricEventStreamSpec{ + ApplicationId: appId, + EventStreams: streams, + }, + } +} + +func TestSyncStreams(t *testing.T) { + pg.Name = fmt.Sprintf("%s-2", pg.Name) + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + EnableOwnerReferences: util.True(), + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + _, err := cluster.KubeClient.Postgresqls(namespace).Create( + context.TODO(), &pg, metav1.CreateOptions{}) + assert.NoError(t, err) + + // create the stream + err = cluster.syncStream(appId) + assert.NoError(t, err) + + // create a second stream with same spec but with different name + createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( + context.TODO(), fes, metav1.CreateOptions{}) + assert.NoError(t, err) + assert.Equal(t, createdStream.Spec.ApplicationId, appId) + + // check that two streams exist + listOptions := metav1.ListOptions{ + LabelSelector: cluster.labelsSet(true).String(), + } + streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + assert.Equalf(t, 2, len(streams.Items), "unexpected number of streams found: got %d, but expected only 2", len(streams.Items)) + + // sync the stream which should remove the redundant stream + err = cluster.syncStream(appId) + assert.NoError(t, err) + + // check that only one stream remains after sync + streams, err = cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) + assert.NoError(t, err) + assert.Equalf(t, 1, len(streams.Items), "unexpected number of streams found: got %d, but expected only 1", len(streams.Items)) + + // check owner references + if !reflect.DeepEqual(streams.Items[0].OwnerReferences, cluster.ownerReferences()) { + t.Errorf("unexpected owner references, expected %#v, got %#v", cluster.ownerReferences(), streams.Items[0].OwnerReferences) + } +} + func TestSameStreams(t *testing.T) { testName := "TestSameStreams" + annotationsA := map[string]string{"owned-by": "acid"} + annotationsB := map[string]string{"owned-by": "foo"} stream1 := zalandov1.EventStream{ EventStreamFlow: zalandov1.EventStreamFlow{}, @@ -396,57 +591,64 @@ func TestSameStreams(t *testing.T) { tests := []struct { subTest string - streamsA []zalandov1.EventStream - streamsB []zalandov1.EventStream + streamsA *zalandov1.FabricEventStream + streamsB *zalandov1.FabricEventStream match bool reason string }{ { subTest: "identical streams", - streamsA: []zalandov1.EventStream{stream1, stream2}, - streamsB: []zalandov1.EventStream{stream1, stream2}, + streamsA: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, annotationsA), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, annotationsA), match: true, reason: "", }, { subTest: "same streams different order", - streamsA: []zalandov1.EventStream{stream1, stream2}, - streamsB: []zalandov1.EventStream{stream2, stream1}, + streamsA: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream2, stream1}, nil), match: true, reason: "", }, { subTest: "same streams different order", - streamsA: []zalandov1.EventStream{stream1}, - streamsB: []zalandov1.EventStream{stream1, stream2}, + streamsA: newFabricEventStream([]zalandov1.EventStream{stream1}, nil), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), match: false, reason: "number of defined streams is different", }, { subTest: "different number of streams", - streamsA: []zalandov1.EventStream{stream1}, - streamsB: []zalandov1.EventStream{stream1, stream2}, + streamsA: newFabricEventStream([]zalandov1.EventStream{stream1}, nil), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), match: false, reason: "number of defined streams is different", }, { subTest: "event stream specs differ", - streamsA: []zalandov1.EventStream{stream1, stream2}, - streamsB: fes.Spec.EventStreams, + streamsA: newFabricEventStream([]zalandov1.EventStream{stream1, stream2}, nil), + streamsB: fes, match: false, reason: "number of defined streams is different", }, { subTest: "event stream recovery specs differ", - streamsA: []zalandov1.EventStream{stream2}, - streamsB: []zalandov1.EventStream{stream3}, + streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, nil), + match: false, + reason: "event stream specs differ", + }, + { + subTest: "event stream annotations differ", + streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA), + streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsB), match: false, reason: "event stream specs differ", }, } for _, tt := range tests { - streamsMatch, matchReason := sameStreams(tt.streamsA, tt.streamsB) + streamsMatch, matchReason := cluster.compareStreams(tt.streamsA, tt.streamsB) if streamsMatch != tt.match { t.Errorf("%s %s: unexpected match result when comparing streams: got %s, epxected %s", testName, tt.subTest, matchReason, tt.reason) @@ -454,9 +656,8 @@ func TestSameStreams(t *testing.T) { } } -func TestUpdateFabricEventStream(t *testing.T) { - client, _ := newFakeK8sStreamClient() - +func TestUpdateStreams(t *testing.T) { + pg.Name = fmt.Sprintf("%s-3", pg.Name) var cluster = New( Config{ OpConfig: config.Config{ @@ -477,11 +678,7 @@ func TestUpdateFabricEventStream(t *testing.T) { context.TODO(), &pg, metav1.CreateOptions{}) assert.NoError(t, err) - // create statefulset to have ownerReference for streams - _, err = cluster.createStatefulSet() - assert.NoError(t, err) - - // now create the stream + // create the stream err = cluster.syncStream(appId) assert.NoError(t, err) @@ -502,7 +699,7 @@ func TestUpdateFabricEventStream(t *testing.T) { } streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) result := cluster.generateFabricEventStream(appId) - if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) } @@ -516,7 +713,7 @@ func TestUpdateFabricEventStream(t *testing.T) { streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) result = cluster.generateFabricEventStream(appId) - if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match { + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 59aee34e6..ee1713c05 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -228,6 +228,7 @@ func (c *Cluster) syncPatroniConfigMap(suffix string) error { } annotations := make(map[string]string) maps.Copy(annotations, cm.Annotations) + // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(cm.Annotations) if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) @@ -272,6 +273,7 @@ func (c *Cluster) syncPatroniEndpoint(suffix string) error { } annotations := make(map[string]string) maps.Copy(annotations, ep.Annotations) + // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(ep.Annotations) if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) @@ -315,6 +317,7 @@ func (c *Cluster) syncPatroniService() error { } annotations := make(map[string]string) maps.Copy(annotations, svc.Annotations) + // Patroni can add extra annotations so incl. current annotations in desired annotations desiredAnnotations := c.annotationsSet(svc.Annotations) if changed, _ := c.compareAnnotations(annotations, desiredAnnotations); changed { patchData, err := metaAnnotationsPatch(desiredAnnotations) diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go index cee537036..e36d0c175 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -449,10 +449,6 @@ func (c *Cluster) _waitPodLabelsReady(anyReplica bool) error { return err } -func (c *Cluster) waitForAnyReplicaLabelReady() error { - return c._waitPodLabelsReady(true) -} - func (c *Cluster) waitForAllPodsLabelReady() error { return c._waitPodLabelsReady(false) } @@ -667,15 +663,15 @@ func parseResourceRequirements(resourcesRequirement v1.ResourceRequirements) (ac return resources, nil } -func (c *Cluster) isInMainternanceWindow() bool { - if c.Spec.MaintenanceWindows == nil { +func isInMainternanceWindow(specMaintenanceWindows []acidv1.MaintenanceWindow) bool { + if len(specMaintenanceWindows) == 0 { return true } now := time.Now() currentDay := now.Weekday() currentTime := now.Format("15:04") - for _, window := range c.Spec.MaintenanceWindows { + for _, window := range specMaintenanceWindows { startTime := window.StartTime.Format("15:04") endTime := window.EndTime.Format("15:04") diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 0176ea005..2cb755c6c 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -651,24 +651,6 @@ func Test_trimCronjobName(t *testing.T) { } func TestIsInMaintenanceWindow(t *testing.T) { - client, _ := newFakeK8sStreamClient() - - var cluster = New( - Config{ - OpConfig: config.Config{ - PodManagementPolicy: "ordered_ready", - Resources: config.Resources{ - ClusterLabels: map[string]string{"application": "spilo"}, - ClusterNameLabel: "cluster-name", - DefaultCPURequest: "300m", - DefaultCPULimit: "300m", - DefaultMemoryRequest: "300Mi", - DefaultMemoryLimit: "300Mi", - PodRoleLabel: "spilo-role", - }, - }, - }, client, pg, logger, eventRecorder) - now := time.Now() futureTimeStart := now.Add(1 * time.Hour) futureTimeStartFormatted := futureTimeStart.Format("15:04") @@ -723,7 +705,7 @@ func TestIsInMaintenanceWindow(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cluster.Spec.MaintenanceWindows = tt.windows - if cluster.isInMainternanceWindow() != tt.expected { + if isInMainternanceWindow(cluster.Spec.MaintenanceWindows) != tt.expected { t.Errorf("Expected isInMainternanceWindow to return %t", tt.expected) } }) diff --git a/pkg/controller/util.go b/pkg/controller/util.go index bd1e65d02..5a3b23edc 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -80,7 +80,7 @@ func (c *Controller) createOperatorCRD(desiredCrd *apiextv1.CustomResourceDefini c.logger.Infof("customResourceDefinition %q has been registered", crd.Name) } - return wait.Poll(c.config.CRDReadyWaitInterval, c.config.CRDReadyWaitTimeout, func() (bool, error) { + return wait.PollUntilContextTimeout(context.TODO(), c.config.CRDReadyWaitInterval, c.config.CRDReadyWaitTimeout, false, func(ctx context.Context) (bool, error) { c, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), desiredCrd.Name, metav1.GetOptions{}) if err != nil { return false, err diff --git a/pkg/util/util.go b/pkg/util/util.go index fb1217d1f..4b3aafc63 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -35,7 +35,7 @@ const ( var passwordChars = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") func init() { - rand.Seed(time.Now().Unix()) + rand.New(rand.NewSource(time.Now().Unix())) } // helper function to get bool pointers