Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support InPlaceUpdate when only configuration changes are made #557

Merged
merged 12 commits into from
Oct 16, 2024
5 changes: 3 additions & 2 deletions api/core/v1alpha1/common_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
)

var (
LegacyOpVersion = semver.MustParse("1.2.0")
LatestOpVersion = semver.MustParse("1.3.0")
)

Expand Down Expand Up @@ -300,11 +301,11 @@ func (p *PodSet) GetSemVer() (*semver.Version, bool) {

func (p *PodSet) GetOperatorVersion() semver.Version {
if p.OperatorVersion == nil {
return LatestOpVersion
return LegacyOpVersion
}
v, err := semver.ParseTolerant(*p.OperatorVersion)
if err != nil {
return LatestOpVersion
return LegacyOpVersion
}
return v
}
7 changes: 5 additions & 2 deletions pkg/controllers/cnset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,14 @@ func syncCloneSet(ctx *recon.Context[*v1alpha1.CNSet], cs *kruisev1alpha1.CloneS
// ref: https://openkruise.io/zh/docs/next/user-manuals/cloneset/#%E6%94%AF%E6%8C%81-pvc-%E6%A8%A1%E6%9D%BF
syncPersistentVolumeClaim(cn, cs)

cm, err := buildCNSetConfigMap(ctx.Obj, ctx.Dep.Deps.LogSet)
cm, configSuffix, err := buildCNSetConfigMap(ctx.Obj, ctx.Dep.Deps.LogSet)
if err != nil {
return err
}
return common.SyncConfigMap(ctx, &cs.Spec.Template.Spec, cm)
if cn.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) {
cs.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix
}
return common.SyncConfigMap(ctx, &cs.Spec.Template.Spec, cm, cn.Spec.GetOperatorVersion())
}

func setReady(cn *v1alpha1.CNSet) {
Expand Down
20 changes: 13 additions & 7 deletions pkg/controllers/cnset/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ sql-address = "${POD_IP}:{{ .CNSQLPort }}"
service-host = "${POD_IP}"
EOF
# build instance config
sed "/\[cn\]/r ${bc}" {{ .ConfigFilePath }} > ${conf}
if [ -n "${CONFIG_SUFFIX}" ]; then
sed "/\[cn\]/r ${bc}" {{ .ConfigFilePath }}-${CONFIG_SUFFIX} > ${conf}
else
sed "/\[cn\]/r ${bc}" {{ .ConfigFilePath }} > ${conf}
fi

# append lock-service configs
lsc=$(mktemp)
Expand Down Expand Up @@ -196,6 +200,7 @@ func syncPodSpec(cn *v1alpha1.CNSet, cs *kruisev1alpha1.CloneSet, sp v1alpha1.Sh
mainRef.Env = []corev1.EnvVar{
util.FieldRefEnv(common.PodNameEnvKey, "metadata.name"),
util.FieldRefEnv(common.NamespaceEnvKey, "metadata.namespace"),
util.FieldRefEnv(common.ConfigSuffixEnvKey, fmt.Sprintf("metadata.annotations['%s']", common.ConfigSuffixAnno)),
{Name: common.HeadlessSvcEnvKey, Value: headlessSvcName(cn)},
util.FieldRefEnv(common.PodIPEnvKey, "status.podIP"),
}
Expand Down Expand Up @@ -249,9 +254,9 @@ func syncPodSpec(cn *v1alpha1.CNSet, cs *kruisev1alpha1.CloneSet, sp v1alpha1.Sh
}
}

func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, error) {
func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.ConfigMap, string, error) {
if ls.Status.Discovery == nil {
return nil, errors.New("logset had not yet exposed HAKeeper discovery address")
return nil, "", errors.New("logset had not yet exposed HAKeeper discovery address")
}
cfg := cn.Spec.Config
if cfg == nil {
Expand Down Expand Up @@ -285,7 +290,7 @@ func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.Confi
}
s, err := cfg.ToString()
if err != nil {
return nil, err
return nil, "", err
}
buff := new(bytes.Buffer)
err = startScriptTpl.Execute(buff, &model{
Expand All @@ -295,18 +300,19 @@ func buildCNSetConfigMap(cn *v1alpha1.CNSet, ls *v1alpha1.LogSet) (*corev1.Confi
LockServicePort: common.LockServicePort,
})
if err != nil {
return nil, err
return nil, "", err
}

configSuffix := common.DataDigest([]byte(s))
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName(cn),
Namespace: cn.Namespace,
Labels: common.SubResourceLabels(cn),
},
Data: map[string]string{
common.ConfigFile: s,
fmt.Sprintf("%s-%s", common.ConfigFile, configSuffix): s,
common.Entrypoint: buff.String(),
},
}, nil
}, configSuffix, nil
}
7 changes: 4 additions & 3 deletions pkg/controllers/cnset/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ service-addresses = []
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewGomegaWithT(t)
got, err := buildCNSetConfigMap(tt.args.cn, tt.args.ls)
got, configSuffix, err := buildCNSetConfigMap(tt.args.cn, tt.args.ls)
if (err != nil) != tt.wantErr {
t.Errorf("buildDNSetConfigMap() error = %v, wantErr %v", err, tt.wantErr)
return
}
g.Expect(got.Data["config.toml"]).NotTo(BeNil())
g.Expect(cmp.Diff(tt.wantConfig, got.Data["config.toml"])).To(BeEmpty())
configKey := "config.toml-" + configSuffix
g.Expect(got.Data[configKey]).NotTo(BeNil())
g.Expect(cmp.Diff(tt.wantConfig, got.Data[configKey])).To(BeEmpty())
})
}
}
70 changes: 64 additions & 6 deletions pkg/controllers/common/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ package common
import (
"encoding/json"
"fmt"
"github.com/blang/semver/v4"
"github.com/cespare/xxhash"
"github.com/go-errors/errors"
recon "github.com/matrixorigin/controller-runtime/pkg/reconciler"
"github.com/matrixorigin/controller-runtime/pkg/util"
"github.com/matrixorigin/matrixone-operator/api/core/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
)

const (
Expand All @@ -38,14 +42,19 @@ const (

// SyncConfigMap syncs the desired configmap for pods, which will cause rolling-update if the
// data of the configmap is changed
func SyncConfigMap(kubeCli recon.KubeClient, podSpec *corev1.PodSpec, cm *corev1.ConfigMap) error {
func SyncConfigMap(kubeCli recon.KubeClient, podSpec *corev1.PodSpec, cm *corev1.ConfigMap, operatorVersion semver.Version) error {
var currentCmName string
var desiredName string
var err error
vp := util.FindFirst(podSpec.Volumes, util.WithVolumeName("config"))
if vp != nil {
currentCmName = vp.Name
}
// TODO(aylei): GC stale configmaps (maybe in another worker?)
desiredName, err := ensureConfigMap(kubeCli, currentCmName, cm)
if operatorVersion.Equals(v1alpha1.LatestOpVersion) {
desiredName, err = ensureConfigMap(kubeCli, cm)
} else {
desiredName, err = ensureConfigMapLegacy(kubeCli, currentCmName, cm)
}
if err != nil {
return err
}
Expand All @@ -66,23 +75,72 @@ func SyncConfigMap(kubeCli recon.KubeClient, podSpec *corev1.PodSpec, cm *corev1
}

// ensureConfigMap ensures the configmap exist in k8s
func ensureConfigMap(kubeCli recon.KubeClient, currentCm string, desired *corev1.ConfigMap) (string, error) {
func ensureConfigMap(kubeCli recon.KubeClient, desired *corev1.ConfigMap) (string, error) {
c := desired.DeepCopy()
if err := addConfigMapDigest(c); err != nil {
old := &corev1.ConfigMap{}
exist, err := kubeCli.Exist(client.ObjectKeyFromObject(c), old)
if err != nil {
return "", err
}
if exist {
podList := &corev1.PodList{}
err = kubeCli.List(podList, client.InNamespace(c.Namespace))
if err != nil {
return "", err
}
for key, v := range old.Data {
if withDigest(key, v) && configInUse(key, podList.Items) {
// append item that is still in use
c.Data[key] = v
}
}
err = kubeCli.Update(c)
} else {
err = kubeCli.CreateOwned(c)
}
if err != nil {
return "", err
}
return c.Name, nil
}

// Deprecated: use ensureConfigMap instead
func ensureConfigMapLegacy(kubeCli recon.KubeClient, currentCm string, desired *corev1.ConfigMap) (string, error) {
c := desired.DeepCopy()
if err := addConfigMapDigest(c); err != nil {
return "", errors.Wrap(err, 0)
}
// config digest not changed
if c.Name == currentCm {
return currentCm, nil
}
// otherwise ensure the configmap exists
err := util.Ignore(apierrors.IsAlreadyExists, kubeCli.CreateOwned(c))
if err != nil {
return "", err
return "", errors.Wrap(err, 0)
}
return c.Name, nil
}

func withDigest(key string, v string) bool {
return strings.Contains(key, DataDigest([]byte(v)))
}

func configInUse(key string, podList []corev1.Pod) bool {
for _, pod := range podList {
s := pod.Annotations[ConfigSuffixAnno]
if len(s) > 0 && strings.Contains(key, s) {
return true
}
}
return false
}

func DataDigest(data []byte) string {
sum := xxhash.Sum64(data)
return fmt.Sprintf("%x", sum)[0:7]
}

func addConfigMapDigest(cm *corev1.ConfigMap) error {
s, err := json.Marshal(cm.Data)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/common/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestAddConfigMapDigest(t *testing.T) {
func TestDataDigest(t *testing.T) {
// need fuzz?
cmList := []*corev1.ConfigMap{
newCM(""),
Expand All @@ -33,8 +33,8 @@ func TestAddConfigMapDigest(t *testing.T) {
}
g := NewGomegaWithT(t)
for _, cm := range cmList {
g.Expect(addConfigMapDigest(cm)).To(Succeed())
g.Expect(utf8string.NewString(cm.Name).IsASCII()).To(BeTrue())
digest := DataDigest([]byte(cm.Data["config"]))
g.Expect(utf8string.NewString(digest).IsASCII()).To(BeTrue())
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/common/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ const (

LabelManagedBy = "matrixorigin.io/managed-by"
LabelOwnerUID = "matrixorigin.io/owner-uid"

ConfigSuffixAnno = "matrixorigin.io/config-suffix"
)
8 changes: 6 additions & 2 deletions pkg/controllers/common/podset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type SyncMOPodTask struct {
ConfigMap *corev1.ConfigMap
KubeCli recon.KubeClient
StorageProvider *v1alpha1.SharedStorageProvider

ConfigSuffix string
// optional
MutateContainer func(c *corev1.Container)
MutatePod func(p *corev1.PodTemplateSpec)
Expand Down Expand Up @@ -74,7 +74,7 @@ func GetSemanticVersion(meta *metav1.ObjectMeta) semver.Version {
// SyncMOPod execute the given SyncMOPodTask which keeps the pod spec update to date
func SyncMOPod(t *SyncMOPodTask) error {
syncPodTemplate(t)
if err := SyncConfigMap(t.KubeCli, &t.TargetTemplate.Spec, t.ConfigMap); err != nil {
if err := SyncConfigMap(t.KubeCli, &t.TargetTemplate.Spec, t.ConfigMap, t.PodSet.GetOperatorVersion()); err != nil {
return errors.WrapPrefix(err, "sync configmap", 0)
}
return nil
Expand Down Expand Up @@ -106,6 +106,9 @@ func syncPodTemplate(t *SyncMOPodTask) {
if t.MutatePod != nil {
t.MutatePod(t.TargetTemplate)
}
if t.PodSet.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) {
t.TargetTemplate.ObjectMeta.Annotations[ConfigSuffixAnno] = t.ConfigSuffix
}

p.Overlay.OverlayPodMeta(&t.TargetTemplate.ObjectMeta)
p.Overlay.OverlayPodSpec(specRef)
Expand All @@ -119,6 +122,7 @@ func syncMainContainer(p *v1alpha1.PodSet, c *corev1.Container, mutateFn func(c
c.Env = []corev1.EnvVar{
util.FieldRefEnv(PodNameEnvKey, "metadata.name"),
util.FieldRefEnv(NamespaceEnvKey, "metadata.namespace"),
util.FieldRefEnv(ConfigSuffixEnvKey, fmt.Sprintf("metadata.annotations['%s']", ConfigSuffixAnno)),
}
memLimitEnv := GoMemLimitEnv(p.MemoryLimitPercent, c.Resources.Limits.Memory(), p.Overlay)
if memLimitEnv != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/common/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
NamespaceEnvKey = "NAMESPACE"
// PodIPEnvKey is the container environment variable to reflect the IP of the Pod that runs the container
PodIPEnvKey = "POD_IP"
// ConfigSuffixEnvKey is the container environment variable to reflect the config suffix
ConfigSuffixEnvKey = "CONFIG_SUFFIX"
)

// SubResourceLabels generate labels for sub-resources
Expand Down
8 changes: 5 additions & 3 deletions pkg/controllers/dnset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,14 @@ func (d *Actor) Create(ctx *recon.Context[*v1alpha1.DNSet]) error {
syncPodSpec(dn, dnSet, ctx.Dep.Deps.LogSet.Spec.SharedStorage)
syncPersistentVolumeClaim(dn, dnSet)

configMap, err := buildDNSetConfigMap(dn, ctx.Dep.Deps.LogSet)
configMap, configSuffix, err := buildDNSetConfigMap(dn, ctx.Dep.Deps.LogSet)
if err != nil {
return err
}

if err := common.SyncConfigMap(ctx, &dnSet.Spec.Template.Spec, configMap); err != nil {
if dn.Spec.GetOperatorVersion().Equals(v1alpha1.LatestOpVersion) {
dnSet.Spec.Template.Annotations[common.ConfigSuffixAnno] = configSuffix
}
if err := common.SyncConfigMap(ctx, &dnSet.Spec.Template.Spec, configMap, dn.Spec.GetOperatorVersion()); err != nil {
return err
}

Expand Down
Loading
Loading