Fix issue #398: decreasing replicas makes ZooKeeper unrecoverable when ZooKeeper is not running #406

Open
wants to merge 12 commits into base: master
38 changes: 34 additions & 4 deletions pkg/controller/zookeepercluster/zookeepercluster_controller.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/operator-framework/operator-sdk/pkg/predicate"
@@ -279,6 +280,8 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be
foundSTSSize := *foundSts.Spec.Replicas
newSTSSize := *sts.Spec.Replicas
if newSTSSize != foundSTSSize {
// If ZooKeeper is not running, we must not update the replica count.
// The decrease in Replicas takes effect only once ZooKeeper is running and the client connects to it successfully.
zkUri := utils.GetZkServiceUri(instance)
err = r.zkClient.Connect(zkUri)
if err != nil {
@@ -295,7 +298,31 @@ func (r *ReconcileZookeeperCluster) reconcileStatefulSet(instance *zookeeperv1be

data := "CLUSTER_SIZE=" + strconv.Itoa(int(newSTSSize))
r.log.Info("Updating Cluster Size.", "New Data:", data, "Version", version)
r.zkClient.UpdateNode(path, data, version)
err = r.zkClient.UpdateNode(path, data, version)
if err != nil {
return fmt.Errorf("Error updating cluster size %s: %v", path, err)
}
// #398: when scaling down, remove the departing servers immediately after the cluster size update succeeds.
if newSTSSize < foundSTSSize {
var removes []string
config, _, err := r.zkClient.GetConfig()
if err != nil {
return fmt.Errorf("Error GetConfig %v", err)
}
r.log.Info("Get zookeeper config.", "Config: ", config)
for myid := newSTSSize + 1; myid <= foundSTSSize; myid++ {
if strings.Contains(config, "server."+strconv.Itoa(int(myid))+"=") {
removes = append(removes, strconv.Itoa(int(myid)))
}
}
// A server that has been removed via reconfig can still serve all clients that are already connected.
// Removing it here first avoids the preStop error where the client cannot connect to the server.
r.log.Info("Do reconfig to remove node.", "Remove ids", strings.Join(removes, ","))
@anishakj (Contributor), Nov 25, 2021:

The check above, which returns an error at line 303, ensures ZooKeeper is running.
The remove operation is then performed later in the teardown script (https://github.com/pravega/zookeeper-operator/blob/master/docker/bin/zookeeperTeardown.sh#L45). Do you still think removing the node here is required?

@stop-coding (Author):

Whether or not the node is removed in the teardown script does not matter; doing it or skipping it does not affect the cluster.
But if the reconfig is done only in the teardown script, there is no chance to retry it after the pod has exited while ZooKeeper is unserviceable.
So I think it is better to do the reconfig when the controller detects the cluster scale-down.

@jkhalack (Contributor):

@stop-coding Did you see the "Do reconfig to remove node." message in the logs in your use case?

I think @anishakj is suggesting that catching the UpdateNode error and returning on line 303 should be enough to fix the issue, hence lines 305 to 324 would never get executed.

@stop-coding (Author):

@jkhalack Sorry for my late reply.
Only catching the UpdateNode error is not enough to ensure the reconfigure succeeds in preStop; for example, the pod may be exiting while the cluster breaks again. Ideally, updating the cluster size and performing the reconfigure would be atomic, but that is not realistic.

Updating "Spec.Replicas" tells Kubernetes to create or terminate pods. If the reconfigure fails during scale-down, we can hold off on updating "Spec.Replicas" until the cluster recovers, which guarantees the reconfigure has completed before any pod exits.

So I think doing the reconfigure when the controller detects the scale-down is better.

err = r.zkClient.IncReconfig(nil, removes, -1)
if err != nil {
return fmt.Errorf("Error reconfig remove id:%s, %v", strings.Join(removes, ","), err)
}
}
}
err = r.updateStatefulSet(instance, foundSts, sts)
if err != nil {
@@ -615,7 +642,8 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
instance.Status.Members.Ready = readyMembers
instance.Status.Members.Unready = unreadyMembers

//If Cluster is in a ready state...
// If Cluster is in a ready state...
// instance.Spec.Replicas is only the desired value we set; Kubernetes may not have applied it yet.
if instance.Spec.Replicas == instance.Status.ReadyReplicas && (!instance.Status.MetaRootCreated) {
r.log.Info("Cluster is Ready, Creating ZK Metadata...")
zkUri := utils.GetZkServiceUri(instance)
@@ -635,7 +663,7 @@ func (r *ReconcileZookeeperCluster) reconcileClusterStatus(instance *zookeeperv1
r.log.Info("Updating zookeeper status",
"StatefulSet.Namespace", instance.Namespace,
"StatefulSet.Name", instance.Name)
if instance.Status.ReadyReplicas == instance.Spec.Replicas {
if instance.Status.ReadyReplicas == instance.Spec.Replicas && instance.Status.Replicas == instance.Spec.Replicas {
instance.Status.SetPodsReadyConditionTrue()
} else {
instance.Status.SetPodsReadyConditionFalse()
@@ -765,7 +793,9 @@ func (r *ReconcileZookeeperCluster) getPVCCount(instance *zookeeperv1beta1.Zooke

func (r *ReconcileZookeeperCluster) cleanupOrphanPVCs(instance *zookeeperv1beta1.ZookeeperCluster) (err error) {
// this check should make sure we do not delete the PVCs before the STS has scaled down
if instance.Status.ReadyReplicas == instance.Spec.Replicas {
// instance.Spec.Replicas is only the desired value we set; Kubernetes may not have applied it yet if the update failed.
// So we check that instance.Status.Replicas equals both ReadyReplicas and Spec.Replicas, which means the cluster has fully reconciled.
if instance.Status.ReadyReplicas == instance.Spec.Replicas && instance.Status.Replicas == instance.Spec.Replicas {
pvcCount, err := r.getPVCCount(instance)
if err != nil {
return err
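For readers following the discussion above, here is a minimal, self-contained sketch of the control flow this diff adds to reconcileStatefulSet. It is not the operator's actual code: the ensembleClient interface, the scaleDown function, and its parameters are illustrative stand-ins, and version handling is simplified.

```go
package scaledown

import (
	"fmt"
	"strconv"
	"strings"
)

// ensembleClient is the subset of the operator's zk.ZookeeperClient
// interface that the scale-down path relies on (see the
// pkg/zk/zookeeper_client.go changes in this PR).
type ensembleClient interface {
	UpdateNode(path string, data string, version int32) error
	GetConfig() (config string, version int32, err error)
	IncReconfig(joining []string, leaving []string, version int64) error
}

// scaleDown sketches the ordering the PR enforces: the ensemble is
// reconfigured to drop the departing servers before the StatefulSet
// replica count is lowered, so an unreachable cluster blocks the
// scale-down instead of becoming unrecoverable.
func scaleDown(zk ensembleClient, path string, version int32, current, desired int32) error {
	// 1. Persist the new cluster size; abort on any error so the
	//    StatefulSet is never shrunk while ZooKeeper is unreachable.
	data := "CLUSTER_SIZE=" + strconv.Itoa(int(desired))
	if err := zk.UpdateNode(path, data, version); err != nil {
		return fmt.Errorf("error updating cluster size %s: %v", path, err)
	}

	// 2. Work out which server ids are leaving, based on the current
	//    dynamic configuration (its "server.N=..." entries).
	config, _, err := zk.GetConfig()
	if err != nil {
		return fmt.Errorf("error getting config: %v", err)
	}
	var removes []string
	for id := desired + 1; id <= current; id++ {
		if strings.Contains(config, "server."+strconv.Itoa(int(id))+"=") {
			removes = append(removes, strconv.Itoa(int(id)))
		}
	}

	// 3. Remove the departing servers from the ensemble; only after this
	//    succeeds may the StatefulSet terminate the corresponding pods.
	return zk.IncReconfig(nil, removes, -1)
}
```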
191 changes: 188 additions & 3 deletions pkg/controller/zookeepercluster/zookeepercluster_controller_test.go
@@ -12,6 +12,7 @@ package zookeepercluster

import (
"context"
"fmt"
"os"
"testing"
"time"
@@ -60,20 +61,130 @@ func (client *MockZookeeperClient) NodeExists(zNodePath string) (version int32,
return 0, nil
}

func (client *MockZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
return nil
}

func (client *MockZookeeperClient) GetConfig() (config string, version int32, err error) {
return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, nil
}

func (client *MockZookeeperClient) Close() {
return
}

type TestZookeeperClient struct {
// dummy struct
}

func (client *TestZookeeperClient) Connect(zkUri string) (err error) {
// do nothing
return nil
}

func (client *TestZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) {
return nil
}

func (client *TestZookeeperClient) UpdateNode(path string, data string, version int32) (err error) {
return fmt.Errorf("Error")
}

func (client *TestZookeeperClient) NodeExists(zNodePath string) (version int32, err error) {
return 0, nil
}

func (client *TestZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
return fmt.Errorf("Error")
}

func (client *TestZookeeperClient) GetConfig() (config string, version int32, err error) {
return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, fmt.Errorf("Error")
}

func (client *TestZookeeperClient) Close() {
return
}

type GetConfigFailZookeeperClient struct {
// dummy struct
}

func (client *GetConfigFailZookeeperClient) Connect(zkUri string) (err error) {
// do nothing
return nil
}

func (client *GetConfigFailZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) {
return nil
}

func (client *GetConfigFailZookeeperClient) UpdateNode(path string, data string, version int32) (err error) {
return nil
}

func (client *GetConfigFailZookeeperClient) NodeExists(zNodePath string) (version int32, err error) {
return 0, nil
}

func (client *GetConfigFailZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
return nil
}

func (client *GetConfigFailZookeeperClient) GetConfig() (config string, version int32, err error) {
return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, fmt.Errorf("Error")
}

func (client *GetConfigFailZookeeperClient) Close() {
return
}

type IncReconfigFailZookeeperClient struct {
// dummy struct
}

func (client *IncReconfigFailZookeeperClient) Connect(zkUri string) (err error) {
// do nothing
return nil
}

func (client *IncReconfigFailZookeeperClient) CreateNode(zoo *v1beta1.ZookeeperCluster, zNodePath string) (err error) {
return nil
}

func (client *IncReconfigFailZookeeperClient) UpdateNode(path string, data string, version int32) (err error) {
return nil
}

func (client *IncReconfigFailZookeeperClient) NodeExists(zNodePath string) (version int32, err error) {
return 0, nil
}

func (client *IncReconfigFailZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
return fmt.Errorf("Error")
}

func (client *IncReconfigFailZookeeperClient) GetConfig() (config string, version int32, err error) {
return "server.1=xxx,server.2=xxxx,server.3=xxxx", 0, nil
}

func (client *IncReconfigFailZookeeperClient) Close() {
return
}

var _ = Describe("ZookeeperCluster Controller", func() {
const (
Name = "example"
Namespace = "default"
)

var (
s = scheme.Scheme
mockZkClient = new(MockZookeeperClient)
r *ReconcileZookeeperCluster
s = scheme.Scheme
mockZkClient = new(MockZookeeperClient)
testZkClient = new(TestZookeeperClient)
getConfigZkClient = new(GetConfigFailZookeeperClient)
incReconfigZkClient = new(IncReconfigFailZookeeperClient)
r *ReconcileZookeeperCluster
)

Context("Reconcile", func() {
@@ -223,6 +334,80 @@ var _ = Describe("ZookeeperCluster Controller", func() {
})
})

Context("With scale down Replicas", func() {
var (
cl client.Client
err error
)

BeforeEach(func() {
z.WithDefaults()
z.Spec.Pod.ServiceAccountName = "zookeeper"
z.Status.Init()
next := z.DeepCopy()
st := zk.MakeStatefulSet(z)
next.Spec.Replicas = 1
cl = fake.NewFakeClient([]runtime.Object{next, st}...)
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: mockZkClient}
res, err = r.Reconcile(req)
})

It("should not raise an error", func() {
Ω(err).To(BeNil())
})

It("should update the sts", func() {
foundSts := &appsv1.StatefulSet{}
err = cl.Get(context.TODO(), req.NamespacedName, foundSts)
Ω(err).To(BeNil())
Ω(*foundSts.Spec.Replicas).To(BeEquivalentTo(1))
})
})

Context("With scale down Replicas but fail", func() {
var (
cl client.Client
err error
count int
new_count int
)

BeforeEach(func() {
z.WithDefaults()
z.Spec.Pod.ServiceAccountName = "zookeeper"
z.Status.Init()
next := z.DeepCopy()
st := zk.MakeStatefulSet(z)
next.Spec.Replicas = 1
cl = fake.NewFakeClient([]runtime.Object{next, st}...)
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: testZkClient}
count, _ = r.getPVCCount(z)
res, err = r.Reconcile(req)
})

It("should raise an error in case of zookeeper not running", func() {
Ω(err).NotTo(BeNil())
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: getConfigZkClient}
_, err = r.Reconcile(req)
Ω(err).NotTo(BeNil())
r = &ReconcileZookeeperCluster{client: cl, scheme: s, zkClient: incReconfigZkClient}
_, err = r.Reconcile(req)
Ω(err).NotTo(BeNil())
})

It("should not update the sts in case of zookeeper not running", func() {
foundSts := &appsv1.StatefulSet{}
err = cl.Get(context.TODO(), req.NamespacedName, foundSts)
Ω(err).To(BeNil())
Ω(*foundSts.Spec.Replicas).To(BeEquivalentTo(3))
})
It("should not delete pvc in case of zookeeper not running", func() {
new_count, err = r.getPVCCount(z)
Ω(err).To(BeNil())
Ω(new_count).To(BeEquivalentTo(count))
})
})

Context("With no update to sts", func() {
var (
cl client.Client
20 changes: 20 additions & 0 deletions pkg/zk/zookeeper_client.go
@@ -24,13 +24,18 @@ type ZookeeperClient interface {
CreateNode(*v1beta1.ZookeeperCluster, string) error
NodeExists(string) (int32, error)
UpdateNode(string, string, int32) error
IncReconfig([]string, []string, int64) error
GetConfig() (string, int32, error)
Close()
}

type DefaultZookeeperClient struct {
conn *zk.Conn
}

// Path of the ZooKeeper dynamic configuration znode.
const ZOO_CONFIG_PATH = "/zookeeper/config"

func (client *DefaultZookeeperClient) Connect(zkUri string) (err error) {
host := []string{zkUri}
conn, _, err := zk.Connect(host, time.Second*5)
@@ -74,6 +79,21 @@ func (client *DefaultZookeeperClient) NodeExists(zNodePath string) (version int3
return zNodeStat.Version, err
}

func (client *DefaultZookeeperClient) IncReconfig(joining []string, leaving []string, version int64) (err error) {
if _, err := client.conn.IncrementalReconfig(joining, leaving, version); err != nil {
return fmt.Errorf("Failed to reconfig node:%s, err:%v", strings.Join(leaving, ","), err)
}
return nil
}

func (client *DefaultZookeeperClient) GetConfig() (config string, version int32, err error) {
data, stat, err := client.conn.Get(ZOO_CONFIG_PATH)
if err != nil {
return "", -1, fmt.Errorf("Get config %s error, err:%v", ZOO_CONFIG_PATH, err)
}
return string(data), stat.Version, nil
}

func (client *DefaultZookeeperClient) Close() {
client.conn.Close()
}
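For context on what the two new DefaultZookeeperClient methods do at the wire level, here is a minimal standalone sketch against the go-zookeeper client. The import path, server address, and the server id being removed are assumptions for illustration; the operator routes these calls through its own connection handling.

```go
package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk" // assumed import path; the maintained fork go-zookeeper/zk exposes the same calls
)

func main() {
	// Connect to a ZooKeeper server (the address is an assumption for this example).
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 5*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Read the dynamic ensemble configuration, as GetConfig does via ZOO_CONFIG_PATH.
	data, stat, err := conn.Get("/zookeeper/config")
	if err != nil {
		panic(err)
	}
	fmt.Printf("config (version %d):\n%s\n", stat.Version, data)

	// Drop server id "3" from the ensemble, mirroring IncReconfig(nil, []string{"3"}, -1).
	// A version of -1 applies the change regardless of the current config version.
	if _, err := conn.IncrementalReconfig(nil, []string{"3"}, -1); err != nil {
		fmt.Println("reconfig failed:", err)
	}
}
```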
10 changes: 9 additions & 1 deletion pkg/zk/zookeeper_client_test.go
Expand Up @@ -21,7 +21,7 @@ import (
var _ = Describe("Zookeeper Client", func() {

Context("with a valid update of Service port", func() {
var err1, err2, err3, err4, err5 error
var err1, err2, err3, err4, err5, err6, err7 error
BeforeEach(func() {
z := &v1beta1.ZookeeperCluster{
ObjectMeta: metav1.ObjectMeta{
@@ -36,6 +36,8 @@ var _ = Describe("Zookeeper Client", func() {
err5 = zkclient.CreateNode(z, "temp/tmp")
err3 = zkclient.UpdateNode("temp/tem/temp", "dasd", 2)
_, err4 = zkclient.NodeExists("temp")
_, _, err6 = zkclient.GetConfig()
err7 = zkclient.IncReconfig(nil, nil, -1)
zkclient.Close()
})
It("err1 should be nil", func() {
@@ -53,5 +55,11 @@ var _ = Describe("Zookeeper Client", func() {
It("err5 should be not nil", func() {
Ω(err5).ShouldNot(BeNil())
})
It("err6 should be not nil", func() {
Ω(err6).ShouldNot(BeNil())
})
It("err7 should be not nil", func() {
Ω(err7).ShouldNot(BeNil())
})
})
})