Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from apache:master #65

Merged
merged 6 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions .github/workflows/pre-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,18 @@ jobs:
strategy:
fail-fast: false
matrix:
k8s: [v1.30.0, v1.29.4, v1.28.9, v1.27.13, v1.26.15, v1.25.16, v1.24.17]
plugin: ['', '--plugin']
k8s:
[
v1.31.0,
v1.30.4,
v1.29.8,
v1.28.13,
v1.27.16,
v1.26.15,
v1.25.16,
v1.24.17,
]
plugin: ["", "--plugin"]
steps:
- name: Checkout source code
uses: actions/checkout@v4
Expand Down Expand Up @@ -76,5 +86,5 @@ jobs:
uses: actions/upload-artifact@v4
if: ${{ failure() }}
with:
name: ${{ github.job }} stdout (${{ matrix.k8s }}${{ matrix.plugin == '--plugin' && format(', {0}', matrix.plugin) || matrix.plugin }})
name: ${{ github.job }} stdout (${{ matrix.k8s }}${{ matrix.plugin == '--plugin' && format(', {0}', matrix.plugin) || matrix.plugin }})
path: build/e2e
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ KUBECTL_BIN=$(KUBECTL_PATH)/kubectl
export PATH := $(BASE_DIR)/$(KUBECTL_PATH):$(PATH)

# kind
KIND_VERSION=v0.23.0
KIND_VERSION=v0.24.0
KIND_PATH=$(TOOLS_DIR)/kind-$(KIND_VERSION)
KIND_BIN=$(KIND_PATH)/kind
export PATH := $(BASE_DIR)/$(KIND_PATH):$(PATH)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21

require (
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586
github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2 h1:m1kxL2ce3QfHOsYl5D+AfHn7xjFxP40b88na/7qzmS8=
github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2/go.mod h1:QHKfJ2RyZuQnZg28SnypmnvFxN/zfoYf+hmfxiVdq5g=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e h1:ZOLst6ROwUrgoUQbEdYaz28iKuiU5YNYGtelKsTFhqw=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
Expand Down
35 changes: 6 additions & 29 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,42 +627,19 @@ func (app *Application) handleFailApplicationEvent(errMsg string) {
}
}

func (app *Application) handleReleaseAppAllocationEvent(allocationKey string, terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("allocationKey", allocationKey),
zap.String("terminationType", terminationType))

for _, task := range app.taskMap {
if task.allocationKey == allocationKey {
task.setTaskTerminationType(terminationType)
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err))
}
app.publishPlaceholderTimeoutEvents(task)
}
}
}

func (app *Application) handleReleaseAppAllocationAskEvent(taskID string, terminationType string) {
func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("taskID", taskID),
zap.String("terminationType", terminationType))

if task, ok := app.taskMap[taskID]; ok {
task.setTaskTerminationType(terminationType)
if task.IsPlaceholder() {
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation ask from application", zap.Error(err))
}
app.publishPlaceholderTimeoutEvents(task)
} else {
log.Log(log.ShimCacheApplication).Warn("skip to release allocation ask, ask is not a placeholder",
zap.String("appID", app.applicationID),
zap.String("taskID", taskID))
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err))
}
app.publishPlaceholderTimeoutEvents(task)
} else {
log.Log(log.ShimCacheApplication).Warn("task not found",
zap.String("appID", app.applicationID),
Expand Down
64 changes: 3 additions & 61 deletions pkg/cache/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ const (
KillApplication
KilledApplication
ReleaseAppAllocation
ReleaseAppAllocationAsk
ResumingApplication
AppTaskCompleted
)

func (ae ApplicationEventType) String() string {
return [...]string{"SubmitApplication", "AcceptApplication", "TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", "CompleteApplication", "FailApplication", "KillApplication", "KilledApplication", "ReleaseAppAllocation", "ReleaseAppAllocationAsk", "ResumingApplication", "AppTaskCompleted"}[ae]
return [...]string{"SubmitApplication", "AcceptApplication", "TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", "CompleteApplication", "FailApplication", "KillApplication", "KilledApplication", "ReleaseAppAllocation", "ResumingApplication", "AppTaskCompleted"}[ae]
}

// ------------------------
Expand Down Expand Up @@ -295,37 +294,6 @@ func (re ReleaseAppAllocationEvent) GetEvent() string {
return re.event.String()
}

type ReleaseAppAllocationAskEvent struct {
applicationID string
taskID string
terminationType string
event ApplicationEventType
}

func NewReleaseAppAllocationAskEvent(appID string, allocTermination si.TerminationType, taskID string) ReleaseAppAllocationAskEvent {
return ReleaseAppAllocationAskEvent{
applicationID: appID,
taskID: taskID,
terminationType: si.TerminationType_name[int32(allocTermination)],
event: ReleaseAppAllocationAsk,
}
}

func (re ReleaseAppAllocationAskEvent) GetApplicationID() string {
return re.applicationID
}

func (re ReleaseAppAllocationAskEvent) GetArgs() []interface{} {
args := make([]interface{}, 2)
args[0] = re.taskID
args[1] = re.terminationType
return args
}

func (re ReleaseAppAllocationAskEvent) GetEvent() string {
return re.event.String()
}

// ------------------------
// Resuming application
// ------------------------
Expand Down Expand Up @@ -434,7 +402,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
{
Name: ReleaseAppAllocation.String(),
Src: []string{states.Running},
Src: []string{states.Running, states.Accepted, states.Reserving},
Dst: states.Running,
},
{
Expand All @@ -447,21 +415,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
Src: []string{states.Resuming},
Dst: states.Resuming,
},
{
Name: ReleaseAppAllocationAsk.String(),
Src: []string{states.Running, states.Accepted, states.Reserving},
Dst: states.Running,
},
{
Name: ReleaseAppAllocationAsk.String(),
Src: []string{states.Failing},
Dst: states.Failing,
},
{
Name: ReleaseAppAllocationAsk.String(),
Src: []string{states.Resuming},
Dst: states.Resuming,
},
{
Name: CompleteApplication.String(),
Src: []string{states.Running},
Expand Down Expand Up @@ -543,17 +496,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
app.onReservationStateChange()
},
ReleaseAppAllocation.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
eventArgs := make([]string, 2)
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
log.Log(log.ShimFSM).Error("fail to parse event arg", zap.Error(err))
return
}
allocationKey := eventArgs[0]
terminationType := eventArgs[1]
app.handleReleaseAppAllocationEvent(allocationKey, terminationType)
},
ReleaseAppAllocationAsk.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
eventArgs := make([]string, 2)
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
Expand All @@ -562,7 +504,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
}
taskID := eventArgs[0]
terminationType := eventArgs[1]
app.handleReleaseAppAllocationAskEvent(taskID, terminationType)
app.handleReleaseAppAllocationEvent(taskID, terminationType)
},
AppTaskCompleted.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
100 changes: 0 additions & 100 deletions pkg/cache/application_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,105 +769,6 @@ func TestReleaseAppAllocationEventGetApplicationID(t *testing.T) {
}
}

func TestNewReleaseAppAllocationAskEvent(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantID, wantTaskID, wantType string
wantEvent ApplicationEventType
}{
{TestCreateName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT", ReleaseAppAllocationAsk},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
t.Run(tt.name, func(t *testing.T) {
if instance.applicationID != tt.wantID || instance.taskID != tt.taskID || instance.terminationType != tt.wantType || instance.event != tt.wantEvent {
t.Errorf("want %s %s %s %s, got %s %s %s %s",
tt.wantID, tt.taskID, tt.wantType, tt.wantEvent,
instance.applicationID, instance.taskID, instance.terminationType, instance.event)
}
})
}
}

func TestReleaseAppAllocationAskEventGetEvent(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantEvent ApplicationEventType
}{
{TestEventName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, ReleaseAppAllocationAsk},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
event := instance.GetEvent()
t.Run(tt.name, func(t *testing.T) {
if event != tt.wantEvent.String() {
t.Errorf("want %s, got %s", tt.wantEvent, event)
}
})
}
}

func TestReleaseAppAllocationAskEventGetArgs(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantLen int
wantTaskID, wantType string
castOk []bool
wantArg []string
}{
{TestArgsName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, 2, "testTaskId001", "TIMEOUT", []bool{true, true}, []string{"testTaskId001", "TIMEOUT"}},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
args := instance.GetArgs()
t.Run(tt.name, func(t *testing.T) {
if len(args) != tt.wantLen {
t.Errorf("want %d, got %d", tt.wantLen, len(args))

for index, arg := range args {
info, ok := arg.(string)
if ok != tt.castOk[index] {
t.Errorf("want %v, got %v", tt.castOk[index], ok)
}
if info != tt.wantArg[index] {
t.Errorf("want %s, got %s", tt.wantArg[index], info)
}
}
}
})
}
}

func TestReleaseAppAllocationAskEventGetApplicationID(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantID string
}{
{TestAppIDName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001"},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
appID := instance.GetApplicationID()
t.Run(tt.name, func(t *testing.T) {
if appID != tt.wantID {
t.Errorf("want %s, got %s", tt.wantID, appID)
}
})
}
}

func TestNewResumingApplicationEvent(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -902,7 +803,6 @@ func TestApplicationEventsAsString(t *testing.T) {
assert.Equal(t, KillApplication.String(), "KillApplication")
assert.Equal(t, KilledApplication.String(), "KilledApplication")
assert.Equal(t, ReleaseAppAllocation.String(), "ReleaseAppAllocation")
assert.Equal(t, ReleaseAppAllocationAsk.String(), "ReleaseAppAllocationAsk")
assert.Equal(t, ResumingApplication.String(), "ResumingApplication")
assert.Equal(t, AppTaskCompleted.String(), "AppTaskCompleted")
}
4 changes: 2 additions & 2 deletions pkg/cache/placeholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou
Name: placeholderName,
Namespace: app.tags[constants.AppTagNamespace],
Labels: utils.MergeMaps(taskGroup.Labels, map[string]string{
constants.LabelApplicationID: app.GetApplicationID(),
constants.LabelQueueName: app.GetQueue(),
constants.CanonicalLabelApplicationID: app.GetApplicationID(),
constants.CanonicalLabelQueueName: app.GetQueue(),
}),
Annotations: annotations,
OwnerReferences: ownerRefs,
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/placeholder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ func TestNewPlaceholder(t *testing.T) {
assert.Equal(t, holder.pod.Name, "ph-name")
assert.Equal(t, holder.pod.Namespace, namespace)
assert.DeepEqual(t, holder.pod.Labels, map[string]string{
constants.LabelApplicationID: appID,
constants.LabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
constants.CanonicalLabelApplicationID: appID,
constants.CanonicalLabelQueueName: queue,
"labelKey0": "labelKeyValue0",
"labelKey1": "labelKeyValue1",
})
assert.Equal(t, len(holder.pod.Annotations), 6, "unexpected number of annotations")
assert.Equal(t, holder.pod.Annotations[constants.AnnotationTaskGroupName], app.taskGroups[0].Name)
Expand Down
19 changes: 0 additions & 19 deletions pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
}
}

for _, reject := range response.Rejected {
// request rejected by the scheduler, put it back and try scheduling again
log.Log(log.ShimRMCallback).Debug("callback: response to rejected ask",
zap.String("allocationKey", reject.AllocationKey))
dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey,
fmt.Sprintf("task %s ask from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}

for _, reject := range response.RejectedAllocations {
// request rejected by the scheduler, reject it
log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation",
Expand All @@ -110,16 +101,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
}
}

for _, ask := range response.ReleasedAsks {
log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
zap.String("allocation key", ask.AllocationKey))

if ask.TerminationType == si.TerminationType_TIMEOUT {
ev := NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType, ask.AllocationKey)
dispatcher.Dispatch(ev)
}
}

return nil
}

Expand Down
Loading