Skip to content

Commit

Permalink
Add user-definable auxiliary procedures to init/exit (#164)
Browse files Browse the repository at this point in the history
* Add user-definable auxiliary procedures to init/exit

* Fix GC safety perventing compilation

* Add tests

* Remove accidentally redundant test

* Fix nil checks - check if proc is nil, not pointer to proc

* Auxiliary Proc type fixes, removed pointer indirection

* Remove typo

* Fix URL in comment
  • Loading branch information
awr1 authored Sep 15, 2020
1 parent 2e528bd commit e5a3701
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 2 deletions.
18 changes: 18 additions & 0 deletions tests/test_auxiliary_procs.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import
std/[atomics, os],
../weave

var count: Atomic[int]

proc increment() = count += 1
proc decrement() = count -= 1

const ThreadsToLaunch = 8 # Arbitrary value

putEnv("WEAVE_NUM_THREADS", $ThreadsToLaunch)

init(Weave, increment)
syncRoot(Weave)
doAssert(count.load == ThreadsToLaunch - 1)
exit(Weave, decrement)
doAssert(count.load == 0)
2 changes: 2 additions & 0 deletions weave.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ task test, "Run Weave tests":
test "", "weave/parallel_reduce.nim"

test "--debugger:native", "tests/test_background_jobs.nim"
test "--debugger:native", "tests/test_auxiliary_procs.nim"

test "-d:WV_LazyFlowvar", "weave/parallel_tasks.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_for.nim"
Expand Down Expand Up @@ -117,6 +118,7 @@ task test_gc_arc, "Run Weave tests with --gc:arc":
test "--gc:arc", "weave/parallel_reduce.nim"

test "--gc:arc", "tests/test_background_jobs.nim"
test "--gc:arc", "tests/test_auxiliary_procs.nim"

test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_tasks.nim"
test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_for.nim"
Expand Down
1 change: 1 addition & 0 deletions weave/datatypes/context_global.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type
barrier*: SyncBarrier
## Barrier for initialization and teardown
manager*: ManagerContext
auxiliaryInit*, auxiliaryExit*: proc() {.nimcall, gcsafe.}

ManagerContext* = object
## Manager context
Expand Down
17 changes: 15 additions & 2 deletions weave/runtime.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ else:
# Runtime public routines
# ----------------------------------------------------------------------------------

proc init*(_: type Weave) =
type AuxiliaryProc* = proc() {.nimcall, gcsafe.} | proc() {.cdecl, gcsafe.}
const WV_NoAuxiliary: proc() {.nimcall, gcsafe.} = nil # for type resolution

proc init*(_: type Weave, auxiliary: AuxiliaryProc = WV_NoAuxiliary) =
# TODO detect Hyper-Threading and NUMA domain
manager.acceptsJobs.store(false, moRelaxed)

Expand All @@ -42,6 +45,11 @@ proc init*(_: type Weave) =
else:
workforce() = int32 countProcessors()

when auxiliary is proc() {.cdecl, gcsafe.}:
globalCtx.auxiliaryInit = proc() {.nimcall, gcsafe.} = auxiliary
else:
globalCtx.auxiliaryInit = auxiliary

## Allocation of the global context.
globalCtx.mempools = wv_alloc(TLPoolAllocator, workforce())
globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce())
Expand Down Expand Up @@ -167,7 +175,12 @@ proc globalCleanup() =
metrics:
log("+========================================+\n")

proc exit*(_: type Weave) =
proc exit*(_: type Weave, auxiliary: AuxiliaryProc = WV_NoAuxiliary) =
when auxiliary is proc() {.cdecl, gcsafe.}:
globalCtx.auxiliaryExit = proc() {.nimcall, gcsafe.} = auxiliary
else:
globalCtx.auxiliaryExit = auxiliary

syncRoot(_)
signalTerminate(nil)
workerContext.signaledTerminate = true
Expand Down
9 changes: 9 additions & 0 deletions weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ proc worker_entry_fn*(id: WorkerID) =
myID() = id # If this crashes, you need --tlsemulation:off
myMemPool().initialize()
setupWorker()

# Unstealable user-definable procedure to be executed on all threads
{.gcsafe.}:
if not globalCtx.auxiliaryInit.isNil: globalCtx.auxiliaryInit()

discard globalCtx.barrier.wait()

eventLoop()
Expand All @@ -181,6 +186,10 @@ proc worker_entry_fn*(id: WorkerID) =
# 1 matching barrier in init(Runtime) for lead thread
workerMetrics()

# Same as auxiliaryInit, but for cleanup
{.gcsafe.}:
if not globalCtx.auxiliaryExit.isNil: globalCtx.auxiliaryExit()

teardownWorker()
postCondition: localThreadKind == Unknown

Expand Down

0 comments on commit e5a3701

Please sign in to comment.