Commit

upgrade to latest dependencies (#1472)
bumping knative.dev/eventing 469d0ac...5500bed:
  > 5500bed Cache statefulset scale update/get requests (# 7651)
  > 7b975fc Update KinD for e2e tests to 0.21.0 (# 7656)
  > 96863ba [main] Upgrade to latest dependencies (# 7657)

Signed-off-by: Knative Automation <[email protected]>
knative-automation authored Feb 7, 2024
1 parent 35a20b5 commit d23eb24
Showing 7 changed files with 155 additions and 56 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -25,7 +25,7 @@ require (
k8s.io/apimachinery v0.28.5
k8s.io/client-go v0.28.5
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
knative.dev/eventing v0.40.1-0.20240202144010-469d0ac85e86
knative.dev/eventing v0.40.1-0.20240206181150-5500beda659a
knative.dev/hack v0.0.0-20240201013652-f3881d90c189
knative.dev/pkg v0.0.0-20240205092023-4104e4237f6a
knative.dev/reconciler-test v0.0.0-20240202062219-3bf004cddd5c
4 changes: 2 additions & 2 deletions go.sum
@@ -970,8 +970,8 @@ k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5Ohx
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM=
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk=
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.40.1-0.20240202144010-469d0ac85e86 h1:a+k1iy7L+AQPY0jbWWM+7N6h5DXvvNw3L0f9cN60WKo=
knative.dev/eventing v0.40.1-0.20240202144010-469d0ac85e86/go.mod h1:gji5GMsP3ahX6Ul5y/rqp6X2oNTurLwdekzl6Tt16zo=
knative.dev/eventing v0.40.1-0.20240206181150-5500beda659a h1:xVIxFtb91Sz3bxWhn+x8ZDpiBcXBmRTPms2SVk1s8JE=
knative.dev/eventing v0.40.1-0.20240206181150-5500beda659a/go.mod h1:O74nDEiDyKCBS+kg5YdlODCLzjBmxV/bRgm12Q9dgOM=
knative.dev/hack v0.0.0-20240201013652-f3881d90c189 h1:a8htyuf5+S0NGxxdKXeQ49XOD9dEC1LHoofRQPgFKrU=
knative.dev/hack v0.0.0-20240201013652-f3881d90c189/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20240205092023-4104e4237f6a h1:BP7e0NMXh+4dgxw5RYKmEOG8QZufS3OEzY88ILle0ms=
98 changes: 98 additions & 0 deletions vendor/knative.dev/eventing/pkg/scheduler/scheduler.go
@@ -17,8 +17,15 @@ limitations under the License.
package scheduler

import (
"context"
"sync"
"time"

autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/cache"

duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
)
@@ -114,3 +121,94 @@ type VPod interface {

GetResourceVersion() string
}

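// ScaleClient is the subset of the StatefulSet scale-subresource client that the cache needs: reading and updating /scale.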
type ScaleClient interface {
GetScale(ctx context.Context, name string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
UpdateScale(ctx context.Context, name string, scale *autoscalingv1.Scale, options metav1.UpdateOptions) (*autoscalingv1.Scale, error)
}

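// ScaleCacheConfig controls how long a cached scale entry stays valid before it expires and the next request goes back to the API server.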
type ScaleCacheConfig struct {
RefreshPeriod time.Duration `json:"refreshPeriod"`
}

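// ScaleCache caches StatefulSet scale reads and writes so that repeated GetScale calls within RefreshPeriod do not hit the API server.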
type ScaleCache struct {
entriesMu sync.RWMutex // protects access to entries; entries itself is concurrency-safe, so we only need to ensure we access the pointer correctly
entries *cache.Expiring
scaleClient ScaleClient
statefulSetNamespace string
config ScaleCacheConfig
}

type scaleEntry struct {
specReplicas int32
statReplicas int32
}

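// NewScaleCache returns an empty ScaleCache for StatefulSets in the given namespace, backed by the given scale client.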
func NewScaleCache(ctx context.Context, namespace string, scaleClient ScaleClient, config ScaleCacheConfig) *ScaleCache {
return &ScaleCache{
entries: cache.NewExpiring(),
scaleClient: scaleClient,
statefulSetNamespace: namespace,
config: config,
}
}

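// GetScale serves the scale from the cache when a fresh entry exists; otherwise it fetches it from the API server and populates the cache.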
func (sc *ScaleCache) GetScale(ctx context.Context, statefulSetName string, options metav1.GetOptions) (*autoscalingv1.Scale, error) {
sc.entriesMu.RLock()
defer sc.entriesMu.RUnlock()

if entry, ok := sc.entries.Get(statefulSetName); ok {
entry := entry.(scaleEntry)
return &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: statefulSetName,
Namespace: sc.statefulSetNamespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: entry.specReplicas,
},
Status: autoscalingv1.ScaleStatus{
Replicas: entry.statReplicas,
},
}, nil
}

scale, err := sc.scaleClient.GetScale(ctx, statefulSetName, options)
if err != nil {
return scale, err
}

sc.setScale(statefulSetName, scale)

return scale, nil
}

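// UpdateScale always writes through to the API server and stores the returned scale in the cache.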
func (sc *ScaleCache) UpdateScale(ctx context.Context, statefulSetName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error) {
sc.entriesMu.RLock()
defer sc.entriesMu.RUnlock()

updatedScale, err := sc.scaleClient.UpdateScale(ctx, statefulSetName, scale, opts)
if err != nil {
return updatedScale, err
}

sc.setScale(statefulSetName, updatedScale)

return updatedScale, nil
}

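// Reset drops all cached entries by swapping in a fresh cache, so subsequent reads fetch current values (used on leader promotion; see the autoscaler change below).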
func (sc *ScaleCache) Reset() {
sc.entriesMu.Lock()
defer sc.entriesMu.Unlock()

sc.entries = cache.NewExpiring()
}

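// setScale caches the scale's spec and status replica counts, expiring after the configured refresh period.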
func (sc *ScaleCache) setScale(name string, scale *autoscalingv1.Scale) {
entry := scaleEntry{
specReplicas: scale.Spec.Replicas,
statReplicas: scale.Status.Replicas,
}

sc.entries.Set(name, entry, sc.config.RefreshPeriod)
}
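
The rest of the commit switches callers from the raw StatefulSet client to this cache. As a rough, hypothetical usage sketch (not part of this commit; the namespace, StatefulSet name, and refresh period are illustrative), a caller could wire it up like this:

// Illustrative sketch only; names and durations are made up.
package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"knative.dev/eventing/pkg/scheduler"
)

// client is typically the typed StatefulSet client, e.g.
// kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), which satisfies
// scheduler.ScaleClient (see the scheduler wiring further down in this commit).
func scaleWithCache(ctx context.Context, client scheduler.ScaleClient) error {
	sc := scheduler.NewScaleCache(ctx, "knative-eventing", client,
		scheduler.ScaleCacheConfig{RefreshPeriod: 30 * time.Second})

	// The first read goes to the API server and populates the cache; later
	// reads of the same StatefulSet within RefreshPeriod are served from it.
	scale, err := sc.GetScale(ctx, "my-statefulset", metav1.GetOptions{})
	if err != nil {
		return err
	}

	// Writes always go through to the API server, and the response refreshes
	// the cached entry.
	scale.Spec.Replicas++
	_, err = sc.UpdateScale(ctx, "my-statefulset", scale, metav1.UpdateOptions{})
	return err
}
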
50 changes: 24 additions & 26 deletions vendor/knative.dev/eventing/pkg/scheduler/state/state.go
@@ -31,10 +31,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1 "k8s.io/client-go/listers/core/v1"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -154,34 +152,34 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {

// stateBuilder reconstructs the state from scratch by listing vpods
type stateBuilder struct {
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
nodeLister corev1.NodeLister
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetName string
podLister corev1.PodNamespaceLister
schedPolicy *scheduler.SchedulerPolicy
deschedPolicy *scheduler.SchedulerPolicy
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
nodeLister corev1.NodeLister
statefulSetCache *scheduler.ScaleCache
statefulSetName string
podLister corev1.PodNamespaceLister
schedPolicy *scheduler.SchedulerPolicy
deschedPolicy *scheduler.SchedulerPolicy
}

// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister) StateAccessor {
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {

return &stateBuilder{
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
nodeLister: nodeLister,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
statefulSetName: sfsname,
podLister: podlister,
schedPolicy: schedPolicy,
deschedPolicy: deschedPolicy,
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
nodeLister: nodeLister,
statefulSetCache: statefulSetCache,
statefulSetName: sfsname,
podLister: podlister,
schedPolicy: schedPolicy,
deschedPolicy: deschedPolicy,
}
}

@@ -191,7 +189,7 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
return nil, err
}

scale, err := s.statefulSetClient.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
if err != nil {
s.logger.Infow("failed to get statefulset", zap.Error(err))
return nil, err
48 changes: 24 additions & 24 deletions vendor/knative.dev/eventing/pkg/scheduler/statefulset/autoscaler.go
@@ -28,10 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"knative.dev/pkg/reconciler"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
@@ -57,13 +55,13 @@ }
}

type autoscaler struct {
statefulSetClient clientappsv1.StatefulSetInterface
statefulSetName string
vpodLister scheduler.VPodLister
logger *zap.SugaredLogger
stateAccessor st.StateAccessor
trigger chan struct{}
evictor scheduler.Evictor
statefulSetCache *scheduler.ScaleCache
statefulSetName string
vpodLister scheduler.VPodLister
logger *zap.SugaredLogger
stateAccessor st.StateAccessor
trigger chan struct{}
evictor scheduler.Evictor

// capacity is the total number of virtual replicas available per pod.
capacity int32
@@ -92,6 +90,8 @@ func (a *autoscaler) Promote(b reconciler.Bucket, _ func(reconciler.Bucket, type
if b.Has(ephemeralLeaderElectionObject) {
// The promoted bucket has the ephemeralLeaderElectionObject, so we are leader.
a.isLeader.Store(true)
// reset the cache so that, when we access state as the leader, we always see the newest values
a.statefulSetCache.Reset()
}
return nil
}
@@ -104,20 +104,20 @@ func (a *autoscaler) Demote(b reconciler.Bucket) {
}
}

func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor) *autoscaler {
func newAutoscaler(ctx context.Context, cfg *Config, stateAccessor st.StateAccessor, statefulSetCache *scheduler.ScaleCache) *autoscaler {
return &autoscaler{
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace),
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: cfg.Evictor,
trigger: make(chan struct{}, 1),
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
logger: logging.FromContext(ctx).With(zap.String("component", "autoscaler")),
statefulSetCache: statefulSetCache,
statefulSetName: cfg.StatefulSetName,
vpodLister: cfg.VPodLister,
stateAccessor: stateAccessor,
evictor: cfg.Evictor,
trigger: make(chan struct{}, 1),
capacity: cfg.PodCapacity,
refreshPeriod: cfg.RefreshPeriod,
lock: new(sync.Mutex),
isLeader: atomic.Bool{},
getReserved: cfg.getReserved,
// Anything that is less than now() - refreshPeriod, so that we will try to compact
// as soon as we start.
lastCompactAttempt: time.Now().
@@ -183,7 +183,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
return err
}

scale, err := a.statefulSetClient.GetScale(ctx, a.statefulSetName, metav1.GetOptions{})
scale, err := a.statefulSetCache.GetScale(ctx, a.statefulSetName, metav1.GetOptions{})
if err != nil {
// skip a beat
a.logger.Infow("failed to get scale subresource", zap.Error(err))
@@ -236,7 +236,7 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
scale.Spec.Replicas = newreplicas
a.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))

_, err = a.statefulSetClient.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{})
_, err = a.statefulSetCache.UpdateScale(ctx, a.statefulSetName, scale, metav1.UpdateOptions{})
if err != nil {
a.logger.Errorw("updating scale subresource failed", zap.Error(err))
return err
7 changes: 5 additions & 2 deletions vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go
@@ -64,6 +64,7 @@ type Config struct {
StatefulSetNamespace string `json:"statefulSetNamespace"`
StatefulSetName string `json:"statefulSetName"`

ScaleCacheConfig scheduler.ScaleCacheConfig `json:"scaleCacheConfig"`
// PodCapacity max capacity for each StatefulSet's pod.
PodCapacity int32 `json:"podCapacity"`
// Autoscaler refresh period
@@ -87,14 +88,16 @@ func New(ctx context.Context, cfg *Config) (scheduler.Scheduler, error) {
podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(cfg.StatefulSetNamespace)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister)
scaleCache := scheduler.NewScaleCache(ctx, cfg.StatefulSetNamespace, kubeclient.Get(ctx).AppsV1().StatefulSets(cfg.StatefulSetNamespace), cfg.ScaleCacheConfig)

stateAccessor := st.NewStateBuilder(ctx, cfg.StatefulSetNamespace, cfg.StatefulSetName, cfg.VPodLister, cfg.PodCapacity, cfg.SchedulerPolicy, cfg.SchedPolicy, cfg.DeschedPolicy, podLister, cfg.NodeLister, scaleCache)

var getReserved GetReserved
cfg.getReserved = func() map[types.NamespacedName]map[string]int32 {
return getReserved()
}

autoscaler := newAutoscaler(ctx, cfg, stateAccessor)
autoscaler := newAutoscaler(ctx, cfg, stateAccessor, scaleCache)

var wg sync.WaitGroup
wg.Add(1)
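
For illustration only (not part of this commit; values are made up and the remaining Config fields are elided), a caller building the statefulset scheduler would supply the new cache setting roughly like this:

package example

import (
	"context"
	"time"

	"knative.dev/eventing/pkg/scheduler"
	"knative.dev/eventing/pkg/scheduler/statefulset"
)

func newScheduler(ctx context.Context) (scheduler.Scheduler, error) {
	cfg := &statefulset.Config{
		StatefulSetNamespace: "knative-eventing",
		StatefulSetName:      "my-statefulset",
		PodCapacity:          20,
		RefreshPeriod:        10 * time.Second, // autoscaler refresh period
		// New with this dependency bump: how long cached scale entries stay fresh.
		ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: 30 * time.Second},
		// VPodLister, Evictor, scheduler policies, NodeLister, etc. as before.
	}
	return statefulset.New(ctx, cfg)
}
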
2 changes: 1 addition & 1 deletion vendor/modules.txt
@@ -1304,7 +1304,7 @@ k8s.io/utils/net
k8s.io/utils/pointer
k8s.io/utils/strings/slices
k8s.io/utils/trace
# knative.dev/eventing v0.40.1-0.20240202144010-469d0ac85e86
# knative.dev/eventing v0.40.1-0.20240206181150-5500beda659a
## explicit; go 1.21
knative.dev/eventing/cmd/heartbeats
knative.dev/eventing/pkg/adapter/v2
