Skip to content

Commit

Permalink
Add unit tests for workunitbase and wrap fsnotify library calls (#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronH88 authored Oct 10, 2023
1 parent 215c103 commit 635ed43
Show file tree
Hide file tree
Showing 10 changed files with 541 additions and 36 deletions.
6 changes: 3 additions & 3 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (cw *commandUnit) runCommand(cmd *exec.Cmd) error {
})
}()
go cmdWaiter(cmd, doneChan)
go cw.monitorLocalStatus()
go cw.MonitorLocalStatus()

return nil
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (cw *commandUnit) Restart() error {
// Job never started - mark it failed
cw.UpdateBasicStatus(WorkStateFailed, "Pending at restart", stdoutSize(cw.UnitDir()))
}
go cw.monitorLocalStatus()
go cw.MonitorLocalStatus()

return nil
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType str
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newCommandWorker(w *Workceptor, unitID string, workType string) WorkUnit {
baseParams: "foo",
allowRuntimeParams: true,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ func (kw *kubeUnit) startOrRestart() error {
} else {
go kw.runWorkUsingLogger()
}
go kw.monitorLocalStatus()
go kw.MonitorLocalStatus()

return nil
}
Expand Down Expand Up @@ -1388,7 +1388,7 @@ func (cfg KubeWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string
deletePodOnRestart: cfg.DeletePodOnRestart,
namePrefix: fmt.Sprintf("%s-", strings.ToLower(cfg.WorkType)),
}
ku.BaseWorkUnit.Init(w, unitID, workType)
ku.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return ku
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/workceptor/mock_workceptor/stdio_utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 76 additions & 0 deletions pkg/workceptor/mock_workceptor/workunitbase.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/workceptor/python.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (cfg workPythonCfg) NewWorker(w *Workceptor, unitID string, workType string
function: cfg.Function,
config: cfg.Config,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (rw *remoteUnit) Release(force bool) error {

func newRemoteWorker(w *Workceptor, unitID, workType string) WorkUnit {
rw := &remoteUnit{logger: w.nc.GetLogger()}
rw.BaseWorkUnit.Init(w, unitID, workType)
rw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)
red := &remoteExtraData{}
red.RemoteParams = make(map[string]string)
rw.status.ExtraData = red
Expand Down
6 changes: 6 additions & 0 deletions pkg/workceptor/stdio_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type FileSystemer interface {
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
Stat(name string) (os.FileInfo, error)
Open(name string) (*os.File, error)
RemoveAll(path string) error
}

// FileSystem represents the real filesystem.
Expand All @@ -36,6 +37,11 @@ func (FileSystem) Open(name string) (*os.File, error) {
return os.Open(name)
}

// RemoveAll removes path and any children it contains.
func (FileSystem) RemoveAll(path string) error {
return os.RemoveAll(path)
}

// FileWriteCloser wraps io.WriteCloser.
type FileWriteCloser interface {
io.WriteCloser
Expand Down
90 changes: 62 additions & 28 deletions pkg/workceptor/workunitbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ const (
WorkStateCanceled = 4
)

// WatcherWrapper is wrapping the fsnofity Watcher struct and exposing the Event chan within.
type WatcherWrapper interface {
Add(name string) error
Close() error
EventChannel() chan fsnotify.Event
}

type RealWatcher struct {
watcher *fsnotify.Watcher
}

func (rw *RealWatcher) Add(name string) error {
return rw.watcher.Add(name)
}

func (rw *RealWatcher) Close() error {
return rw.watcher.Close()
}

func (rw *RealWatcher) EventChannel() chan fsnotify.Event {
return rw.watcher.Events
}

// IsComplete returns true if a given WorkState indicates the job is finished.
func IsComplete(workState int) bool {
return workState == WorkStateSucceeded || workState == WorkStateFailed
Expand Down Expand Up @@ -74,10 +97,12 @@ type BaseWorkUnit struct {
lastUpdateErrorLock *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
fs FileSystemer
watcher WatcherWrapper
}

// Init initializes the basic work unit data, in memory only.
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper) {
bwu.w = w
bwu.status.State = WorkStatePending
bwu.status.Detail = "Unit Created"
Expand All @@ -90,6 +115,17 @@ func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
bwu.statusLock = &sync.RWMutex{}
bwu.lastUpdateErrorLock = &sync.RWMutex{}
bwu.ctx, bwu.cancel = context.WithCancel(w.ctx)
bwu.fs = fs
if watcher != nil {
bwu.watcher = watcher
} else {
watcher, err := fsnotify.NewWatcher()
if err == nil {
bwu.watcher = &RealWatcher{watcher: watcher}
} else {
bwu.watcher = nil
}
}
}

// Error logs message with unitID prepended.
Expand Down Expand Up @@ -340,33 +376,29 @@ func (bwu *BaseWorkUnit) LastUpdateError() error {
return bwu.lastUpdateError
}

// monitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (bwu *BaseWorkUnit) monitorLocalStatus() {
// MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (bwu *BaseWorkUnit) MonitorLocalStatus() {
statusFile := path.Join(bwu.UnitDir(), "status")
watcher, err := fsnotify.NewWatcher()
if err == nil {
err = watcher.Add(statusFile)
var watcherEvents chan fsnotify.Event
watcherEvents = make(chan fsnotify.Event)

if bwu.watcher != nil {
err := bwu.watcher.Add(statusFile)
if err == nil {
defer func() {
_ = watcher.Close()
_ = bwu.watcher.Close()
}()
watcherEvents = bwu.watcher.EventChannel()
} else {
_ = watcher.Close()
watcher = nil
_ = bwu.watcher.Close()
bwu.watcher = nil
}
} else {
watcher = nil
}
fi, err := os.Stat(statusFile)
fi, err := bwu.fs.Stat(statusFile)
if err != nil {
fi = nil
}
var watcherEvents chan fsnotify.Event
if watcher == nil {
watcherEvents = make(chan fsnotify.Event)
} else {
watcherEvents = watcher.Events
}

loop:
for {
select {
Expand All @@ -380,14 +412,12 @@ loop:
}
}
case <-time.After(time.Second):
newFi, err := os.Stat(statusFile)
if err == nil {
if fi == nil || fi.ModTime() != newFi.ModTime() {
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
}
newFi, err := bwu.fs.Stat(statusFile)
if err == nil && (fi == nil || fi.ModTime() != newFi.ModTime()) {
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
}
}
}
Expand Down Expand Up @@ -425,7 +455,7 @@ func (bwu *BaseWorkUnit) Release(force bool) error {
defer bwu.statusLock.Unlock()
attemptsLeft := 3
for {
err := os.RemoveAll(bwu.UnitDir())
err := bwu.fs.RemoveAll(bwu.UnitDir())
if force {
break
} else if err != nil {
Expand All @@ -451,11 +481,15 @@ func (bwu *BaseWorkUnit) Release(force bool) error {
return nil
}

func (bwu *BaseWorkUnit) CancelContext() {
bwu.cancel()
}

// =============================================================================================== //

func newUnknownWorker(w *Workceptor, unitID string, workType string) WorkUnit {
uu := &unknownUnit{}
uu.BaseWorkUnit.Init(w, unitID, workType)
uu.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return uu
}
Expand Down
Loading

0 comments on commit 635ed43

Please sign in to comment.