Skip to content

Commit

Permalink
Fix edge cases in PostgreSQL maintenance
Browse files Browse the repository at this point in the history
This commit fixes two separate issues with the PostgreSQL maintenance
script. Both are related to waiting for the upgrade to complete.

* Fixes a panic when we receive an event for an SGDbOps without any
  conditions
* Fixes an issue where the script would run repack prematurely when
  there is an event on a different, already completed SGDbOps in the namespace.
  • Loading branch information
glrf committed Jul 19, 2023
1 parent af99cb9 commit b969b27
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 39 deletions.
23 changes: 13 additions & 10 deletions pkg/maintenance/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func (p *PostgreSQL) DoMaintenance(ctx context.Context) error {
currentVersion := sgclusters.Items[0].Spec.Postgres.Version

p.log.Info("Checking for upgrades...")
err = p.upgradeVersion(currentVersion, clusterName)
op, err := p.upgradeVersion(currentVersion, clusterName)
if err != nil {
return err
}

p.log.Info("Waiting for upgrades to finish before doing repack on databases")
err = p.waitForUpgrade(ctx)
err = p.waitForUpgrade(ctx, op)
if apierrors.IsTimeout(err) {
return nil
}
Expand All @@ -110,7 +110,7 @@ func (p *PostgreSQL) DoMaintenance(ctx context.Context) error {
return nil
}

func (p *PostgreSQL) upgradeVersion(currentVersion string, clusterName string) error {
func (p *PostgreSQL) upgradeVersion(currentVersion string, clusterName string) (OpName, error) {
versionList, err := p.fetchVersionList(p.SgURL)
if err != nil {
p.log.Error(err, "StackGres API error")
Expand All @@ -127,22 +127,22 @@ func (p *PostgreSQL) upgradeVersion(currentVersion string, clusterName string) e
p.log.Info("Found versions", "current", currentVersion, "latest", latestMinor)
if currentVersion != latestMinor {
p.log.Info("Doing a minor upgrade")
return p.createMinorUpgrade(clusterName, latestMinor)
return mvu, p.createMinorUpgrade(clusterName, latestMinor)
}

p.log.Info("Checking for EOL")
if versionList != nil && p.isEOL(currentVersion, *versionList) {
err = p.setEOLStatus()
if err != nil {
return fmt.Errorf("cannot set EOL status on claim: %w", err)
return "", fmt.Errorf("cannot set EOL status on claim: %w", err)
}
}

p.log.Info("Doing a security maintenance")
return p.createSecurityUpgrade(clusterName)
return su, p.createSecurityUpgrade(clusterName)
}

func (p *PostgreSQL) waitForUpgrade(ctx context.Context) error {
func (p *PostgreSQL) waitForUpgrade(ctx context.Context, op OpName) error {
ls := &stackgresv1.SGDbOpsList{}
watcher, err := p.Client.Watch(ctx, ls, client.InNamespace(p.instanceNamespace))
if err != nil {
Expand All @@ -168,9 +168,12 @@ func (p *PostgreSQL) waitForUpgrade(ctx context.Context) error {
switch event.Type {
case watch.Modified:
ops, _ := event.Object.(*stackgresv1.SGDbOps)
if ops.Status.Conditions == nil || ops.Spec.Op != string(op) {
continue
}
for _, c := range *ops.Status.Conditions {
// When operation is completed then return, regardless if it failed or not
if isUpgradeFinished(ops.Spec.Op, c) {
if isUpgradeFinished(c) {
return nil
}
}
Expand All @@ -181,8 +184,8 @@ func (p *PostgreSQL) waitForUpgrade(ctx context.Context) error {
}
}

func isUpgradeFinished(op string, v stackgresv1.SGDbOpsStatusConditionsItem) bool {
return op != string(r) && *v.Reason == "OperationCompleted" && *v.Status == "True"
func isUpgradeFinished(v stackgresv1.SGDbOpsStatusConditionsItem) bool {
return *v.Reason == "OperationCompleted" && *v.Status == "True"
}

func (p *PostgreSQL) listClustersInNamespace() (*stackgresv1.SGClusterList, error) {
Expand Down
134 changes: 105 additions & 29 deletions pkg/maintenance/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand All @@ -12,15 +19,8 @@ import (
"github.com/vshn/appcat/pkg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"math/rand"
"net/http"
"net/http/httptest"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"strings"
"testing"
"time"
)

func TestPostgreSQL_getLatestMinorversion(t *testing.T) {
Expand Down Expand Up @@ -199,13 +199,15 @@ func getBrokenHTTPServer(t *testing.T) *httptest.Server {

func TestPostgreSQL_DoMaintenance(t *testing.T) {
tests := []struct {
name string
wantErr bool
objs []client.Object
maintTimeout time.Duration
wantedClaim *vshnv1.VSHNPostgreSQL
wantedOps *stackgresv1.SGDbOps
server *httptest.Server
name string
wantErr bool
objs []client.Object
maintTimeout time.Duration
wantedClaim *vshnv1.VSHNPostgreSQL
wantedOps *stackgresv1.SGDbOps
server *httptest.Server
updatedOps string
shouldSkipRepack bool
}{
{
name: "GivenEOLVersion_ThenExpectEOLStatus",
Expand Down Expand Up @@ -339,7 +341,7 @@ func TestPostgreSQL_DoMaintenance(t *testing.T) {
},
{
name: "GivenMaintenanceTooLong_ThenExpectNoRepack",
maintTimeout: time.Second,
maintTimeout: 500 * time.Millisecond,
objs: []client.Object{
&stackgresv1.SGCluster{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -353,7 +355,42 @@ func TestPostgreSQL_DoMaintenance(t *testing.T) {
},
},
},
server: getVersionTestHTTPServer(t),
server: getVersionTestHTTPServer(t),
shouldSkipRepack: true,
},
{
name: "GivenMaintenanceTooLong_WithUnrelatedSecupdate_ThenExpectNoRepack",
maintTimeout: 2 * time.Second,
objs: []client.Object{
&stackgresv1.SGCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster",
Namespace: "default",
},
Spec: stackgresv1.SGClusterSpec{
Postgres: stackgresv1.SGClusterSpecPostgres{
Version: "15.0",
},
},
},
&stackgresv1.SGDbOps{
ObjectMeta: metav1.ObjectMeta{
Name: "unrelated-securitymaintenance",
Namespace: "default",
},
Spec: stackgresv1.SGDbOpsSpec{
Op: "securityUpgrade",
SgCluster: "cluster",
MaxRetries: pointer.Int(1),
SecurityUpgrade: &stackgresv1.SGDbOpsSpecSecurityUpgrade{
Method: pointer.String("InPlace"),
},
},
},
},
server: getVersionTestHTTPServer(t),
updatedOps: "securityUpgrade",
shouldSkipRepack: true,
},
}
for _, tt := range tests {
Expand All @@ -368,7 +405,7 @@ func TestPostgreSQL_DoMaintenance(t *testing.T) {

defer tt.server.Close()
ctx := context.TODO()
err := concurrentSGDbOpsUpdate(fclient)
err := concurrentSGDbOpsUpdate(fclient, tt.updatedOps)
if err != nil {
assert.NoError(t, err)
}
Expand All @@ -379,9 +416,11 @@ func TestPostgreSQL_DoMaintenance(t *testing.T) {
SgURL: tt.server.URL,
MaintTimeout: tt.maintTimeout,
}
if err := p.DoMaintenance(ctx); (err != nil) != tt.wantErr {
t.Errorf("PostgreSQL.DoMaintenance() error = %v, wantErr %v", err, tt.wantErr)
}
assert.NotPanics(t, func() {
if err := p.DoMaintenance(ctx); (err != nil) != tt.wantErr {
t.Errorf("PostgreSQL.DoMaintenance() error = %v, wantErr %v", err, tt.wantErr)
}
})

if tt.wantedClaim != nil {

Expand All @@ -406,7 +445,7 @@ func TestPostgreSQL_DoMaintenance(t *testing.T) {
Namespace: "default",
},
}
if tt.maintTimeout == time.Hour {
if !tt.shouldSkipRepack {
assert.NoError(t, fclient.Get(ctx, client.ObjectKeyFromObject(repack), repack))
assert.Equal(t, "repack", repack.Spec.Op)
} else {
Expand All @@ -416,34 +455,71 @@ func TestPostgreSQL_DoMaintenance(t *testing.T) {
}
}

func concurrentSGDbOpsUpdate(fakeClient client.WithWatch) error {
func concurrentSGDbOpsUpdate(fakeClient client.WithWatch, op string) error {
var err error
go func() error {
ctx := context.Background()
sl := &stackgresv1.SGDbOpsList{}
for len(sl.Items) == 0 {
err := fakeClient.List(ctx, sl)
if err != nil {

return fmt.Errorf("cannot get sgdbops resources")
}
}
for {
s := sl.Items[rand.Intn(len(sl.Items))]
// Check that we can handle nils
for _, s := range sl.Items {
s.Status.Conditions = nil
err := fakeClient.Status().Update(ctx, &s)
if err != nil {
return fmt.Errorf("cannot update status of sgdbops resource")
}
}

// Check that we can handle not completes
err := fakeClient.List(ctx, sl)
if err != nil {
return fmt.Errorf("cannot get sgdbops resources")
}
for _, s := range sl.Items {
s.Status.Conditions = &[]stackgresv1.SGDbOpsStatusConditionsItem{
{
Message: pointer.String("all good"),
Reason: pointer.String("OperationCompleted"),
Status: pointer.String("True"),
Message: pointer.String("not yet"),
Reason: pointer.String("OperationnNotCompleted"),
Status: pointer.String("False"),
Type: pointer.String("Completed"),
},
}
time.Sleep(time.Second)
err := fakeClient.Status().Update(ctx, &s)
if err != nil {
return fmt.Errorf("cannot update status of sgdbops resource")
}
}

// Complete only the configured sgdbop
for {
time.Sleep(time.Second)
err := fakeClient.List(ctx, sl)
if err != nil {
return fmt.Errorf("cannot get sgdbops resources")
}
for _, s := range sl.Items {
if op != "" && s.Spec.Op != op {
continue
}
s.Status.Conditions = &[]stackgresv1.SGDbOpsStatusConditionsItem{
{
Message: pointer.String("all good"),
Reason: pointer.String("OperationCompleted"),
Status: pointer.String("True"),
Type: pointer.String("Completed"),
},
}
err := fakeClient.Status().Update(ctx, &s)
if err != nil {
return fmt.Errorf("cannot update status of sgdbops resource")
}
}
}
}()
return err
}
Expand Down

0 comments on commit b969b27

Please sign in to comment.