[WIP] Add CacheWorkerSet as an abstract interface for workers && add AdvancedStatefulSet interface for worker pod scale-in #4294

Open · wants to merge 29 commits into base: master

Commits (29)
671d3b7
Create cacheworkerset.go
zwbrealm Aug 24, 2024
3de8f05
Update cacheworkerset.go
zwbrealm Aug 24, 2024
4e31499
Create AdvancedStatefulset.go
zwbrealm Aug 24, 2024
ea0dbbe
Update cacheworkerset.go
zwbrealm Aug 25, 2024
b4a3b1c
Update and rename AdvancedStatefulset.go to advanced_statefulset.go
zwbrealm Aug 25, 2024
dea43ea
Update advanced_statefulset.go
zwbrealm Aug 26, 2024
7f34fbc
Create cacheworkerset_interface.go
zwbrealm Aug 26, 2024
6fbd2b9
Create the types/cacheworker directory
zwbrealm Sep 1, 2024
0d53d00
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 1, 2024
c62af6e
0902 Add spec and status for advancedstatefulset
zwbrealm Sep 2, 2024
67eca33
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 2, 2024
70e4072
0904 Tests
zwbrealm Sep 4, 2024
79bf6bd
0905 Interface changes
zwbrealm Sep 5, 2024
4be8add
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 5, 2024
4ced293
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 7, 2024
0617e1e
Create ops.go
zwbrealm Sep 7, 2024
7756754
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 8, 2024
a650b12
Revert
zwbrealm Sep 8, 2024
e0e1b21
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 16, 2024
a1ebad2
0917
zwbrealm Sep 16, 2024
172946b
0928
zwbrealm Sep 28, 2024
7e12457
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Sep 28, 2024
9ac197b
Merge branch 'fluid-cloudnative:master' into master
zwbrealm Oct 4, 2024
8552ddd
1004
zwbrealm Oct 4, 2024
f1200d4
1004-2
zwbrealm Oct 4, 2024
8f7941c
Merge remote-tracking branch 'origin/master'
zwbrealm Oct 4, 2024
e6367a7
1005
zwbrealm Oct 5, 2024
65dee01
1006-1
zwbrealm Oct 6, 2024
4dc05ac
1011-1
zwbrealm Oct 11, 2024
14 changes: 7 additions & 7 deletions pkg/ddc/alluxio/engine.go
@@ -34,9 +34,9 @@ import (
// AlluxioEngine implements the Engine interface.
type AlluxioEngine struct {
// *base.TemplateEngine
runtime *datav1alpha1.AlluxioRuntime
name string
namespace string
Runtime *datav1alpha1.AlluxioRuntime
Name string
Namespace string
runtimeType string
engineImpl string
Log logr.Logger
@@ -54,8 +54,8 @@ type AlluxioEngine struct {
// Build function builds the Alluxio Engine
func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error) {
engine := &AlluxioEngine{
name: ctx.Name,
namespace: ctx.Namespace,
Name: ctx.Name,
Namespace: ctx.Namespace,
Client: ctx.Client,
Recorder: ctx.Recorder,
Log: ctx.Log,
@@ -74,13 +74,13 @@ func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error)
if !ok {
return nil, fmt.Errorf("engine %s is failed to parse", ctx.Name)
}
engine.runtime = runtime
engine.Runtime = runtime
} else {
return nil, fmt.Errorf("engine %s is failed to parse", ctx.Name)
}

// Build and setup runtime info
runtimeInfo, err := engine.getRuntimeInfo()
runtimeInfo, err := engine.GetRuntimeInfo()
if err != nil {
return nil, fmt.Errorf("engine %s failed to get runtime info, error %s", ctx.Name, err.Error())
}
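
The engine.go change above promotes the name, namespace and runtime fields, as well as getRuntimeInfo, to exported identifiers. A plausible motivation, sketched below with assumed names (this helper package is not part of the diff), is that code outside the alluxio package can then consume the engine through a narrow interface.

package worker // assumed helper package, not part of this PR

import "github.com/fluid-cloudnative/fluid/pkg/ddc/base"

// RuntimeInfoGetter is satisfied by *alluxio.AlluxioEngine once
// GetRuntimeInfo is exported.
type RuntimeInfoGetter interface {
	GetRuntimeInfo() (base.RuntimeInfoInterface, error)
}

// commonWorkerLabel derives the dataset's common worker label from any
// engine that can expose its runtime info.
func commonWorkerLabel(g RuntimeInfoGetter) (string, error) {
	info, err := g.GetRuntimeInfo()
	if err != nil {
		return "", err
	}
	return info.GetCommonLabelName(), nil
}
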
24 changes: 4 additions & 20 deletions pkg/ddc/alluxio/runtime_info.go
@@ -1,19 +1,3 @@
/*
Copyright 2020 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package alluxio

import (
@@ -23,14 +7,14 @@ import (
)

// getRuntimeInfo gets runtime info
func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
func (e *AlluxioEngine) GetRuntimeInfo() (base.RuntimeInfoInterface, error) {
if e.runtimeInfo == nil {
runtime, err := e.getRuntime()
if err != nil {
return e.runtimeInfo, err
}

e.runtimeInfo, err = base.BuildRuntimeInfo(e.name, e.namespace, e.runtimeType, runtime.Spec.TieredStore, base.WithMetadataList(base.GetMetadataListFromAnnotation(runtime)))
e.runtimeInfo, err = base.BuildRuntimeInfo(e.Name, e.Namespace, e.runtimeType, runtime.Spec.TieredStore, base.WithMetadataList(base.GetMetadataListFromAnnotation(runtime)))
if err != nil {
return e.runtimeInfo, err
}
@@ -56,10 +40,10 @@ func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
e.Log.Info("Deprecation check finished", "isLabelDeprecated", e.runtimeInfo.IsDeprecatedNodeLabel(), "isPVNameDeprecated", e.runtimeInfo.IsDeprecatedPVName())

// Setup with Dataset Info
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
dataset, err := utils.GetDataset(e.Client, e.Name, e.Namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("Dataset is notfound", "name", e.name, "namespace", e.namespace)
e.Log.Info("Dataset is notfound", "name", e.Name, "namespace", e.Namespace)
return e.runtimeInfo, nil
}

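
GetRuntimeInfo above is lazily initialized: the runtime info is built on the first call, cached on the engine struct, and reused afterwards. A stripped-down, self-contained illustration of that pattern follows; all names are invented for the example.

package main

import "fmt"

type runtimeInfo struct {
	name      string
	namespace string
}

type engine struct {
	name        string
	namespace   string
	runtimeInfo *runtimeInfo // nil until first requested
}

// getRuntimeInfo builds the info once and returns the cached value on
// subsequent calls, mirroring the nil check in the real method.
func (e *engine) getRuntimeInfo() *runtimeInfo {
	if e.runtimeInfo == nil {
		e.runtimeInfo = &runtimeInfo{name: e.name, namespace: e.namespace}
	}
	return e.runtimeInfo
}

func main() {
	e := &engine{name: "demo", namespace: "default"}
	fmt.Println(e.getRuntimeInfo().name) // built on first call
	fmt.Println(e.getRuntimeInfo().name) // reused afterwards
}
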
125 changes: 110 additions & 15 deletions pkg/ddc/alluxio/shutdown.go
@@ -19,7 +19,6 @@ package alluxio
import (
"context"
"fmt"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"

@@ -47,7 +46,7 @@ func (e *AlluxioEngine) Shutdown() (err error) {
return
}
if e.retryShutdown < gracefulShutdownLimits {
err = e.cleanupCache()
err = e.CleanupCache()
if err != nil {
e.retryShutdown = e.retryShutdown + 1
e.Log.Info("clean cache failed",
@@ -61,7 +60,7 @@
base.SafeClose(e.MetadataSyncDoneCh)
}

_, err = e.destroyWorkers(-1)
_, err = e.DestroyWorkers(-1)
if err != nil {
return
}
@@ -79,7 +78,7 @@
if datav1alpha1.IsHostNetwork(runtime.Spec.Master.NetworkMode) ||
datav1alpha1.IsHostNetwork(runtime.Spec.Worker.NetworkMode) {
e.Log.Info("releasePorts for hostnetwork mode")
err = e.releasePorts()
err = e.ReleasePorts()
if err != nil {
return
}
@@ -93,13 +92,13 @@
// destroyMaster destroys the master
func (e *AlluxioEngine) destroyMaster() (err error) {
var found bool
found, err = helm.CheckRelease(e.name, e.namespace)
found, err = helm.CheckRelease(e.Name, e.Namespace)
if err != nil {
return err
}

if found {
err = helm.DeleteRelease(e.name, e.namespace)
err = helm.DeleteRelease(e.Name, e.Namespace)
if err != nil {
return
}
@@ -113,7 +112,7 @@
// }

// cleanupCache cleans up the cache
func (e *AlluxioEngine) cleanupCache() (err error) {
func (e *AlluxioEngine) CleanupCache() (err error) {
// TODO(cheyang): clean up the cache
cacheStates, err := e.queryCacheStatus()
if utils.IgnoreNotFound(err) != nil {
@@ -155,15 +154,15 @@ func (e *AlluxioEngine) cleanupCache() (err error) {
return fmt.Errorf("to make sure if the remaining cache is cleaned up, check again")
}

func (e *AlluxioEngine) releasePorts() (err error) {
func (e *AlluxioEngine) ReleasePorts() (err error) {
var valueConfigMapName = e.getHelmValuesConfigMapName()

allocator, err := portallocator.GetRuntimePortAllocator()
if err != nil {
return errors.Wrap(err, "GetRuntimePortAllocator when releasePorts")
}

cm, err := kubeclient.GetConfigmapByName(e.Client, valueConfigMapName, e.namespace)
cm, err := kubeclient.GetConfigmapByName(e.Client, valueConfigMapName, e.Namespace)
if err != nil {
return errors.Wrap(err, "GetConfigmapByName when releasePorts")
}
@@ -194,8 +193,8 @@ func (e *AlluxioEngine) cleanAll() (err error) {

var (
valueConfigmapName = e.getHelmValuesConfigMapName()
configmapName = e.name + "-config"
namespace = e.namespace
configmapName = e.Name + "-config"
namespace = e.Namespace
)

cms := []string{valueConfigmapName, configmapName}
@@ -212,12 +211,12 @@

// destroyWorkers attempts to delete the workers until worker num reaches the given expectedWorkers, if expectedWorkers is -1, it means all the workers should be deleted
// This func returns currentWorkers representing how many workers are left after this process.
func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers int32, err error) {
func (e *AlluxioEngine) DestroyWorkers(expectedWorkers int32) (currentWorkers int32, err error) {
// SchedulerMutex only for patch mode
lifecycle.SchedulerMutex.Lock()
defer lifecycle.SchedulerMutex.Unlock()

runtimeInfo, err := e.getRuntimeInfo()
runtimeInfo, err := e.GetRuntimeInfo()
if err != nil {
return currentWorkers, err
}
@@ -292,7 +291,7 @@ func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers in
labelsToModify.Delete(label)
}

exclusiveLabelValue := utils.GetExclusiveValue(e.namespace, e.name)
exclusiveLabelValue := utils.GetExclusiveValue(e.Namespace, e.Name)
if val, exist := toUpdate.Labels[labelExclusiveName]; exist && val == exclusiveLabelValue {
labelsToModify.Delete(labelExclusiveName)
}
@@ -308,7 +307,7 @@ func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers in
if err != nil {
return err
}
e.Log.Info("Destroy worker", "Dataset", e.name, "deleted worker node", node.Name, "removed or updated labels", modifiedLabels)
e.Log.Info("Destroy worker", "Dataset", e.Name, "deleted worker node", node.Name, "removed or updated labels", modifiedLabels)
return nil
})

@@ -322,6 +321,102 @@ func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers in
return currentWorkers, nil
}

// DestroyPodsByAnnotationOnWorkerNodes attempts to delete all worker pods on the nodes that have the specified annotation.
// This function also cleans up the cache related to the deleted pods.
func (e *AlluxioEngine) DestroyPodsByAnnotationOnWorkerNodes(annotationKey string, annotationValue string) (err error) {
// SchedulerMutex only for patch mode
lifecycle.SchedulerMutex.Lock()
defer lifecycle.SchedulerMutex.Unlock()

runtimeInfo, err := e.GetRuntimeInfo()
if err != nil {
return err
}

var (
nodeList = &corev1.NodeList{}
labelCommonName = runtimeInfo.GetCommonLabelName()
)

datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", labelCommonName))
if err != nil {
return err
}

// List all nodes with the specified dataset labels
err = e.List(context.TODO(), nodeList, &client.ListOptions{
LabelSelector: datasetLabels,
})
if err != nil {
return err
}

// Iterate over each node to find and delete pods with the specified annotation
for _, node := range nodeList.Items {
nodeName := node.Name
currentPods, err := e.DestroyWorkerPodsByNode(nodeName, annotationKey, annotationValue)
if err != nil {
return err
}
e.Log.Info("Destroyed pods on node", "nodeName", nodeName, "remainingPods", currentPods)
}

// Clean up cache after deleting pods
err = e.CleanupCache()
if err != nil {
e.Log.Error(err, "Failed to clean up cache after deleting pods")
return err
}

return nil
}

// DestroyWorkerPodsByNode deletes all worker pods on the specified node that match the given annotation.
func (e *AlluxioEngine) DestroyWorkerPodsByNode(nodeName string, annotationKey string, annotationValue string) (currentPods int32, err error) {
var podList corev1.PodList

// List all pods in the namespace
err = e.Client.List(context.TODO(), &podList, &client.ListOptions{
Namespace: e.Namespace,
})
if err != nil {
return currentPods, err
}

currentPods = int32(len(podList.Items))

// Select the pods based on the node name and annotation
for _, pod := range podList.Items {
if pod.Spec.NodeName != nodeName {
continue // Skip pods not on the specified node
}

// Check if the pod has the specified annotation
if value, exists := pod.Annotations[annotationKey]; !exists || value != annotationValue {
continue // Skip this pod if it doesn't match the annotation
}

podName := pod.Name
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Delete the pod
err := e.Client.Delete(context.TODO(), &pod)
if err != nil {
e.Log.Error(err, "Failed to delete pod", "podname", podName)
return err
}
e.Log.Info("Deleted pod", "podname", podName)
return nil
})

if err != nil {
return currentPods, err
}

currentPods--
}

return currentPods, nil
}
func (e *AlluxioEngine) sortNodesToShutdown(candidateNodes []corev1.Node) (nodes []corev1.Node, err error) {
// If fuses are deployed in global mode. Scaling in workers has nothing to do with fuses.
// All nodes with related label can be candidate nodes.
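
The two methods added above, DestroyPodsByAnnotationOnWorkerNodes and DestroyWorkerPodsByNode, suggest a scale-in flow in which candidate worker pods are first marked with an annotation and then deleted node by node, followed by a cache cleanup. A hedged usage sketch follows; the caller and the annotation key and value are placeholders, not constants defined by this PR.

// Sketch of a hypothetical caller living in pkg/ddc/alluxio.
func (e *AlluxioEngine) scaleInAnnotatedWorkers() error {
	const (
		annotationKey   = "fluid.io/scale-in-candidate" // placeholder key
		annotationValue = "true"                        // placeholder value
	)
	// Delete every worker pod carrying the scale-in annotation on nodes
	// labeled for this dataset; the method also cleans up the remaining
	// cache once the pods are gone.
	return e.DestroyPodsByAnnotationOnWorkerNodes(annotationKey, annotationValue)
}
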