Skip to content

Commit

Permalink
[YUNIKORN-2467] Remove AllocationAsk from the core when a pod is completed (apache#797)
Browse files Browse the repository at this point in the history

Closes: apache#797

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
pbacsko authored and craigcondit committed Mar 5, 2024
1 parent 5fff0bc commit c384a88
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (task *Task) releaseAllocation() {
return
}
releaseRequest = common.CreateReleaseAllocationRequestForTask(
task.applicationID, task.allocationID, task.application.partition, task.terminationType)
task.applicationID, task.taskID, task.allocationID, task.application.partition, task.terminationType)
}

if releaseRequest.Releases != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
PartitionName: "default",
}
mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error {
assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0,
assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1,
"allocationAskToRelease is not in the expected length")
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1,
"allocationsToRelease is not in the expected length")
Expand Down
13 changes: 11 additions & 2 deletions pkg/common/si_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func GetTerminationTypeFromString(terminationTypeStr string) si.TerminationType
return si.TerminationType_STOPPED_BY_RM
}

func CreateReleaseAllocationRequestForTask(appID, allocationID, partition, terminationType string) *si.AllocationRequest {
func CreateReleaseAllocationRequestForTask(appID, taskID, allocationID, partition, terminationType string) *si.AllocationRequest {
toReleases := make([]*si.AllocationRelease, 0)
toReleases = append(toReleases, &si.AllocationRelease{
ApplicationID: appID,
Expand All @@ -152,8 +152,17 @@ func CreateReleaseAllocationRequestForTask(appID, allocationID, partition, termi
Message: "task completed",
})

toReleaseAsk := make([]*si.AllocationAskRelease, 1)
toReleaseAsk[0] = &si.AllocationAskRelease{
ApplicationID: appID,
AllocationKey: taskID,
PartitionName: partition,
Message: "task request completed",
}

releaseRequest := si.AllocationReleasesRequest{
AllocationsToRelease: toReleases,
AllocationsToRelease: toReleases,
AllocationAsksToRelease: toReleaseAsk,
}

return &si.AllocationRequest{
Expand Down
9 changes: 6 additions & 3 deletions pkg/common/si_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ import (
const nodeID = "node-01"

func TestCreateReleaseAllocationRequest(t *testing.T) {
request := CreateReleaseAllocationRequestForTask("app01", "alloc01", "default", "STOPPED_BY_RM")
request := CreateReleaseAllocationRequestForTask("app01", "task01", "alloc01", "default", "STOPPED_BY_RM")
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
assert.Assert(t, request.Releases.AllocationAsksToRelease == nil)
assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0)
assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID, "alloc01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default")
assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
}

func TestCreateReleaseAskRequestForTask(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/shim/scheduler_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/yunikorn-core/pkg/entrypoint"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
Expand Down Expand Up @@ -322,6 +323,10 @@ func (fc *MockScheduler) waitForApplicationStateInCore(appID, partition, expecte
}, time.Second, 5*time.Second)
}

// getApplicationFromCore asks the core scheduler's cluster context for the
// application identified by appID in the given partition and returns whatever
// the cluster context reports for that lookup.
func (fc *MockScheduler) getApplicationFromCore(appID, partition string) *objects.Application {
	clusterContext := fc.coreContext.Scheduler.GetClusterContext()
	return clusterContext.GetApplication(appID, partition)
}

// GetPodBindStats returns the pod bind statistics as reported by the
// API provider held by this mock scheduler.
func (fc *MockScheduler) GetPodBindStats() client.BindStats {
	bindStats := fc.apiProvider.GetPodBindStats()
	return bindStats
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/shim/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ partitions:
cluster.waitAndAssertTaskState(t, "app0001", "task0002", cache.TaskStates().Completed)
err = cluster.waitForApplicationStateInCore("app0001", partitionName, "Completing")
assert.NilError(t, err)
app := cluster.getApplicationFromCore("app0001", partitionName)
assert.Equal(t, 0, len(app.GetAllRequests()), "asks were not removed from the application")
assert.Equal(t, 0, len(app.GetAllAllocations()), "allocations were not removed from the application")
}

func TestRejectApplications(t *testing.T) {
Expand Down

0 comments on commit c384a88

Please sign in to comment.