Skip to content

Commit

Permalink
backport of commit 6b8ddff
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross authored Oct 16, 2024
1 parent 3d59d52 commit 7eba9f4
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 142 deletions.
3 changes: 3 additions & 0 deletions .changelog/24214.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
windows: Fixed a bug where a crashed executor would orphan task processes
```
2 changes: 2 additions & 0 deletions .github/workflows/test-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ jobs:
gotestsum --format=short-verbose \
--junitfile results.xml \
github.com/hashicorp/nomad/drivers/docker \
github.com/hashicorp/nomad/drivers/rawexec \
github.com/hashicorp/nomad/drivers/shared/executor \
github.com/hashicorp/nomad/client/lib/fifo \
github.com/hashicorp/nomad/client/logmon \
github.com/hashicorp/nomad/client/allocrunner/taskrunner/template \
Expand Down
98 changes: 0 additions & 98 deletions drivers/rawexec/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"path/filepath"
"runtime"
"strconv"
"sync"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -237,103 +236,6 @@ func TestRawExecDriver_StartWait(t *testing.T) {
require.NoError(harness.DestroyTask(task.ID, true))
}

// TestRawExecDriver_StartWaitRecoverWaitStop starts a long-running task,
// deletes it from the driver's task store and expects the pending wait to
// return an error, recovers the task from its handle, and finally checks that
// stopping it with SIGKILL is reported through a fresh wait.
func TestRawExecDriver_StartWaitRecoverWaitStop(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)

	d := newEnabledRawExecDriver(t)
	harness := dtestutil.NewDriverHarness(t, d)
	defer harness.Kill()

	config := &Config{Enabled: true}
	var data []byte
	require.NoError(basePlug.MsgPackEncode(&data, config))
	bconfig := &basePlug.Config{
		PluginConfig: data,
		AgentConfig: &base.AgentConfig{
			Driver: &base.ClientDriverConfig{
				Topology: d.nomadConfig.Topology,
			},
		},
	}
	require.NoError(harness.SetConfig(bconfig))

	allocID := uuid.Generate()
	taskName := "sleep"
	task := &drivers.TaskConfig{
		AllocID:   allocID,
		ID:        uuid.Generate(),
		Name:      taskName,
		Env:       defaultEnv(),
		Resources: testResources(allocID, taskName),
	}
	tc := &TaskConfig{
		Command: testtask.Path(),
		Args:    []string{"sleep", "100s"},
	}
	require.NoError(task.EncodeConcreteDriverConfig(&tc))

	testtask.SetTaskConfigEnv(task)

	cleanup := harness.MkAllocDir(task, false)
	defer cleanup()

	harness.MakeTaskCgroup(allocID, taskName)

	handle, _, err := harness.StartTask(task)
	require.NoError(err)

	ch, err := harness.WaitTask(context.Background(), task.ID)
	require.NoError(err)

	// The background goroutines below only receive the wait result. All
	// assertions run on the test goroutine after wg.Wait(), because require
	// calls t.FailNow, which must not be called from other goroutines.
	var (
		wg     sync.WaitGroup
		result *drivers.ExitResult
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		result = <-ch
	}()

	originalStatus, err := d.InspectTask(task.ID)
	require.NoError(err)

	// Drop the task from the driver's store; the pending wait must fail.
	d.tasks.Delete(task.ID)

	wg.Wait()
	require.NotNil(result)
	require.Error(result.Err)

	_, err = d.InspectTask(task.ID)
	require.Equal(drivers.ErrTaskNotFound, err)

	err = d.RecoverTask(handle)
	require.NoError(err)

	// After recovery the task must look exactly as it did before the delete.
	status, err := d.InspectTask(task.ID)
	require.NoError(err)
	require.Exactly(originalStatus, status)

	ch, err = harness.WaitTask(context.Background(), task.ID)
	require.NoError(err)

	result = nil
	wg.Add(1)
	go func() {
		defer wg.Done()
		result = <-ch
	}()

	time.Sleep(300 * time.Millisecond)
	require.NoError(d.StopTask(task.ID, 0, "SIGKILL"))
	wg.Wait()
	require.NoError(d.DestroyTask(task.ID, false))

	require.NotNil(result)
	require.NoError(result.Err)
	require.NotZero(result.ExitCode)
	require.Equal(9, result.Signal)
}

func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down
99 changes: 99 additions & 0 deletions drivers/rawexec/driver_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/plugins/base"
basePlug "github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/hashicorp/nomad/testutil"
Expand Down Expand Up @@ -443,3 +445,100 @@ func TestRawExec_ExecTaskStreaming_User(t *testing.T) {
require.Empty(t, stderr)
require.Contains(t, stdout, "nobody")
}

// TestRawExecDriver_StartWaitRecoverWaitStop starts a long-running task,
// deletes it from the driver's task store and expects the pending wait to
// return an error, recovers the task from its handle, and finally checks that
// stopping it with SIGKILL is reported through a fresh wait.
func TestRawExecDriver_StartWaitRecoverWaitStop(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)

	d := newEnabledRawExecDriver(t)
	harness := dtestutil.NewDriverHarness(t, d)
	defer harness.Kill()

	config := &Config{Enabled: true}
	var data []byte
	require.NoError(basePlug.MsgPackEncode(&data, config))
	bconfig := &basePlug.Config{
		PluginConfig: data,
		AgentConfig: &base.AgentConfig{
			Driver: &base.ClientDriverConfig{
				Topology: d.nomadConfig.Topology,
			},
		},
	}
	require.NoError(harness.SetConfig(bconfig))

	allocID := uuid.Generate()
	taskName := "sleep"
	task := &drivers.TaskConfig{
		AllocID:   allocID,
		ID:        uuid.Generate(),
		Name:      taskName,
		Env:       defaultEnv(),
		Resources: testResources(allocID, taskName),
	}
	tc := &TaskConfig{
		Command: testtask.Path(),
		Args:    []string{"sleep", "100s"},
	}
	require.NoError(task.EncodeConcreteDriverConfig(&tc))

	testtask.SetTaskConfigEnv(task)

	cleanup := harness.MkAllocDir(task, false)
	defer cleanup()

	harness.MakeTaskCgroup(allocID, taskName)

	handle, _, err := harness.StartTask(task)
	require.NoError(err)

	ch, err := harness.WaitTask(context.Background(), task.ID)
	require.NoError(err)

	// The background goroutines below only receive the wait result. All
	// assertions run on the test goroutine after wg.Wait(), because require
	// calls t.FailNow, which must not be called from other goroutines.
	var (
		wg     sync.WaitGroup
		result *drivers.ExitResult
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		result = <-ch
	}()

	originalStatus, err := d.InspectTask(task.ID)
	require.NoError(err)

	// Drop the task from the driver's store; the pending wait must fail.
	d.tasks.Delete(task.ID)

	wg.Wait()
	require.NotNil(result)
	require.Error(result.Err)

	_, err = d.InspectTask(task.ID)
	require.Equal(drivers.ErrTaskNotFound, err)

	err = d.RecoverTask(handle)
	require.NoError(err)

	// After recovery the task must look exactly as it did before the delete.
	status, err := d.InspectTask(task.ID)
	require.NoError(err)
	require.Exactly(originalStatus, status)

	ch, err = harness.WaitTask(context.Background(), task.ID)
	require.NoError(err)

	result = nil
	wg.Add(1)
	go func() {
		defer wg.Done()
		result = <-ch
	}()

	time.Sleep(300 * time.Millisecond)
	require.NoError(d.StopTask(task.ID, 0, "SIGKILL"))
	wg.Wait()
	require.NoError(d.DestroyTask(task.ID, false))

	require.NotNil(result)
	require.NoError(result.Err)
	require.NotZero(result.ExitCode)
	require.Equal(9, result.Signal)
}
96 changes: 96 additions & 0 deletions drivers/rawexec/driver_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build windows

package rawexec

import (
"os"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
"github.com/shoenig/test/must"
)

// TestRawExecDriver_ExecutorKill verifies that killing the executor will stop
// its child processes. It starts a task, forcibly kills the executor process
// recorded in the task's reattach config, and then requires the task process
// itself to exit within five seconds.
func TestRawExecDriver_ExecutorKill(t *testing.T) {
	ci.Parallel(t)

	d := newEnabledRawExecDriver(t)
	harness := dtestutil.NewDriverHarness(t, d)
	t.Cleanup(harness.Kill)

	config := &Config{Enabled: true}
	var data []byte
	must.NoError(t, base.MsgPackEncode(&data, config))
	bconfig := &base.Config{
		PluginConfig: data,
		AgentConfig: &base.AgentConfig{
			Driver: &base.ClientDriverConfig{
				Topology: d.nomadConfig.Topology,
			},
		},
	}
	must.NoError(t, harness.SetConfig(bconfig))

	allocID := uuid.Generate()
	taskName := "test"
	task := &drivers.TaskConfig{
		AllocID:   allocID,
		ID:        uuid.Generate(),
		Name:      taskName,
		Resources: testResources(allocID, taskName),
	}

	taskConfig := map[string]interface{}{}
	taskConfig["command"] = "Powershell.exe"
	taskConfig["args"] = []string{"sleep", "100s"}

	must.NoError(t, task.EncodeConcreteDriverConfig(&taskConfig))

	cleanup := harness.MkAllocDir(task, false)
	t.Cleanup(cleanup)

	handle, _, err := harness.StartTask(task)
	must.NoError(t, err)

	var taskState TaskState
	must.NoError(t, handle.GetDriverState(&taskState))
	must.NoError(t, harness.WaitUntilStarted(task.ID, 1*time.Second))

	// forcibly kill the executor, not the workload
	must.NotEq(t, taskState.ReattachConfig.Pid, taskState.Pid)
	proc, err := os.FindProcess(taskState.ReattachConfig.Pid)
	must.NoError(t, err)

	taskProc, err := os.FindProcess(taskState.Pid)
	must.NoError(t, err)

	// Register the fallback kill before touching the executor so the workload
	// is still cleaned up if a later assertion fails. The error is ignored on
	// purpose: this is best effort and the process is expected to be gone.
	t.Cleanup(func() {
		_ = taskProc.Kill()
	})

	must.NoError(t, proc.Kill())
	t.Logf("killed %d, waiting on %d to stop", taskState.ReattachConfig.Pid, taskState.Pid)

	// Report the Wait error instead of discarding it; otherwise a failed wait
	// would look the same as the process exiting. The channel is buffered so
	// the goroutine can finish even if the test has already timed out.
	waitErrCh := make(chan error, 1)
	go func() {
		_, waitErr := taskProc.Wait()
		waitErrCh <- waitErr
	}()

	select {
	case <-time.After(5 * time.Second):
		t.Fatal("expected child process to exit")
	case waitErr := <-waitErrCh:
		must.NoError(t, waitErr)
	}
}
Loading

0 comments on commit 7eba9f4

Please sign in to comment.