Skip to content

Commit

Permalink
Merge branch 'master' into bump-1-13-0
Browse files Browse the repository at this point in the history
  • Loading branch information
FxKu committed Aug 20, 2024
2 parents 99ba308 + cb06a1e commit 3d448b0
Show file tree
Hide file tree
Showing 16 changed files with 500 additions and 260 deletions.
2 changes: 1 addition & 1 deletion charts/postgres-operator/crds/postgresqls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ spec:
type: array
items:
type: string
pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$'
pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$'
masterServiceAnnotations:
type: object
additionalProperties:
Expand Down
220 changes: 113 additions & 107 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2131,130 +2131,136 @@ def test_stream_resources(self):
verbs=["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"]
)
cluster_role.rules.append(fes_cluster_role_rule)
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)

# create a table in one of the database of acid-minimal-cluster
create_stream_table = """
CREATE TABLE test_table (id int, payload jsonb);
"""
self.query_database(leader.metadata.name, "foo", create_stream_table)
try:
k8s.api.rbac_api.patch_cluster_role("postgres-operator", cluster_role)

# update the manifest with the streams section
patch_streaming_config = {
"spec": {
"patroni": {
"slots": {
"manual_slot": {
"type": "physical"
}
}
},
"streams": [
{
"applicationId": "test-app",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
# create a table in one of the database of acid-minimal-cluster
create_stream_table = """
CREATE TABLE test_table (id int, payload jsonb);
"""
self.query_database(leader.metadata.name, "foo", create_stream_table)

# update the manifest with the streams section
patch_streaming_config = {
"spec": {
"patroni": {
"slots": {
"manual_slot": {
"type": "physical"
}
}
},
{
"applicationId": "test-app2",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_non_exist_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
"streams": [
{
"applicationId": "test-app",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
},
{
"applicationId": "test-app2",
"batchSize": 100,
"database": "foo",
"enableRecovery": True,
"tables": {
"test_non_exist_table": {
"eventType": "test-event",
"idColumn": "id",
"payloadColumn": "payload",
"recoveryEventType": "test-event-dlq"
}
}
}
}
]
]
}
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# check if publication, slot, and fes resource are created
get_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
"""
get_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
"Publication is not created", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
"Replication slot is not created", 10, 5)
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
# check if publication, slot, and fes resource are created
get_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app';
"""
get_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 1,
"Publication is not created", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 1,
"Replication slot is not created", 10, 5)
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 1,
"Could not find Fabric Event Stream resource", 10, 5)

# check if the non-existing table in the stream section does not create a publication and slot
get_publication_query_not_exist_table = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
"""
get_slot_query_not_exist_table = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
"Publication is created for non-existing tables", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
"Replication slot is created for non-existing tables", 10, 5)

# grant create and ownership of test_table to foo_user, reset search path to default
grant_permission_foo_user = """
GRANT CREATE ON DATABASE foo TO foo_user;
ALTER TABLE test_table OWNER TO foo_user;
ALTER ROLE foo_user RESET search_path;
"""
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
# non-postgres user creates a publication
create_nonstream_publication = """
CREATE PUBLICATION mypublication FOR TABLE test_table;
"""
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")
# check if the non-existing table in the stream section does not create a publication and slot
get_publication_query_not_exist_table = """
SELECT * FROM pg_publication WHERE pubname = 'fes_foo_test_app2';
"""
get_slot_query_not_exist_table = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'fes_foo_test_app2';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query_not_exist_table)), 0,
"Publication is created for non-existing tables", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query_not_exist_table)), 0,
"Replication slot is created for non-existing tables", 10, 5)

# grant create and ownership of test_table to foo_user, reset search path to default
grant_permission_foo_user = """
GRANT CREATE ON DATABASE foo TO foo_user;
ALTER TABLE test_table OWNER TO foo_user;
ALTER ROLE foo_user RESET search_path;
"""
self.query_database(leader.metadata.name, "foo", grant_permission_foo_user)
# non-postgres user creates a publication
create_nonstream_publication = """
CREATE PUBLICATION mypublication FOR TABLE test_table;
"""
self.query_database_with_user(leader.metadata.name, "foo", create_nonstream_publication, "foo_user")

# remove the streams section from the manifest
patch_streaming_config_removal = {
"spec": {
"streams": []
# remove the streams section from the manifest
patch_streaming_config_removal = {
"spec": {
"streams": []
}
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
k8s.api.custom_objects_api.patch_namespaced_custom_object(
'acid.zalan.do', 'v1', 'default', 'postgresqls', 'acid-minimal-cluster', patch_streaming_config_removal)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# check if publication, slot, and fes resource are removed
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
# check if publication, slot, and fes resource are removed
self.eventuallyEqual(lambda: len(k8s.api.custom_objects_api.list_namespaced_custom_object(
"zalando.org", "v1", "default", "fabriceventstreams", label_selector="cluster-name=acid-minimal-cluster")["items"]), 0,
'Could not delete Fabric Event Stream resource', 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
"Publication is not deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
"Replication slot is not deleted", 10, 5)

# check the manual_slot and mypublication should not get deleted
get_manual_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
"""
get_nonstream_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
"Slot defined in patroni config is deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
"Publication defined not in stream section is deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_publication_query)), 0,
"Publication is not deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_slot_query)), 0,
"Replication slot is not deleted", 10, 5)

# check the manual_slot and mypublication should not get deleted
get_manual_slot_query = """
SELECT * FROM pg_replication_slots WHERE slot_name = 'manual_slot';
"""
get_nonstream_publication_query = """
SELECT * FROM pg_publication WHERE pubname = 'mypublication';
"""
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_manual_slot_query)), 1,
"Slot defined in patroni config is deleted", 10, 5)
self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "foo", get_nonstream_publication_query)), 1,
"Publication defined not in stream section is deleted", 10, 5)

except timeout_decorator.TimeoutError:
print('Operator log: {}'.format(k8s.get_operator_log()))
raise

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_taint_based_eviction(self):
Expand Down
2 changes: 1 addition & 1 deletion manifests/postgresql.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ spec:
type: array
items:
type: string
pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$'
pattern: '^\ *((Mon|Tue|Wed|Thu|Fri|Sat|Sun):(2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))-((2[0-3]|[01]?\d):([0-5]?\d)|(2[0-3]|[01]?\d):([0-5]?\d))\ *$'
masterServiceAnnotations:
type: object
additionalProperties:
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/acid.zalan.do/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type Volume struct {
Size string `json:"size"`
StorageClass string `json:"storageClass,omitempty"`
SubPath string `json:"subPath,omitempty"`
IsSubPathExpr *bool `json:"isSubPathExpr,omitemtpy"`
IsSubPathExpr *bool `json:"isSubPathExpr,omitempty"`
Iops *int64 `json:"iops,omitempty"`
Throughput *int64 `json:"throughput,omitempty"`
VolumeType string `json:"type,omitempty"`
Expand All @@ -144,7 +144,7 @@ type AdditionalVolume struct {
Name string `json:"name"`
MountPath string `json:"mountPath"`
SubPath string `json:"subPath,omitempty"`
IsSubPathExpr *bool `json:"isSubPathExpr,omitemtpy"`
IsSubPathExpr *bool `json:"isSubPathExpr,omitempty"`
TargetContainers []string `json:"targetContainers"`
VolumeSource v1.VolumeSource `json:"volumeSource"`
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/acid.zalan.do/v1/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ var maintenanceWindows = []struct {
{"expect error as weekday is empty", []byte(`":00:00-10:00"`), MaintenanceWindow{}, errors.New(`could not parse weekday: incorrect weekday`)},
{"expect error as maintenance window set seconds", []byte(`"Mon:00:00:00-10:00:00"`), MaintenanceWindow{}, errors.New(`incorrect maintenance window format`)},
{"expect error as 'To' time set seconds", []byte(`"Mon:00:00-00:00:00"`), MaintenanceWindow{}, errors.New("could not parse end time: incorrect time format")},
// ideally, should be implemented
{"expect error as 'To' has a weekday", []byte(`"Mon:00:00-Fri:00:00"`), MaintenanceWindow{}, errors.New("could not parse end time: incorrect time format")},
{"expect error as 'To' time is missing", []byte(`"Mon:00:00"`), MaintenanceWindow{}, errors.New("incorrect maintenance window format")}}

var postgresStatus = []struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ func compareEnv(a, b []v1.EnvVar) bool {
if len(a) != len(b) {
return false
}
equal := true
var equal bool
for _, enva := range a {
hasmatch := false
for _, envb := range b {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util/constants"
)

//ExecCommand executes arbitrary command inside the pod
// ExecCommand executes arbitrary command inside the pod
func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (string, error) {
c.setProcessName("executing command %q", strings.Join(command, " "))

Expand Down Expand Up @@ -59,7 +59,7 @@ func (c *Cluster) ExecCommand(podName *spec.NamespacedName, command ...string) (
return "", fmt.Errorf("failed to init executor: %v", err)
}

err = exec.Stream(remotecommand.StreamOptions{
err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
Stdout: &execOut,
Stderr: &execErr,
Tty: false,
Expand Down
17 changes: 7 additions & 10 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ const (
operatorPort = 8080
)

type pgUser struct {
Password string `json:"password"`
Options []string `json:"options"`
}

type patroniDCS struct {
TTL uint32 `json:"ttl,omitempty"`
LoopWait uint32 `json:"loop_wait,omitempty"`
Expand Down Expand Up @@ -2486,7 +2481,9 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
}

case "gcs":
envVars = append(envVars, v1.EnvVar{Name: "LOGICAL_BACKUP_GOOGLE_APPLICATION_CREDENTIALS", Value: c.OpConfig.LogicalBackup.LogicalBackupGoogleApplicationCredentials})
if c.OpConfig.LogicalBackup.LogicalBackupGoogleApplicationCredentials != "" {
envVars = append(envVars, v1.EnvVar{Name: "LOGICAL_BACKUP_GOOGLE_APPLICATION_CREDENTIALS", Value: c.OpConfig.LogicalBackup.LogicalBackupGoogleApplicationCredentials})
}

case "az":
envVars = appendEnvVars(envVars, []v1.EnvVar{
Expand All @@ -2497,11 +2494,11 @@ func (c *Cluster) generateLogicalBackupPodEnvVars() []v1.EnvVar {
{
Name: "LOGICAL_BACKUP_AZURE_STORAGE_CONTAINER",
Value: c.OpConfig.LogicalBackup.LogicalBackupAzureStorageContainer,
},
{
Name: "LOGICAL_BACKUP_AZURE_STORAGE_ACCOUNT_KEY",
Value: c.OpConfig.LogicalBackup.LogicalBackupAzureStorageAccountKey,
}}...)

if c.OpConfig.LogicalBackup.LogicalBackupAzureStorageAccountKey != "" {
envVars = append(envVars, v1.EnvVar{Name: "LOGICAL_BACKUP_AZURE_STORAGE_ACCOUNT_KEY", Value: c.OpConfig.LogicalBackup.LogicalBackupAzureStorageAccountKey})
}
}

return envVars
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/majorversionupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}

if !c.isInMainternanceWindow() {
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
Expand Down
Loading

0 comments on commit 3d448b0

Please sign in to comment.