diff --git a/README.md b/README.md index 17c15a5b..ca0a8dd3 100644 --- a/README.md +++ b/README.md @@ -152,14 +152,28 @@ exit(Weave) ### Complete list -- `init(Weave)`, `exit(Weave)` to start and stop the runtime. Forgetting this will give you nil pointer exceptions on spawn. -- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable Flowvar handle. +We separate the list depending on the threading context + +#### Root thread + +The root thread is the thread that started the Weave runtime. It has special privileges. + +- `init(Weave)`, `exit(Weave)` to start and stop the runtime. Forgetting this will give you nil pointer exceptions on spawn.\ + The thread that calls `init` will become the root thread. +- `syncRoot(Weave)` is a global barrier. The root thread will not continue beyond + until all tasks in the runtime are finished. + +#### Weave worker thread + +A worker thread is automatically created per (logical) core on the machine. +The root thread is also a worker thread. +Worker threads are tuned to maximize throughput of computational **tasks**. + +- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable `Flowvar` handle. - `newPledge`, `fulfill` and `spawnDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships. - `sync(Flowvar)` will await a Flowvar and block until you receive a result. - `isReady(Flowvar)` will check if `sync` will actually block or return the result immediately. -- `syncRoot(Weave)` is a global barrier for the main thread on the main task. - Using syncRoot in a proc means that the can only be called from the main thread. - `syncRoot(Weave)` is implicitly called by `exit(Weave)` + - `syncScope` is a scope barrier. The thread will not move beyond the scope until all tasks and parallel loops spawned and their descendants are finished. `syncScope` is composable, it can be called by any thread, it can be nested. @@ -177,13 +191,83 @@ exit(Weave) may be created by the current tasks. - `parallelFor`, `parallelForStrided`, `parallelForStaged`, `parallelForStagedStrided` are described above and in the experimental section. - `loadBalance(Weave)` gives the runtime the opportunity to distribute work. Insert this within long computation as due to Weave design, it's the busy workers that are also in charge of load balancing. This is done automatically when using `parallelFor`. -- `isSpawned` allows you to build speculative algorithm where a thread is spawned only if certain conditions are valid. See the `nqueens` benchmark for an example. -- `getThreadId` returns a unique thread ID. The thread ID is in the range 0 ..< number of threads. +- `isSpawned(Flowvar)` allows you to build speculative algorithm where a thread is spawned only if certain conditions are valid. See the `nqueens` benchmark for an example. +- `getThreadId(Weave)` returns a unique thread ID. The thread ID is in the range 0 ..< number of threads. -The max number of threads can be configured by the environment variable WEAVE_NUM_THREADS +The max number of worker threads can be configured by the environment variable WEAVE_NUM_THREADS and default to your number of logical cores (including HyperThreading). Weave uses Nim's `countProcessors()` in `std/cpuinfo` +#### Foreign thread & Background service (experimental) + +Weave can also be run as a background service and process `jobs` similar to the `Executor` concept in C++. 
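Before the job API is described, here is a minimal sketch of the root-thread and worker-thread APIs listed above. It assumes `import weave` and compilation with `--threads:on`; `fib` is only an illustrative function, not part of Weave.

```Nim
# Minimal sketch of the task API listed above (assumes `import weave`, --threads:on).
import weave

proc fib(n: int): int =
  if n < 2:
    return n
  let x = spawn fib(n-1)   # may run on another worker thread, returns a Flowvar[int]
  let y = fib(n-2)         # keep working on the current thread
  result = sync(x) + y     # block until the Flowvar is ready

init(Weave)                # the calling thread becomes the root thread
echo fib(20)               # prints 6765
syncRoot(Weave)            # global barrier (also implied by exit)
exit(Weave)
```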
+Jobs will be processed in FIFO order. + +> **Experimental**: +> The distinction between spawn/sync on a Weave thread +> and submit/waitFor on a foreign thread may be removed in the future. + +A background service can be started with either: +- `thr.runInBackground(Weave)` +- or `thr.runInBackground(Weave, signalShutdown: ptr Atomic[bool])` + +with `thr` an uninitialized `Thread[void]` or `Thread[ptr Atomic[bool]]` + +Then the foreign thread should call: +- `setupSubmitterThread(Weave)`: Configure a thread so that it can send jobs to a background Weave service +and on shutdown +- `waitUntilReady(Weave)`: Block the foreign thread until the Weave runtime is ready to accept jobs. + +and for shutdown +- `teardownSubmitterThread(Weave)`: Cleanup Weave resources allocated on the thread. + +Once setup, a foreign thread can submit jobs via: + +- `submit fnCall(args)` which submits a function to the Weave runtime and gives you an awaitable `Pending` handle. +- `newPledge`, `fulfill` and `submitDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships. +- `waitFor(Pending)` which await a Pending job result and blocks the current thread +- `isReady(Pending)` will check if `waitFor` will actually block or return the result immediately. +- `isSubmitted(job)` allows you to build speculative algorithm where a job is submitted only if certain conditions are valid. + +Within a job, tasks can be spawned and parallel for constructs can be used. + +If `runInBackground()` does not provide fine enough control, a Weave background event loop +can be customized using the following primitive: +- at a very low-level: + - The root thread primitives: `init(Weave)` and `exit(Weave)` + - `processAllandTryPark(Weave)`: Process all pending jobs and try sleeping. The sleep may fail to avoid deadlocks + if a job is submitted concurrently. This should be used in a `while true` event loop. +- at a medium level: + - `runForever(Weave)`: Start a never-ending event loop that processes all pending jobs and sleep until new work arrives. + - `runUntil(Weave, signalShutdown: ptr Atomic[bool])`: Start an event-loop that quits on signal. + +For example: +```Nim +proc runUntil*(_: typedesc[Weave], signal: ptr Atomic[bool]) = + ## Start a Weave event loop until signal is true on the current thread. + ## It wakes-up on job submission, handles multithreaded load balancing, + ## help process tasks + ## and spin down when there is no work anymore. + preCondition: not signal.isNil + while not signal[].load(moRelaxed): + processAllandTryPark(Weave) + syncRoot(Weave) + +proc runInBackground*( + _: typedesc[Weave], + signalShutdown: ptr Atomic[bool] + ): Thread[ptr Atomic[bool]] = + ## Start the Weave runtime on a background thread. + ## It wakes-up on job submissions, handles multithreaded load balancing, + ## help process tasks + ## and spin down when there is no work anymore. 
+ proc eventLoop(shutdown: ptr Atomic[bool]) {.thread.} = + init(Weave) + Weave.runUntil(shutdown) + exit(Weave) + result.createThread(eventLoop, signalShutdown) +``` + ## Table of Contents - [Weave, a state-of-the-art multithreading runtime](#weave-a-state-of-the-art-multithreading-runtime) @@ -194,6 +278,9 @@ Weave uses Nim's `countProcessors()` in `std/cpuinfo` - [Data parallelism](#data-parallelism) - [Strided loops](#strided-loops) - [Complete list](#complete-list) + - [Root thread](#root-thread) + - [Weave worker thread](#weave-worker-thread) + - [Foreign thread & Background service (experimental)](#foreign-thread--background-service-experimental) - [Table of Contents](#table-of-contents) - [Platforms supported](#platforms-supported) - [C++ compilation](#c-compilation) diff --git a/changelog.md b/changelog.md index 2e42a1eb..5af8d9eb 100644 --- a/changelog.md +++ b/changelog.md @@ -5,6 +5,18 @@ #### Features - Added `isReady(Flowvar)` which will return true is `sync` would block on that Flowvar or if the result is actually immediately available. +- `syncScope:` to block until all tasks and their (recursive) + descendants are completed. +- Dataflow parallelism can now be used with the C++ target. +- Weave as a background service (experimental). + Weave can now be started on a dedicated thread + and handle **jobs** from any thread. + To do this, Weave can be started with `thr.runInBackground(Weave)`. + Job providing threads should call `setupSubmitterThread(Weave)`, + and can now use `submit function(args...)` and `waitFor(PendingResult)` + to have Weave work as a job system. + Jobs are handled in FIFO order. + Within a job, tasks can be spawned. ### v0.4.0 - April 2020 - "Bespoke" diff --git a/weave.nim b/weave.nim index 7050c394..f8c45389 100644 --- a/weave.nim +++ b/weave.nim @@ -12,7 +12,8 @@ import weave/state_machines/[sync_root, sync, sync_scope], weave/datatypes/flowvars, weave/cross_thread_com/pledges, - weave/contexts + weave/contexts, + weave/[executor, parallel_jobs] export Flowvar, Weave, @@ -28,4 +29,12 @@ export syncScope, # Experimental dataflow parallelism spawnDelayed, Pledge, - fulfill, newPledge + fulfill, newPledge, + # Experimental background service + Pending, + submit, submitDelayed, + runInBackground, waitUntilReady, + setupSubmitterThread, teardownSubmitterThread, + waitFor, isSubmitted, + processAllandTryPark, + runForever, runUntil diff --git a/weave.nimble b/weave.nimble index 3f06663b..2ffe7578 100644 --- a/weave.nimble +++ b/weave.nimble @@ -1,6 +1,6 @@ # Package -version = "0.4.0" +version = "0.4.9" author = "Mamy André-Ratsimbazafy" description = "a state-of-the-art ùultithreading runtime" license = "MIT or Apache License 2.0" @@ -45,11 +45,13 @@ task test, "Run Weave tests": test "", "weave/parallel_for.nim" test "", "weave/parallel_for_staged.nim" test "", "weave/parallel_reduce.nim" + test "", "weave/parallel_jobs.nim" test "-d:WV_LazyFlowvar", "weave/parallel_tasks.nim" test "-d:WV_LazyFlowvar", "weave/parallel_for.nim" test "-d:WV_LazyFlowvar", "weave/parallel_for_staged.nim" test "-d:WV_LazyFlowvar", "weave/parallel_reduce.nim" + test "-d:WV_LazyFlowvar", "weave/parallel_jobs.nim" test "", "benchmarks/dfs/weave_dfs.nim" test "", "benchmarks/fibonacci/weave_fib.nim" @@ -70,7 +72,7 @@ task test, "Run Weave tests": test "-d:WV_LazyFlowvar", "benchmarks/heat/weave_heat.nim" test "-d:WV_LazyFlowvar", "benchmarks/matrix_transposition/weave_transposes.nim" test "-d:WV_LazyFlowvar", "benchmarks/nqueens/weave_nqueens.nim" - when not defined(windows): + 
when not defined(windows): # Timer impl missing test "-d:WV_LazyFlowvar", "benchmarks/single_task_producer/weave_spc.nim" test "-d:WV_LazyFlowvar", "benchmarks/bouncing_producer_consumer/weave_bpc.nim" when defined(i386) or defined(amd64): @@ -107,11 +109,13 @@ task test_gc_arc, "Run Weave tests with --gc:arc": test "--gc:arc", "weave/parallel_for.nim" test "--gc:arc", "weave/parallel_for_staged.nim" test "--gc:arc", "weave/parallel_reduce.nim" + test "--gc:arc", "weave/parallel_jobs.nim" test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_tasks.nim" test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_for.nim" test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_for_staged.nim" test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_reduce.nim" + test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_jobs.nim" test "--gc:arc", "benchmarks/dfs/weave_dfs.nim" test "--gc:arc", "benchmarks/fibonacci/weave_fib.nim" @@ -132,7 +136,7 @@ task test_gc_arc, "Run Weave tests with --gc:arc": test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/heat/weave_heat.nim" test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/matrix_transposition/weave_transposes.nim" test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/nqueens/weave_nqueens.nim" - when not defined(windows): + when not defined(windows): # Timer impl missing test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/single_task_producer/weave_spc.nim" test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/bouncing_producer_consumer/weave_bpc.nim" when defined(i386) or defined(amd64): diff --git a/weave/config.nim b/weave/config.nim index d277a72b..d2a874b6 100644 --- a/weave/config.nim +++ b/weave/config.nim @@ -7,6 +7,8 @@ import strutils +{.push gcsafe.} + # Platform support # ---------------------------------------------------------------------------------- @@ -100,6 +102,10 @@ template debugTermination*(body: untyped): untyped = when defined(WV_DebugTermination) or defined(WV_Debug): block: {.noSideEffect, gcsafe.}: body +template debugExecutor*(body: untyped): untyped = + when defined(WV_DebugExecutor) or defined(WV_Debug): + block: {.noSideEffect, gcsafe.}: body + template debug*(body: untyped): untyped = when defined(WV_Debug): block: {.noSideEffect, gcsafe.}: body diff --git a/weave/contexts.nim b/weave/contexts.nim index f8fdef4a..5ba4e0ff 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -18,16 +18,23 @@ when defined(WV_metrics): Backoff: import ./cross_thread_com/event_notifiers +{.push gcsafe.} + # Contexts # ---------------------------------------------------------------------------------- type Weave* = object var globalCtx*: GlobalContext -var localCtx* {.threadvar.}: TLContext +var workerContext* {.threadvar.}: WorkerContext + ## Worker context # TODO: tlsEmulation off by default on OSX and on by default on iOS? 
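These per-thread contexts are ordinary Nim `{.threadvar.}` globals, zero-initialized on every new thread. A standalone sketch of that pattern follows; the names are hypothetical, not Weave's internals, and it must be compiled with `--threads:on`.

```Nim
# Standalone illustration of per-thread role tracking with {.threadvar.};
# hypothetical names, not Weave's internals. Compile with --threads:on.
type ThreadRole = enum
  Unknown, ComputeThread, SubmitterThread

var threadRole {.threadvar.}: ThreadRole   # zero-initialized (Unknown) on every new thread

proc setupComputeThread() =
  doAssert threadRole == Unknown           # a thread takes exactly one role
  threadRole = ComputeThread

proc onComputeThread(): bool =
  threadRole == ComputeThread

proc workerMain() {.thread.} =
  setupComputeThread()
  doAssert onComputeThread()

var t: Thread[void]
createThread(t, workerMain)
joinThread(t)
doAssert not onComputeThread()   # the main thread kept its default role
```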
-const LeaderID*: WorkerID = 0 +var jobProviderContext* {.threadvar.}: JobProviderContext +var localThreadKind* {.threadvar.}: ThreadKind + +const RootID*: WorkerID = 0 +const ManagerID*: WorkerID = 0 # Profilers # ---------------------------------------------------------------------------------- @@ -49,16 +56,19 @@ template isRootTask*(task: Task): bool = task.parent.isNil template myTodoBoxes*: Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]] = - globalCtx.com.tasks[localCtx.worker.ID] + globalCtx.com.tasksStolen[workerContext.worker.ID] + +template managerJobQueue*: ChannelMpscUnboundedBatch[Job] = + globalCtx.manager.jobsIncoming[] template myThieves*: ChannelMpscUnboundedBatch[StealRequest] = - globalCtx.com.thefts[localCtx.worker.ID] + globalCtx.com.thefts[workerContext.worker.ID] template getThievesOf*(worker: WorkerID): ChannelMpscUnboundedBatch[StealRequest] = globalCtx.com.thefts[worker] template myMemPool*: TLPoolAllocator = - globalCtx.mempools[localCtx.worker.ID] + globalCtx.mempools[workerContext.worker.ID] template workforce*: int32 = globalCtx.numWorkers @@ -67,32 +77,32 @@ template maxID*: int32 = globalCtx.numWorkers - 1 template myID*: WorkerID = - localCtx.worker.ID + workerContext.worker.ID template myWorker*: Worker = - localCtx.worker + workerContext.worker template myTask*: Task = - localCtx.worker.currentTask + workerContext.worker.currentTask template myThefts*: Thefts = - localCtx.thefts + workerContext.thefts template myMetrics*: untyped = metrics: - localCtx.counters + workerContext.counters template mySyncScope*: ptr ScopedBarrier = - localCtx.worker.currentScope + workerContext.worker.currentScope Backoff: template myParking*: EventNotifier = - globalCtx.com.parking[localCtx.worker.ID] + globalCtx.com.parking[workerContext.worker.ID] template wakeup*(target: WorkerID) = mixin notify debugTermination: - log("Worker %2d: waking up child %2d\n", localCtx.worker.ID, target) + log("Worker %2d: waking up child %2d\n", workerContext.worker.ID, target) globalCtx.com.parking[target].notify() export event_notifiers.park, event_notifiers.prepareToPark, event_notifiers.initialize, event_notifiers.EventNotifier @@ -101,7 +111,7 @@ Backoff: # ---------------------------------------------------------------------------------- proc newTaskFromCache*(): Task = - result = localCtx.taskCache.pop() + result = workerContext.taskCache.pop() if result.isNil: result = myMemPool().borrow(deref(Task)) # Zeroing is expensive, it's 96 bytes @@ -168,32 +178,49 @@ proc fulfill*(pledge: Pledge, index: SomeInteger) = # Dynamic Scopes # ---------------------------------------------------------------------------------- -template Leader*(body: untyped) = - if localCtx.worker.ID == LeaderID: +template Root*(body: untyped) = + if workerContext.worker.ID == RootID: + body + +template workerIsManager*(): bool = + workerContext.worker.ID == ManagerID + +template manager*(): ManagerContext = + globalCtx.manager + +template Manager*(body: untyped) = + if workerIsManager: body template Worker*(body: untyped) = - if localCtx.worker.ID != LeaderID: + if workerContext.worker.ID != ManagerID: body +template onWeaveThread*(): bool = + localThreadKind == WorkerThread + +template onSubmitterThread*(): bool = + localThreadKind == SubmitterThread + + # Counters # ---------------------------------------------------------------------------------- template incCounter*(name: untyped{ident}, amount = 1) = bind name metrics: - # Assumes localCtx is in the calling context - localCtx.counters.name 
+= amount + # Assumes workerContext is in the calling context + workerContext.counters.name += amount template decCounter*(name: untyped{ident}) = bind name metrics: - # Assumes localCtx is in the calling context - localCtx.counters.name -= 1 + # Assumes workerContext is in the calling context + workerContext.counters.name -= 1 proc workerMetrics*() = metrics: - Leader: + Root: c_printf("\n") c_printf("+========================================+\n") c_printf("| Per-worker statistics |\n") @@ -202,25 +229,25 @@ proc workerMetrics*() = discard globalCtx.barrier.wait() - c_printf("Worker %2d: %u steal requests sent\n", myID(), localCtx.counters.stealSent) - c_printf("Worker %2d: %u steal requests handled\n", myID(), localCtx.counters.stealHandled) - c_printf("Worker %2d: %u steal requests declined\n", myID(), localCtx.counters.stealDeclined) - c_printf("Worker %2d: %u tasks executed\n", myID(), localCtx.counters.tasksExec) - c_printf("Worker %2d: %u tasks sent\n", myID(), localCtx.counters.tasksSent) - c_printf("Worker %2d: %u loops split\n", myID(), localCtx.counters.loopsSplit) - c_printf("Worker %2d: %u loops iterations executed\n", myID(), localCtx.counters.loopsIterExec) + c_printf("Worker %2d: %u steal requests sent\n", myID(), workerContext.counters.stealSent) + c_printf("Worker %2d: %u steal requests handled\n", myID(), workerContext.counters.stealHandled) + c_printf("Worker %2d: %u steal requests declined\n", myID(), workerContext.counters.stealDeclined) + c_printf("Worker %2d: %u tasks executed\n", myID(), workerContext.counters.tasksExec) + c_printf("Worker %2d: %u tasks sent\n", myID(), workerContext.counters.tasksSent) + c_printf("Worker %2d: %u loops split\n", myID(), workerContext.counters.loopsSplit) + c_printf("Worker %2d: %u loops iterations executed\n", myID(), workerContext.counters.loopsIterExec) StealAdaptative: - ascertain: localCtx.counters.stealOne + localCtx.counters.stealHalf == localCtx.counters.stealSent - if localCtx.counters.stealSent != 0: + ascertain: workerContext.counters.stealOne + workerContext.counters.stealHalf == workerContext.counters.stealSent + if workerContext.counters.stealSent != 0: c_printf("Worker %2d: %.2f %% steal-one\n", myID(), - localCtx.counters.stealOne.float64 / localCtx.counters.stealSent.float64 * 100) + workerContext.counters.stealOne.float64 / workerContext.counters.stealSent.float64 * 100) c_printf("Worker %2d: %.2f %% steal-half\n", myID(), - localCtx.counters.stealHalf.float64 / localCtx.counters.stealSent.float64 * 100) + workerContext.counters.stealHalf.float64 / workerContext.counters.stealSent.float64 * 100) else: c_printf("Worker %2d: %.2f %% steal-one\n", myID(), 0) c_printf("Worker %2d: %.2f %% steal-half\n", myID(), 0) LazyFV: - c_printf("Worker %2d: %u futures converted\n", myID(), localCtx.counters.futuresConverted) + c_printf("Worker %2d: %u futures converted\n", myID(), workerContext.counters.futuresConverted) profile_results(myID()) flushFile(stdout) diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim index a4d4103f..15750cc6 100644 --- a/weave/datatypes/context_global.nim +++ b/weave/datatypes/context_global.nim @@ -6,6 +6,7 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. 
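The `Root:`/`Manager:`/`Worker:` blocks above are plain templates gated on the thread-local worker ID. A self-contained sketch of that dynamic-scope pattern, with simplified names that are not Weave's API:

```Nim
# Self-contained sketch of the dynamic-scope templates above
# (Root:/Manager:/Worker:); simplified names, not Weave's API.
type WorkerID = int32
const ManagerID: WorkerID = 0

var myID {.threadvar.}: WorkerID   # each thread stores its own ID

template Manager(body: untyped) =
  ## Run `body` only on the manager (the root thread, ID 0)
  if myID == ManagerID:
    body

template Worker(body: untyped) =
  ## Run `body` only on the other workers
  if myID != ManagerID:
    body

myID = 0          # pretend we are the root/manager thread
Manager:
  echo "manager: distributing incoming jobs"
Worker:
  doAssert false, "never reached on the manager thread"
```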
import + std/atomics, ../cross_thread_com/channels_mpsc_unbounded_batch, ../cross_thread_com/channels_spsc_single_ptr, ../memory/[persistacks, memory_pools], @@ -37,7 +38,7 @@ type # Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker" thefts*: ptr UncheckedArray[ChannelMpscUnboundedBatch[StealRequest]] - tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] + tasksStolen*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] when static(WV_Backoff): parking*: ptr UncheckedArray[EventNotifier] @@ -45,7 +46,26 @@ type com*: ComChannels threadpool*: ptr UncheckedArray[Thread[WorkerID]] numWorkers*: int32 - barrier*: SyncBarrier mempools*: ptr UncheckedArray[TlPoolAllocator] + barrier*: SyncBarrier + ## Barrier for initialization and teardown + manager*: ManagerContext - # TODO track workers per socket / NUMA domain + ManagerContext* = object + ## Manager context + ## in charge of distributing incoming jobs. + ## The root thread is the Manager. + # + # In the future the Manager will also handle + # synchronization across machines in a distributed setting. + # + # We may also have a manager per socket/NUMA domain. + # + # It may become a thread dedicated to supervision, synchronization + # and job handling. + jobsIncoming*: ptr ChannelMpscUnboundedBatch[Job] + when static(WV_Backoff): + jobNotifier*: ptr EventNotifier + ## When Weave works as a dedicated execution engine + ## we need to park it when there is no CPU tasks. + acceptsJobs*: Atomic[bool] diff --git a/weave/datatypes/context_thread_local.nim b/weave/datatypes/context_thread_local.nim index f6dfdcfb..9eb74056 100644 --- a/weave/datatypes/context_thread_local.nim +++ b/weave/datatypes/context_thread_local.nim @@ -8,7 +8,7 @@ import ./bounded_queues, ./sync_types, ./prell_deques, ./binary_worker_trees, ../config, - ../memory/[lookaside_lists, persistacks, allocs], + ../memory/[lookaside_lists, persistacks, allocs, memory_pools], ../instrumentation/contracts, ../random/rng, ../cross_thread_com/scoped_barriers @@ -69,13 +69,13 @@ type recentTasks*: int32 recentThefts*: int32 - TLContext* = object - ## Thread-Local context + WorkerContext* = object + ## Thread-Local context for Weave workers worker*: Worker thefts*: Thefts taskCache*: LookAsideList[Task] stealCache*: Persistack[WV_MaxConcurrentStealPerWorker, deref(StealRequest)] - # Leader thread only - Whole runtime is quiescent + # Root thread only - Whole runtime is quiescent runtimeIsQuiescent*: bool signaledTerminate*: bool when defined(WV_Metrics): @@ -99,6 +99,16 @@ type when defined(WV_LazyFlowvar): futuresConverted*: int + JobProviderContext* = object + ## Thread-local context for non-Weave threads + ## to allow them to submit jobs to Weave. 
+ mempool*: ptr TLPoolAllocator # TODO: have the main thread takeover the mempool on thread destruction + + ThreadKind* = enum + Unknown + WorkerThread + SubmitterThread + # Worker proc # ---------------------------------------------------------------------------------- diff --git a/weave/datatypes/flowvars.nim b/weave/datatypes/flowvars.nim index 9b16aaa2..58da5ef2 100644 --- a/weave/datatypes/flowvars.nim +++ b/weave/datatypes/flowvars.nim @@ -9,7 +9,10 @@ import ../cross_thread_com/channels_spsc_single, ../memory/[allocs, memory_pools], ../instrumentation/contracts, - ../config, ../contexts + ../config, ../contexts, + std/os + +{.push gcsafe.} type LazyChannel* {.union.} = object @@ -25,7 +28,7 @@ type lazy*: LazyChannel Flowvar*[T] = object - ## A Flowvar is a simple channel + ## A Flowvar is a placeholder for a future result that may be computed in parallel # Flowvar are optimized when containing a ptr type. # They take less size in memory by testing isNil # instead of having an extra atomic bool @@ -59,10 +62,10 @@ type else: chan*: ptr ChannelSPSCSingle -func isSpawned*(fv: Flowvar): bool {.inline.}= - ## Returns true if a future is spawned +func isSpawned*(fv: Flowvar): bool {.inline.} = + ## Returns true if a flowvar is spawned ## This may be useful for recursive algorithms that - ## may or may not spawn a future depending on a condition. + ## may or may not spawn a flowvar depending on a condition. ## This is similar to Option or Maybe types when defined(WV_LazyFlowVar): return not fv.lfv.isNil @@ -93,6 +96,10 @@ EagerFV: ## until the Flowvar is ready. not fv.chan[].isEmpty() + proc cleanup*(fv: Flowvar) {.inline.} = + ## Cleanup after forcing a future + recycleChannel(fv) + LazyFV: proc recycleChannel*(fv: Flowvar) {.inline.} = recycle(fv.lfv.lazy.chan) @@ -153,6 +160,17 @@ LazyFV: convertLazyFlowvar(task) task = task.next + proc cleanup*[T](fv: Flowvar[T]) {.inline.} = + ## Cleanup after forcing a future + if not fv.lfv.hasChannel: + ascertain: fv.lfv.isReady + else: + ascertain: not fv.lfv.lazy.chan.isNil + recycleChannel(fv) + +# Reductions +# ---------------------------------------------------- + proc newFlowvarNode*(itemSize: uint8): FlowvarNode = ## Create a linked list of flowvars # Lazy flowvars unfortunately are allocated on the heap @@ -178,6 +196,54 @@ proc recycleFVN*(fvNode: sink FlowvarNode) {.inline.} = recycle(fvNode.lfv) recycle(fvNode) - # TODO destructors for automatic management # of the user-visible flowvars + +# Foreign threads interop +# ---------------------------------------------------- + +type Pending*[T] = object + ## A Pending[T] is a placeholder for the + ## future result of type T for a job + ## submitted to Weave for parallel execution. + # For implementation this is just a distinct type + # from Flowvars to ensure proper usage + fv: Flowvar[T] + +func isSubmitted*[T](p: Pending[T]): bool {.inline.} = + ## Returns true if a job has been submitted and we have a result pending + ## This may be useful for recursive algorithms that + ## may or may not submit a job depending on a condition. + ## This is similar to Option or Maybe types + p.fv.isSpawned + +template newPending*(pool: var TLPoolAllocator, T: typedesc): Pending[T] = + Pending[T](fv: newFlowVar(pool, T)) + +func isReady*[T](p: Pending[T]): bool {.inline.} = + ## Returns true if the pending result is ready. + ## In that case `settle` will not block. + ## Otherwise the current thread will block. 
+ p.fv.isReady + +func readyWith*[T](p: Pending[T], childResult: T) {.inline.} = + ## Sends the Pending result from the child thread processing the task + ## to its parent thread. + p.fv.readyWith(childResult) + +proc waitFor*[T](p: Pending[T]): T {.inline.} = + ## Wait for a pending value + ## This blocks the thread until the value is ready + ## and then returns it. + preCondition: onSubmitterThread + + var backoff = 1 + while not p.isReady: + sleep(backoff) + backoff *= 2 + if backoff > 16: + backoff = 16 + + let ok = p.fv.tryComplete(result) + ascertain: ok + cleanup(p.fv) diff --git a/weave/datatypes/sync_types.nim b/weave/datatypes/sync_types.nim index 3735b8f3..b7818dbb 100644 --- a/weave/datatypes/sync_types.nim +++ b/weave/datatypes/sync_types.nim @@ -53,12 +53,45 @@ type # We align to avoid torn reads/extra bookkeeping. data*{.align:sizeof(int).}: array[TaskDataSize, byte] + Job* = ptr object + ## Job + ## Represents a deferred computation that can be passed around threads. + ## The fields "prev" and "next" can be used + ## for intrusive containers + # We save memory by using int32 instead of int on select properties + # order field by size to optimize zero initialization (bottleneck on recursive algorithm) + + # The same as a task except for next being Atomic + fn*: proc (param: pointer) {.nimcall, gcsafe.} + parent*: pointer # Jobs will have for parent their JobProviderContext + prev*: Task + next*: Atomic[pointer] # For MPSC queue + # 32 bytes + start*: int + cur*: int + stop*: int + stride*: int + # 64 bytes + scopedBarrier*: ptr ScopedBarrier + futures*: pointer # LinkedList of futures required by the current task + futureSize*: uint8 # Size of the future result type if relevant + hasFuture*: bool # If a task is associated with a future, the future is stored at data[0] + isLoop*: bool + isInitialIter*: bool # Awaitable for-loops return true for the initial iter + when FirstVictim == LastVictim: + victim*: WorkerID + # 84 bytes (or 88 with FirstVictim = LastVictim) + # User data - including the FlowVar channel to send back result. + # It is very likely that User data contains a pointer (the Flowvar channel) + # We align to avoid torn reads/extra bookkeeping. + data*{.align:sizeof(int).}: array[TaskDataSize, byte] + # Steal requests # ---------------------------------------------------------------------------------- StealRequest* = ptr object # TODO: Remove workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 - next*{.align:WV_CacheLinePadding.}: Atomic[pointer] # For intrusive lists and queues + next*{.align:WV_CacheLinePadding.}: Atomic[pointer] # For intrusive lists and queues thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief thiefID*: WorkerID retry*: int32 # 0 <= retry <= num_workers diff --git a/weave/executor.nim b/weave/executor.nim new file mode 100644 index 00000000..b7eda46e --- /dev/null +++ b/weave/executor.nim @@ -0,0 +1,173 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# Executor +# ---------------------------------------------- +# +# Weave can be used in "Executor-mode". 
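Both `waitFor(Pending)` above and `waitUntilReady(Weave)` below wait by polling with a sleep that doubles up to a 16 ms cap, rather than blocking on an OS primitive. A standalone sketch of that backoff pattern; the readiness flag here is hypothetical, standing in for e.g. `manager.acceptsJobs`:

```Nim
# Capped exponential backoff while polling a readiness flag,
# as used by waitFor/waitUntilReady. Standalone sketch, hypothetical flag.
import std/[atomics, os]

var ready: Atomic[bool]          # in Weave this is e.g. manager.acceptsJobs

proc waitUntilSet(flag: var Atomic[bool]) =
  var backoff = 1
  while not flag.load(moRelaxed):
    sleep(backoff)               # yield the CPU instead of spinning
    backoff *= 2
    if backoff > 16:
      backoff = 16               # cap the sleep at 16 ms

ready.store(true, moRelaxed)
waitUntilSet(ready)
echo "flag observed set"
```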
+# In that case jobs are submitted from threads that are foreign to Weave +# +# The hard part is combining all the following: +# - Ensuring that the worker that receive the new job is awake. +# - Ensure no race condition with awakening and termination detection. +# - Limiting contention due to all workers writing to the same data structure. +# - Limiting latency between scheduling and running the job. +# - Greedy scheduler: as long as there is enough work, no worker is idle. +# +# +# 1. Weave termination detection is distributed, based on combining-tree +# or what is called "Lifeline-based Global Load Balancing" +# - http://www.cs.columbia.edu/~martha/courses/4130/au12/p201-saraswat.pdf +# i.e. each worker has a parent and a parent cannot sleep before its children. +# This means that submitting job to a random Worker in the threadpool +# requires ensuring that the Worker is awake to avoid termination detection. +# +# 2. A global job queue will always have job distributed to awake workers. +# It also ensures minimum latency. +# But the queue becomes a contention and scalability bottleneck. +# Also lock-free intrusive MPMC queues are very hard to write. +# +# 3. We choose to create a ManagerContext which is in charge of +# distributing incoming jobs. +# The manager is the root thread, it is always awake on job submission via an EventNotifier +# The main issue is latency to distribute the jobs if the root thread is +# on a long-running task with few loadBalancing occasions +# but this already existed before the executor mode +# when all tasks where created on Weave's root thread. +# +# The ManagerContext can be extended further to support distributed computing +# with Weave instances on muliple nodes of a cluster. +# The manager thread can become a dedicated separate thread +# if communication costs and jobs latnecy are high enough to justify it in the future. + +import + # Standard library + macros, typetraits, atomics, os, + # Internal + ./memory/[allocs, memory_pools], + ./contexts, ./config, ./runtime, + ./datatypes/[sync_types, context_thread_local], + ./instrumentation/[contracts, loggers], + ./cross_thread_com/[scoped_barriers, pledges, channels_mpsc_unbounded_batch, event_notifiers], + ./state_machines/sync_root + +{.push gcsafe, inline.} # TODO raises: [] + +proc waitUntilReady*(_: typedesc[Weave]) = + ## Wait until Weave is ready to accept jobs + ## This blocks the thread until the Weave runtime (on another thread) is fully initialized + # We use a simple exponential backoff for waiting. + var backoff = 1 + while not globalCtx.manager.acceptsJobs.load(moRelaxed): + sleep(backoff) + backoff *= 2 + if backoff > 16: + backoff = 16 + +proc setupSubmitterThread*(_: typedesc[Weave]) = + ## Configure a thread so that it can submit jobs to the Weave runtime. + ## This is useful if we want Weave to work + ## as an independent "service" or "execution engine" + ## and still being able to offload computation + ## to it instead of mixing + ## logic or IO and Weave on the main thread. 
+ ## + ## This will block until Weave is ready to accet jobs + preCondition: localThreadKind == Unknown + + jobProviderContext.mempool = wv_alloc(TLPoolAllocator) + jobProviderContext.mempool[].initialize() + + localThreadKind = SubmitterThread + +proc teardownSubmitterThread*(_: typedesc[Weave]) = + ## Maintenance before exiting a job submitter thread + + # TODO: Have the main thread takeover the mempool if it couldn't be fully released + let fullyReleased {.used.} = jobProviderContext.mempool.teardown() + localThreadKind = Unknown + +proc processAllandTryPark*(_: typedesc[Weave]) = + ## Process all tasks and then try parking the weave runtime + ## This `syncRoot` then put the Weave runtime to sleep + ## if no job submission was received concurrently + ## + ## This should be used if Weave root thread (that called init(Weave)) + ## is on a dedicated long-running thread + ## in an event loop: + ## + ## while true: + ## park(Weave) + ## + ## New job submissions will automatically wakeup the runtime + + manager.jobNotifier[].prepareToPark() + syncRoot(Weave) + debugTermination: log("Parking Weave runtime\n") + manager.jobNotifier[].park() + debugTermination: log("Waking Weave runtime\n") + +proc wakeup(_: typedesc[Weave]) = + ## Wakeup the runtime manager if asleep + manager.jobNotifier[].notify() + +proc runForever*(_: typedesc[Weave]) = + ## Start a never-ending event loop on the current thread + ## that wakes-up on job submission, handles multithreaded load balancing, + ## help process tasks + ## and spin down when there is no work anymore. + while true: + processAllandTryPark(Weave) + +# TODO: "not nil" +proc runUntil*(_: typedesc[Weave], signal: ptr Atomic[bool]) = + ## Start a Weave event loop until signal is true on the current thread. + ## It wakes-up on job submission, handles multithreaded load balancing, + ## help process tasks + ## and spin down when there is no work anymore. + preCondition: not signal.isNil + while not signal[].load(moRelaxed): + processAllandTryPark(Weave) + syncRoot(Weave) + +proc runInBackground*( + thr: var Thread[ptr Atomic[bool]], + _: typedesc[Weave], + signalShutdown: ptr Atomic[bool] + ) = + ## Start the Weave runtime on a background thread. + ## It wakes-up on job submissions, handles multithreaded load balancing, + ## help process tasks + ## and spin down when there is no work anymore. + proc eventLoop(shutdown: ptr Atomic[bool]) {.thread.} = + init(Weave) + Weave.runUntil(shutdown) + exit(Weave) + {.gcsafe.}: # Workaround regression - https://github.com/nim-lang/Nim/issues/14370 + thr.createThread(eventLoop, signalShutdown) + +proc runInBackground*(thr: var Thread[void], _: typedesc[Weave]) = + ## Start the Weave runtime on a background thread. + ## It wakes-up on job submissions, handles multithreaded load balancing, + ## help process tasks + ## and spin down when there is no work anymore. 
+ proc eventLoop() {.thread.} = + init(Weave) + Weave.runForever() + {.gcsafe.}: # Workaround regression - https://github.com/nim-lang/Nim/issues/14370 + thr.createThread(eventLoop) + +proc submitJob*(job: sink Job) = + ## Submit a serialized job to a worker at random + preCondition: not jobProviderContext.mempool.isNil + preCondition: globalCtx.manager.acceptsJobs.load(moRelaxed) + + let sent {.used.} = managerJobQueue.trySend job + wakeup(Weave) + debugTermination: + log("Thread %d: sent job to Weave runtime and woke it up.\n", getThreadID()) + postCondition: sent diff --git a/weave/loop_splitting.nim b/weave/loop_splitting.nim index bae002db..c70e4420 100644 --- a/weave/loop_splitting.nim +++ b/weave/loop_splitting.nim @@ -10,6 +10,8 @@ import ./datatypes/sync_types, ./instrumentation/[contracts, loggers] +{.push gcsafe.} + # Loop splitting # ---------------------------------------------------------------------------------- diff --git a/weave/parallel_jobs.nim b/weave/parallel_jobs.nim new file mode 100644 index 00000000..4462326b --- /dev/null +++ b/weave/parallel_jobs.nim @@ -0,0 +1,400 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# A job is processing request submitted from outside a Weave worker thread +# They are scheduled in FIFO order and minimize latency for submitters. +# In particular, it is optimized for jobs assumed independent. + +# Job submission +# ---------------------------------------------------------------------------------- + +import + # Standard library + macros, typetraits, atomics, os, + # Internal + ./memory/[allocs, memory_pools], + ./scheduler, ./contexts, + ./datatypes/[flowvars, sync_types], + ./instrumentation/contracts, + ./cross_thread_com/[pledges, channels_mpsc_unbounded_batch, event_notifiers], + ./state_machines/sync_root, + ./executor + +proc submitImpl(pledges: NimNode, funcCall: NimNode): NimNode = + # We take typed argument so that overloading resolution + # is already done and arguments are semchecked + funcCall.expectKind(nnkCall) + result = newStmtList() + result.add quote do: + preCondition: onSubmitterThread() + + # Get the return type if any + let retType = funcCall[0].getImpl[3][0] + let needFuture = retType.kind != nnkEmpty + + # Get a serialized type and data for all function arguments + # We use adhoc tuple + var argsTy = nnkPar.newTree() + var args = nnkPar.newTree() + for i in 1 ..< funcCall.len: + argsTy.add getTypeInst(funcCall[i]) + args.add funcCall[i] + + # Check that the type is safely serializable + # TODO: we need to check the return type as well + # so we can merge both future and no future code path + let fn = funcCall[0] + let fnName = $fn + let withArgs = args.len > 0 + if withArgs: + result.add quote do: + static: + # assert supportsCopyMem(`argsTy`), "\n\n" & `fnName` & + # " has arguments managed by GC (ref/seq/strings),\n" & + # " they cannot be distributed across threads.\n" & + # " Argument types: " & $`argsTy` & "\n\n" + + assert sizeof(`argsTy`) <= TaskDataSize, "\n\n" & `fnName` & + " has arguments that do not fit in the async data buffer.\n" & + " Argument types: " & `argsTy`.name & "\n" & + " Current size: " & $sizeof(`argsTy`) & "\n" & + " 
Maximum size allowed: " & $TaskDataSize & "\n\n" + + # Create the async function + let async_fn = ident("async_" & fnName) + var fnCall = newCall(fn) + let data = ident("data") # typed pointer to data + + # Submit immediately or delay on dependencies + var submitBlock: NimNode + let job = ident"job" + if pledges.isNil: + submitBlock = newCall(bindSym"submitJob", job) + elif pledges.len == 1: + let pledgeDesc = pledges[0] + if pledgeDesc.kind in {nnkIdent, nnkSym}: + submitBlock = quote do: + if not delayedUntil(cast[Task](`job`), `pledgeDesc`, jobProviderContext.mempool[]): + submitJob(`job`) + else: + pledgeDesc.expectKind({nnkPar, nnkTupleConstr}) + let pledge = pledgeDesc[0] + let pledgeIndex = pledgeDesc[1] + submitBlock = quote do: + if not delayedUntil(cast[Task](`job`), `pledge`, int32(`pledgeIndex`), myMemPool()): + submitJob(`job`) + else: + let delayedMulti = getAst( + delayedUntilMulti( + nnkCast.newTree(bindSym"Task", job), + nnkDerefExpr.newTree( + nnkDotExpr.newTree(bindSym"jobProviderContext", ident"mempool") + ), + pledges + ) + ) + submitBlock = quote do: + if not `delayedMulti`: + submitJob(`job`) + + if not needFuture: # TODO: allow awaiting on a Pending[void] + if funcCall.len == 2: + # With only 1 arg, the tuple syntax doesn't construct a tuple + # let data = (123) # is an int + fnCall.add nnkDerefExpr.newTree(data) + else: # This handles the 0 arg case as well + for i in 1 ..< funcCall.len: + fnCall.add nnkBracketExpr.newTree( + data, + newLit i-1 + ) + + # Create the async call + result.add quote do: + proc `async_fn`(param: pointer) {.nimcall.} = + preCondition: not isRootTask(myTask()) + + when bool(`withArgs`): + let `data` = cast[ptr `argsTy`](param) # TODO - restrict + `fnCall` + # Create the task + result.add quote do: + when defined(WV_profile): + # TODO - add timers for jobs + discard timer_start(timer_enq_deq_job) + block enq_deq_job: + let `job` = jobProviderContext.mempool[].borrow(deref(Job)) + `job`.parent = jobProviderContext.addr # By convention, we set the parent to the JobProvider address + `job`.fn = `async_fn` + # registerDescendant(mySyncScope()) # TODO: does it make sense? + # `task`.scopedBarrier = mySyncScope() + when bool(`withArgs`): + cast[ptr `argsTy`](`job`.data.addr)[] = `args` + `submitBlock` + + manager.jobNotifier[].notify() # Wake up the runtime + when defined(WV_profile): + timer_stop(timer_enq_deq_job) + + else: ################ Need a future + # We repack fut + args. + let fut = ident("fut") + + # data[0] will be the future. 
+ + var futArgs = nnkPar.newTree + var futArgsTy = nnkPar.newTree + futArgs.add fut + futArgsTy.add nnkBracketExpr.newTree( + bindSym"Pending", + retType + ) + for i in 1 ..< funcCall.len: + futArgsTy.add getTypeInst(funcCall[i]) + futArgs.add funcCall[i] + + for i in 1 ..< funcCall.len: + fnCall.add nnkBracketExpr.newTree( + data, + newLit i + ) + + result.add quote do: + proc `async_fn`(param: pointer) {.nimcall.} = + preCondition: not isRootTask(myTask()) + + let `data` = cast[ptr `futArgsTy`](param) # TODO - restrict + let res = `fnCall` + when typeof(`data`[]) is Pending: + readyWith(`data`[], res) + else: + readyWith(`data`[0], res) + + # Create the task + let freshIdent = ident($retType) + result.add quote do: + when defined(WV_profile): + # TODO profiling templates visibility issue + discard timer_start(timer_enq_deq_job) + block enq_deq_task: + let `job` = jobProviderContext.mempool[].borrow(deref(Job)) + `job`.parent = jobProviderContext.addr # By convention, we set the parent to the JobProvider address + `job`.fn = `async_fn` + # registerDescendant(mySyncScope()) # TODO: does it make sense? + # `task`.scopedBarrier = mySyncScope() + `job`.has_future = true + `job`.futureSize = uint8(sizeof(`retType`)) + let `fut` = newPending(jobProviderContext.mempool[], `freshIdent`) + cast[ptr `futArgsTy`](`job`.data.addr)[] = `futArgs` + `submitBlock` + + manager.jobNotifier[].notify() # Wake up the runtime + when defined(WV_profile): + discard timer_stop(timer_enq_deq_job) + # Return the future + `fut` + + # Wrap in a block for namespacing + result = nnkBlockStmt.newTree(newEmptyNode(), result) + # echo result.toStrLit + +macro submit*(fnCall: typed): untyped = + ## Submit the input function call asynchronously to the Weave runtime. + ## + ## This is a compatibility routine for foreign threads. + ## `setupSubmitterThread` MUST be called on the submitter thread beforehand + ## + ## This procedure is intended for interoperability with long-running threads + ## started with `createThread` + ## and other threadpools and/or execution engines, + ## use `spawn` otherwise. + ## + ## If the function calls returns a result, submit will wrap it in a Pending[T]. + ## You can use `waitFor` to block the current thread and extract the asynchronous result from the Pending[T]. + ## You can use `isReady` to check if result is available and if subsequent + ## `waitFor` calls would block or return immediately. + ## + ## `submit` returns immediately. + ## + ## Jobs are processed approximately in First-In-First-Out (FIFO) order. + result = submitImpl(nil, fnCall) + +macro submitDelayed*(pledges: varargs[typed], fnCall: typed): untyped = + ## Submit the input function call asynchronously to the Weave runtime. + ## The function call will only be scheduled when the pledge is fulfilled. + ## + ## This is a compatibility routine for foreign threads. + ## `setupSubmitterThread` MUST be called on the submitter thread beforehand + ## + ## This procedure is intended for interoperability with long-running threads + ## started with `createThread` + ## and other threadpools and/or execution engines, + ## use `spawn` otherwise. + ## + ## If the function calls returns a result, submit will wrap it in a Pending[T]. + ## You can use `settle` to block the current thread and extract the asynchronous result from the Pending[T]. + ## You can use `isReady` to check if result is available and if subsequent + ## `settle` calls would block or return immediately. 
+ ## + ## Ensure that before settling on the Pending[T] of a delayed submit, its pledge can be fulfilled or you will deadlock. + result = submitImpl(pledges, fnCall) + +# Sanity checks +# -------------------------------------------------------- + +when isMainModule: + import + ./runtime, ./state_machines/[sync, sync_root], + ./parallel_tasks, + std/os + + var shutdownWeave, serviceDone: Atomic[bool] + shutdownWeave.store(false, moRelaxed) + serviceDone.store(false, moRelaxed) + + var executorThread: Thread[ptr Atomic[bool]] + executorThread.runInBackground(Weave, shutdownWeave.addr) + + block: # Have an independant display service submit jobs to Weave + serviceDone.store(false, moRelaxed) + + proc display_int(x: int): bool = + stdout.write(x) + stdout.write(" - SUCCESS\n") + + return true + + proc displayService(serviceDone: ptr Atomic[bool]) = + setupSubmitterThread(Weave) + waitUntilReady(Weave) + + echo "Sanity check 1: Printing 123456 654321 in parallel" + discard submit display_int(123456) + let ok = submit display_int(654321) + + discard waitFor(ok) + serviceDone[].store(true, moRelaxed) + + var t: Thread[ptr Atomic[bool]] + t.createThread(displayService, serviceDone.addr) + joinThread(t) + + block: # Job that spawns tasks + serviceDone.store(false, moRelaxed) + + proc async_fib(n: int): int = + + if n < 2: + return n + + let x = spawn async_fib(n-1) + let y = async_fib(n-2) + + result = sync(x) + y + + proc fibonacciService(serviceDone: ptr Atomic[bool]) = + setupSubmitterThread(Weave) + waitUntilReady(Weave) + + echo "Sanity check 2: fib(20)" + let f = submit async_fib(20) + + echo waitFor(f) + serviceDone[].store(true, moRelaxed) + + var t: Thread[ptr Atomic[bool]] + t.createThread(fibonacciService, serviceDone.addr) + joinThread(t) + + block: # Delayed computation + serviceDone.store(false, moRelaxed) + + proc echoA(pA: Pledge) = + echo "Display A, sleep 1s, create parallel streams 1 and 2" + sleep(1000) + pA.fulfill() + + proc echoB1(pB1: Pledge) = + echo "Display B1, sleep 1s" + sleep(1000) + pB1.fulfill() + + proc echoB2() = + echo "Display B2, exit stream" + + proc echoC1(): bool = + echo "Display C1, exit stream" + + proc echoService(serviceDone: ptr Atomic[bool]) = + setupSubmitterThread(Weave) + waitUntilReady(Weave) + + echo "Sanity check 3: Dataflow parallelism" + let pA = newPledge() + let pB1 = newPledge() + let done = submitDelayed(pB1, echoC1()) + submitDelayed pA, echoB2() + submitDelayed pA, echoB1(pB1) + submit echoA(pA) + + discard waitFor(done) + serviceDone[].store(true, moRelaxed) + + var t: Thread[ptr Atomic[bool]] + t.createThread(echoService, serviceDone.addr) + joinThread(t) + + block: # Delayed computation with multiple dependencies + serviceDone.store(false, moRelaxed) + + proc echoA(pA: Pledge) = + echo "Display A, sleep 1s, create parallel streams 1 and 2" + sleep(1000) + pA.fulfill() + + proc echoB1(pB1: Pledge) = + echo "Display B1, sleep 1s" + sleep(1000) + pB1.fulfill() + + proc echoB2(pB2: Pledge) = + echo "Display B2, no sleep" + pB2.fulfill() + + proc echoC12(): bool = + echo "Display C12, exit stream" + return true + + proc echoService(serviceDone: ptr Atomic[bool]) = + setupSubmitterThread(Weave) + waitUntilReady(Weave) + + echo "Sanity check 4: Dataflow parallelism with multiple dependencies" + let pA = newPledge() + let pB1 = newPledge() + let pB2 = newPledge() + let done = submitDelayed(pB1, pB2, echoC12()) + submitDelayed pA, echoB2(pB2) + submitDelayed pA, echoB1(pB1) + submit echoA(pA) + + discard waitFor(done) + 
serviceDone[].store(true, moRelaxed) + + var t: Thread[ptr Atomic[bool]] + t.createThread(echoService, serviceDone.addr) + joinThread(t) + + # Wait until all tests are done + var backoff = 1 + while not serviceDone.load(moRelaxed): + sleep(backoff) + backoff *= 2 + if backoff > 16: + backoff = 16 + + shutdownWeave.store(true) diff --git a/weave/parallel_tasks.nim b/weave/parallel_tasks.nim index 6c78335e..8c73ca95 100644 --- a/weave/parallel_tasks.nim +++ b/weave/parallel_tasks.nim @@ -5,6 +5,12 @@ # * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. +# A task is a processing request emitted from a Weave worker thread. +# They are scheduled in LIFO order and maximize throughput of the runtime. +# In particular, it is optimized for fork-join parallelism +# where the oldest tasks spawned the recent ones and cannot complete +# without the recent tasks being complete. + # Async/await spawn/sync for compute bound tasks # ---------------------------------------------------------- @@ -23,6 +29,8 @@ proc spawnImpl(pledges: NimNode, funcCall: NimNode): NimNode = # is already done and arguments are semchecked funcCall.expectKind(nnkCall) result = newStmtList() + result.add quote do: + preCondition: onWeaveThread() # Get the return type if any let retType = funcCall[0].getImpl[3][0] @@ -153,7 +161,10 @@ proc spawnImpl(pledges: NimNode, funcCall: NimNode): NimNode = let `data` = cast[ptr `futArgsTy`](param) # TODO - restrict let res = `fnCall` - readyWith(`data`[0], res) + when typeof(`data`[]) is Flowvar: + readyWith(`data`[], res) + else: + readyWith(`data`[0], res) # Create the task let freshIdent = ident($retType) @@ -183,9 +194,17 @@ proc spawnImpl(pledges: NimNode, funcCall: NimNode): NimNode = macro spawn*(fnCall: typed): untyped = ## Spawns the input function call asynchronously, potentially on another thread of execution. + ## + ## To offload computation from a thread started with `createdThread` + ## (i.e. foreign to the Weave runtime) + ## use `setupSubmitterThread` + `submit` instead. + ## ## If the function calls returns a result, spawn will wrap it in a Flowvar. - ## You can use sync to block the current thread and extract the asynchronous result from the flowvar. - ## Spawn returns immediately. + ## You can use `sync` to block the current thread and extract the asynchronous result from the flowvar. + ## You can use `isReady` to check if result is available and if subsequent + ## `spawn` returns immediately. 
+ ## + ## Tasks are processed approximately in Last-In-First-Out (LIFO) order result = spawnImpl(nil, fnCall) macro spawnDelayed*(pledges: varargs[typed], fnCall: typed): untyped = @@ -312,18 +331,20 @@ when isMainModule: proc echoB2() = echo "Display B2, exit stream" - proc echoC1() = + proc echoC1(): bool = echo "Display C1, exit stream" + return true proc main() = echo "Sanity check 3: Dataflow parallelism" init(Weave) let pA = newPledge() let pB1 = newPledge() - spawnDelayed pB1, echoC1() + let done = spawnDelayed(pB1, echoC1()) spawnDelayed pA, echoB2() spawnDelayed pA, echoB1(pB1) spawn echoA(pA) + discard sync(done) exit(Weave) main() diff --git a/weave/runtime.nim b/weave/runtime.nim index a9ffc8dd..93ab2a5f 100644 --- a/weave/runtime.nim +++ b/weave/runtime.nim @@ -7,7 +7,7 @@ import # Standard library - os, cpuinfo, strutils, + os, cpuinfo, strutils, atomics, # Internal ./instrumentation/[contracts, loggers], ./contexts, ./config, @@ -24,11 +24,14 @@ when defined(windows): else: import ./primitives/affinity_posix +{.push gcsafe.} + # Runtime public routines # ---------------------------------------------------------------------------------- proc init*(_: type Weave) = # TODO detect Hyper-Threading and NUMA domain + manager.acceptsJobs.store(false, moRelaxed) if existsEnv"WEAVE_NUM_THREADS": workforce() = getEnv"WEAVE_NUM_THREADS".parseInt.int32 @@ -43,12 +46,13 @@ proc init*(_: type Weave) = globalCtx.mempools = wv_alloc(TLPoolAllocator, workforce()) globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce()) globalCtx.com.thefts = wv_alloc(ChannelMpscUnboundedBatch[StealRequest], workforce()) - globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce()) + globalCtx.com.tasksStolen = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce()) Backoff: globalCtx.com.parking = wv_alloc(EventNotifier, workforce()) globalCtx.barrier.init(workforce()) - # Lead thread - pinned to CPU 0 + + # Root thread - pinned to CPU 0 myID() = 0 when not(defined(cpp) and defined(vcc)): # TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++ @@ -56,7 +60,8 @@ proc init*(_: type Weave) = # Create workforce() - 1 worker threads for i in 1 ..< workforce(): - createThread(globalCtx.threadpool[i], worker_entry_fn, WorkerID(i)) + {.gcsafe.}: # Workaround regression - https://github.com/nim-lang/Nim/issues/14370 + createThread(globalCtx.threadpool[i], worker_entry_fn, WorkerID(i)) # TODO: we might want to take into account Hyper-Threading (HT) # and allow spawning tasks and pinning to cores that are not HT-siblings. # This is important for memory-bound workloads (like copy, addition, ...) @@ -75,9 +80,16 @@ proc init*(_: type Weave) = myTask().fn = cast[type myTask().fn](0xEFFACED) myTask().scopedBarrier = nil - init(localCtx) + setupWorker() + + # Manager + manager.jobNotifier = globalCtx.com.parking[0].addr + manager.jobsIncoming = wv_alloc(ChannelMpscUnboundedBatch[Job]) + manager.jobsIncoming[].initialize() + # Wait for the child threads discard globalCtx.barrier.wait() + manager.acceptsJobs.store(true, moRelaxed) proc loadBalance*(_: type Weave) {.gcsafe.} = ## This makes the current thread ensures it shares work with other threads. 
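The updated dataflow sanity check in `parallel_tasks.nim` above now syncs on the Flowvar returned by `spawnDelayed`. Condensed, the pledge-based producer/consumer pattern looks like the sketch below (assumes `import weave`; `producer`/`consumer` are illustrative names).

```Nim
# Condensed sketch of the pledge-based dataflow pattern from the
# parallel_tasks sanity check above (assumes `import weave`).
import weave

proc producer(p: Pledge) =
  echo "producing"
  p.fulfill()                             # unblocks every task delayed on `p`

proc consumer(): bool =
  echo "consuming"
  return true

init(Weave)
let p = newPledge()
let done = spawnDelayed(p, consumer())    # scheduled only once `p` is fulfilled
spawn producer(p)
discard sync(done)                        # await the delayed task's Flowvar
exit(Weave)
```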
@@ -110,6 +122,7 @@ proc loadBalance*(_: type Weave) {.gcsafe.} =
   #   - or a 4 sockets 100+ cores server grade CPU
   #   - are you doing addition
   #   - or exponentiation
+  preCondition: onWeaveThread()

   shareWork()

@@ -125,6 +138,7 @@ proc getThreadId*(_: type Weave): int {.inline.} =
   ## Returns the Weave ID of the current executing thread
   ## ID is in the range 0 ..< WEAVE_NUM_THREADS
   ## With 0 being the lead thread and WEAVE_NUM_THREADS = min(countProcessors, getEnv"WEAVE_NUM_THREADS")
+  preCondition: onWeaveThread()
   myID().int

 proc getNumThreads*(_: type Weave): int {.inline.} =
@@ -141,7 +155,7 @@ proc globalCleanup() =
   # Channels, each thread cleaned its channels
   # We just need to reclaim the memory
   wv_free(globalCtx.com.thefts)
-  wv_free(globalCtx.com.tasks)
+  wv_free(globalCtx.com.tasksStolen)

   # The root task has no parent
   ascertain: myTask().isRootTask()
@@ -155,7 +169,7 @@ proc globalCleanup() =
 proc exit*(_: type Weave) =
   syncRoot(_)
   signalTerminate(nil)
-  localCtx.signaledTerminate = true
+  workerContext.signaledTerminate = true

   # 1 matching barrier in worker_entry_fn
   discard globalCtx.barrier.wait()
@@ -163,5 +177,5 @@ proc exit*(_: type Weave) =
   # 1 matching barrier in metrics
   workerMetrics()

-  threadLocalCleanup()
+  teardownWorker()
   globalCleanup()
diff --git a/weave/scheduler.nim b/weave/scheduler.nim
index 2f62b186..c0b44af2 100644
--- a/weave/scheduler.nim
+++ b/weave/scheduler.nim
@@ -12,13 +12,14 @@ import
   ./cross_thread_com/[channels_spsc_single_ptr, channels_mpsc_unbounded_batch],
   ./memory/[persistacks, lookaside_lists, allocs, memory_pools],
   ./contexts, ./config,
-  ./victims,
   ./random/rng,
   ./state_machines/[event_loop, dispatch_events]

 # Local context
 # ----------------------------------------------------------------------------------

+{.push gcsafe.}
+
 # Caching description:
 #
 # Thread-local objects, with a lifetime equal to the thread lifetime
@@ -67,8 +68,11 @@ import
 # The mempool is initialized in worker_entry_fn
 # as the main thread needs it for the root task

-proc init*(ctx: var TLContext) {.gcsafe.} =
+proc setupWorker*() =
   ## Initialize the thread-local context of a worker (including the lead worker)
+  preCondition: localThreadKind == Unknown
+
+  template ctx: untyped = workerContext
   metrics:
     zeroMem(ctx.counters.addr, sizeof(ctx.counters))
   zeroMem(ctx.thefts.addr, sizeof(ctx.thefts))
@@ -78,6 +82,8 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
   ctx.taskCache.initialize(freeFn = memory_pools.recycle)
   myMemPool.hook.setCacheMaintenanceEx(ctx.taskCache)

+  localThreadKind = WorkerThread
+
   # Worker
   # -----------------------------------------------------------
   myWorker().initialize(maxID())
@@ -97,9 +103,9 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
   # Thieves
   # -----------------------------------------------------------
   myThieves().initialize()
-  localCtx.stealCache.initialize()
-  for i in 0 ..< localCtx.stealCache.len:
-    localCtx.stealCache.access(i).victims.allocate(capacity = workforce())
+  ctx.stealCache.initialize()
+  for i in 0 ..< ctx.stealCache.len:
+    ctx.stealCache.access(i).victims.allocate(capacity = workforce())
   myThefts().rng.seed(myID())

   TargetLastVictim:
@@ -116,7 +122,7 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
     )
     log("Worker %2d: steal requests channel is 0x%.08x\n", myID(), myThieves().addr)
-    let (sStart, sStop) = localCtx.stealCache.reservedMemRange()
+    let (sStart, sStop) = ctx.stealCache.reservedMemRange()
     log("Worker %2d: steal requests cache range 0x%.08x-0x%.08x\n",
       myID(), sStart, sStop)

@@ -135,32 +141,35 @@ proc init*(ctx: var TLContext) {.gcsafe.} =

 # Scheduler
 # ----------------------------------------------------------------------------------

-proc threadLocalCleanup*() {.gcsafe.} =
+proc teardownWorker*() =
   myWorker().deque.flushAndDispose()

   for i in 0 ..< WV_MaxConcurrentStealPerWorker:
     # No tasks left
     ascertain: myTodoBoxes().access(i).isEmpty()
-    localCtx.stealCache.access(i).victims.delete()
+    workerContext.stealCache.access(i).victims.delete()
   myTodoBoxes().delete()
   Backoff:
     `=destroy`(myParking())

   # The task cache is full of tasks
-  delete(localCtx.taskCache)
+  delete(workerContext.taskCache)
   # This also deletes steal requests already sent to other workers
-  delete(localCtx.stealCache)
+  delete(workerContext.stealCache)
   discard myMemPool().teardown()

-proc worker_entry_fn*(id: WorkerID) {.gcsafe.} =
+  localThreadKind = Unknown
+
+proc worker_entry_fn*(id: WorkerID) =
   ## On the start of the threadpool workers will execute this
   ## until they receive a termination signal
   # We assume that thread_local variables start all at their binary zero value
-  preCondition: localCtx == default(TLContext)
+  preCondition: workerContext == default(WorkerContext)
+  preCondition: localThreadKind == Unknown

   myID() = id # If this crashes, you need --tlsemulation:off
   myMemPool().initialize()
-  localCtx.init()
+  setupWorker()
   discard globalCtx.barrier.wait()

   eventLoop()
@@ -171,7 +180,8 @@ proc worker_entry_fn*(id: WorkerID) {.gcsafe.} =
   # 1 matching barrier in init(Runtime) for lead thread
   workerMetrics()

-  threadLocalCleanup()
+  teardownWorker()
+  postCondition: localThreadKind == Unknown

 proc schedule*(task: sink Task) =
   ## Add a new task to be scheduled in parallel
@@ -182,12 +192,12 @@ proc schedule*(task: sink Task) =

   profile_stop(enq_deq_task)

-  # Lead thread
-  if localCtx.runtimeIsQuiescent:
-    ascertain: myID() == LeaderID
+  # Root thread
+  if workerContext.runtimeIsQuiescent:
+    ascertain: myID() == RootID
     debugTermination:
       log(">>> Worker %2d resumes execution after barrier <<<\n", myID())
-    localCtx.runtimeIsQuiescent = false
+    workerContext.runtimeIsQuiescent = false

     dispatchToChildrenAndThieves()
diff --git a/weave/signals.nim b/weave/signals.nim
index 49999836..00ec58ad 100644
--- a/weave/signals.nim
+++ b/weave/signals.nim
@@ -12,18 +12,20 @@ import
   ./memory/persistacks,
   ./cross_thread_com/[channels_spsc_single_ptr, scoped_barriers]

+{.push gcsafe.}
+
 # Signals
 # ----------------------------------------------------------------------------------

 proc detectTermination*() {.inline.} =
-  preCondition: myID() == LeaderID
+  preCondition: myID() == RootID
   preCondition: myWorker().leftIsWaiting and myWorker().rightIsWaiting
-  preCondition: not localCtx.runtimeIsQuiescent
+  preCondition: not workerContext.runtimeIsQuiescent

   debugTermination:
     log(">>> Worker %2d detects termination <<<\n", myID())

-  localCtx.runtimeIsQuiescent = true
+  workerContext.runtimeIsQuiescent = true

 proc asyncSignal(fn: proc (_: pointer) {.nimcall, gcsafe.}, chan: var ChannelSpscSinglePtr[Task]) =
   ## Send an asynchronous signal `fn` to channel `chan`
@@ -42,8 +44,8 @@ proc asyncSignal(fn: proc (_: pointer) {.nimcall, gcsafe.}, chan: var ChannelSps
   debugTermination:
     log("Worker %2d: sending asyncSignal\n", myID())
   postCondition: signalSent

-proc signalTerminate*(_: pointer) {.gcsafe.} =
+proc signalTerminate*(_: pointer) =
-  preCondition: not localCtx.signaledTerminate
+  preCondition: not workerContext.signaledTerminate

   # 1. Terminating means everyone ran out of tasks
   #    so their cache for task channels should be full
@@ -51,11 +53,11 @@ proc signalTerminate*(_: pointer) {.gcsafe.} =
   # 2. Since they have an unique parent, no one else sent them a signal (checked in asyncSignal)
   if myWorker().left != Not_a_worker:
     # Send the terminate signal
-    asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().left].access(0))
+    asyncSignal(signalTerminate, globalCtx.com.tasksStolen[myWorker().left].access(0))
     Backoff: # Wake the worker up so that it can process the terminate signal
       wakeup(myWorker().left)
   if myWorker().right != Not_a_worker:
-    asyncSignal(signalTerminate, globalCtx.com.tasks[myWorker().right].access(0))
+    asyncSignal(signalTerminate, globalCtx.com.tasksStolen[myWorker().right].access(0))
     Backoff:
       wakeup(myWorker().right)

@@ -64,4 +66,4 @@ proc signalTerminate*(_: pointer) {.gcsafe.} =
   # as a normal task
   decCounter(tasksExec)

-  localCtx.signaledTerminate = true
+  workerContext.signaledTerminate = true
diff --git a/weave/state_machines/decline_thief.dot b/weave/state_machines/decline_thief.dot
index 9f46dc76..c669c287 100644
--- a/weave/state_machines/decline_thief.dot
+++ b/weave/state_machines/decline_thief.dot
@@ -3,15 +3,15 @@ digraph declineReqFSA{
   node [shape = doublecircle]; InitialState DS_Exit;
   node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; DS_ParkOrTerminate DS_ReceivedReq DS_FindVictimAndSteal DS_MyOwnReq DS_TreeIdle;
   InitialState -> DS_ReceivedReq [color="black:invis:black", xlabel="entry point"];
-  node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; DS_ParkOrTerminate_DSE_IamLeader DS_ReceivedReq_DSE_IamThief DS_FindVictimAndSteal_DSE_IamNewVictim DS_MyOwnReq_DSE_MyTreeIsIdle DS_TreeIdle_DSE_ItWasTheLastReq ;
-  DS_ParkOrTerminate_DSE_IamLeader [label="DSE_IamLeader\nlocalCtx.worker.ID == 0"];
-  DS_ReceivedReq_DSE_IamThief [label="DSE_IamThief\nreq.thiefID == localCtx.worker.ID"];
-  DS_FindVictimAndSteal_DSE_IamNewVictim [label="DSE_IamNewVictim\ntarget == localCtx.worker.ID"];
-  DS_MyOwnReq_DSE_MyTreeIsIdle [label="DSE_MyTreeIsIdle\nreq.state == Stealing and localCtx.worker.leftIsWaiting and localCtx.worker.rightIsWaiting"];
+  node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; DS_ParkOrTerminate_DSE_IamRoot DS_ReceivedReq_DSE_IamThief DS_FindVictimAndSteal_DSE_IamNewVictim DS_MyOwnReq_DSE_MyTreeIsIdle DS_TreeIdle_DSE_ItWasTheLastReq ;
+  DS_ParkOrTerminate_DSE_IamRoot [label="DSE_IamRoot\nworkerContext.worker.ID == 0"];
+  DS_ReceivedReq_DSE_IamThief [label="DSE_IamThief\nreq.thiefID == workerContext.worker.ID"];
+  DS_FindVictimAndSteal_DSE_IamNewVictim [label="DSE_IamNewVictim\ntarget == workerContext.worker.ID"];
+  DS_MyOwnReq_DSE_MyTreeIsIdle [label="DSE_MyTreeIsIdle\nreq.state == Stealing and workerContext.worker.leftIsWaiting and workerContext.worker.rightIsWaiting"];
   DS_TreeIdle_DSE_ItWasTheLastReq [label="DSE_ItWasTheLastReq\ntrue"];
-  DS_ParkOrTerminate -> DS_ParkOrTerminate_DSE_IamLeader[style=bold, xlabel="always"];
-  DS_ParkOrTerminate_DSE_IamLeader -> DS_Exit [style=dashed, xlabel="true"];
-  DS_ParkOrTerminate_DSE_IamLeader -> DS_Exit [xlabel="default"];
+  DS_ParkOrTerminate -> DS_ParkOrTerminate_DSE_IamRoot[style=bold, xlabel="always"];
+  DS_ParkOrTerminate_DSE_IamRoot -> DS_Exit [style=dashed, xlabel="true"];
+  DS_ParkOrTerminate_DSE_IamRoot -> DS_Exit [xlabel="default"];
   DS_ReceivedReq -> DS_ReceivedReq_DSE_IamThief[style=bold, xlabel="always"];
  DS_ReceivedReq_DSE_IamThief -> DS_MyOwnReq [style=dashed, xlabel="true"];
  DS_ReceivedReq_DSE_IamThief -> DS_FindVictimAndSteal [xlabel="default"];
diff --git a/weave/state_machines/decline_thief.nim b/weave/state_machines/decline_thief.nim
index 0b8bcabf..645543ee 100644
--- a/weave/state_machines/decline_thief.nim
+++ b/weave/state_machines/decline_thief.nim
@@ -35,7 +35,7 @@ type DS_Event = enum
   DSE_IamNewVictim
   DSE_MyTreeIsIdle
   DSE_ItWasTheLastReq
-  DSE_IamLeader
+  DSE_IamRoot

 declareAutomaton(declineReqFSA, DeclineState, DS_Event)

@@ -164,12 +164,12 @@ behavior(declineReqFSA):
 # Last steal attempt is a failure
 # -------------------------------------------

-implEvent(declineReqFSA, DSE_IamLeader):
-  myID() == LeaderID
+implEvent(declineReqFSA, DSE_IamRoot):
+  myID() == RootID

 behavior(declineReqFSA):
   ini: DS_ParkOrTerminate
-  event: DSE_IamLeader
+  event: DSE_IamRoot
   transition:
     detectTermination()
     forget(req)
@@ -193,7 +193,7 @@ behavior(declineReqFSA):
 # -------------------------------------------

 synthesize(declineReqFSA):
-  proc decline*(req: sink StealRequest) {.gcsafe.}
+  proc decline*(req: sink StealRequest) {.gcsafe, raises: [].}

 # Dump the graph
 # -------------------------------------------
diff --git a/weave/state_machines/decline_thief.png b/weave/state_machines/decline_thief.png
index d521b941..845019cf 100644
Binary files a/weave/state_machines/decline_thief.png and b/weave/state_machines/decline_thief.png differ
diff --git a/weave/state_machines/dispatch_events.nim b/weave/state_machines/dispatch_events.nim
index 8e0f8381..178863ea 100644
--- a/weave/state_machines/dispatch_events.nim
+++ b/weave/state_machines/dispatch_events.nim
@@ -11,15 +11,46 @@ import
   ../contexts, ../config,
   ../victims,
   ../thieves,
-  ./decline_thief, ./handle_thieves
+  ./decline_thief, ./handle_thieves,
+  ../cross_thread_com/channels_mpsc_unbounded_batch
+
+{.push gcsafe.}

 proc nextTask*(childTask: static bool): Task {.inline.} =
+  # Note:
+  #   We distinguish jobs and tasks.
+  #   - Jobs are submitted to Weave by external threads.
+  #     Enqueued jobs have all their pledges resolved and so are independent.
+  #     To ensure fairness in the worst case, we execute them in FIFO order.
+  #     Jobs may be split into multiple tasks.
+  #   - Tasks are spawned on the Weave runtime itself. We want to maximize
+  #     their throughput, so we execute them in LIFO order.
+  #     This ensures that the children of a task are processed before
+  #     we try to process their parent.
+  #
+  #   In particular, if jobs A, B, C each spawn 3 tasks, the processing order
+  #   on a single thread will be
+  #     A2, A1, A0, B2, B1, B0, C2, C1, C0
+  #   so that jobs A, B and C get both minimized latency and maximized throughput.
   profile(enq_deq_task):
+    # Try picking a new task (LIFO)
     if childTask:
       result = myWorker().deque.popFirstIfChild(myTask())
     else:
       result = myWorker().deque.popFirst()

+  Manager: # TODO: profiling
+    # If we drained the tasks, try picking a new job (FIFO)
+    debugExecutor:
+      log("Manager %d: checking jobs (%d in queue)\n", myID(), managerJobQueue.peek())
+    if result.isNil:
+      var job: Job
+      if managerJobQueue.tryRecv(job):
+        result = cast[Task](job)
+
+        debugExecutor:
+          log("Manager %d: Received job 0x%.08x from provider 0x%.08x\n", myID(), job, job.parent)
+
   when WV_StealEarly > 0:
     if not result.isNil:
       # If we have a big loop should we allow early thefts?
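The FIFO-jobs / LIFO-tasks ordering described in `nextTask`'s note can be reproduced with a tiny standalone model. This is not Weave code; plain Nim containers stand in for the manager's job queue and a worker's task deque:

```nim
import std/deques

# Toy model of the scheduling order: jobs are drained FIFO,
# and the tasks each job spawns are drained LIFO.
var jobs = ["A", "B", "C"].toDeque   # stands in for the manager's job queue (FIFO)
var order: seq[string]

while jobs.len > 0:
  let job = jobs.popFirst()          # jobs: First-In-First-Out
  var tasks: seq[string]             # stands in for the worker deque, used as a stack here
  for i in 0 ..< 3:
    tasks.add job & $i               # job "A" spawns tasks A0, A1, A2
  while tasks.len > 0:
    order.add tasks.pop()            # tasks: Last-In-First-Out

doAssert order == @["A2", "A1", "A0", "B2", "B1", "B0", "C2", "C1", "C0"]
```

On the real runtime the interleaving is concurrent; the model only illustrates the intended priority between the two queues on a single worker.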
diff --git a/weave/state_machines/event_loop.dot b/weave/state_machines/event_loop.dot
index 03687c4f..bdc2c56f 100644
--- a/weave/state_machines/event_loop.dot
+++ b/weave/state_machines/event_loop.dot
@@ -6,7 +6,7 @@ digraph workerEventLoop{
   node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; WEL_CheckTask_EV_FoundTask WEL_OutOfTasks_EV_StoleTask WEL_CheckTermination_EV_SignaledTerminate ;
   WEL_CheckTask_EV_FoundTask [label="EV_FoundTask\nnot isNil(task)"];
   WEL_OutOfTasks_EV_StoleTask [label="EV_StoleTask\nstoleTask"];
-  WEL_CheckTermination_EV_SignaledTerminate [label="EV_SignaledTerminate\nlocalCtx.signaledTerminate"];
+  WEL_CheckTermination_EV_SignaledTerminate [label="EV_SignaledTerminate\nworkerContext.signaledTerminate"];
   WEL_CheckTask -> WEL_CheckTask_EV_FoundTask[style=bold, xlabel="always"];
   WEL_CheckTask_EV_FoundTask -> WEL_CheckTask [style=dashed, xlabel="true"];
   WEL_CheckTask_EV_FoundTask -> WEL_OutOfTasks [xlabel="default"];
diff --git a/weave/state_machines/event_loop.nim b/weave/state_machines/event_loop.nim
index df41fbd2..31360efa 100644
--- a/weave/state_machines/event_loop.nim
+++ b/weave/state_machines/event_loop.nim
@@ -49,7 +49,7 @@ setTerminalState(workerEventLoop, WEL_Exit)
 # -------------------------------------------

 implEvent(workerEventLoop, EV_SignaledTerminate):
-  localCtx.signaledTerminate
+  workerContext.signaledTerminate

 behavior(workerEventLoop):
   ini: WEL_CheckTermination
@@ -84,7 +84,7 @@ behavior(workerEventLoop):
       execute(task)
     profile(enq_deq_task):
       # The task memory is reused but not zero-ed
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: WEL_CheckTask

 behavior(workerEventLoop):
@@ -151,7 +151,7 @@ behavior(workerEventLoop):
       execute(task)
     profile(enq_deq_task):
       # The memory is reused but not zero-ed
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: WEL_CheckTermination

 # -------------------------------------------
diff --git a/weave/state_machines/event_loop.png b/weave/state_machines/event_loop.png
index a03213b8..73277c31 100644
Binary files a/weave/state_machines/event_loop.png and b/weave/state_machines/event_loop.png differ
diff --git a/weave/state_machines/handle_thieves.dot b/weave/state_machines/handle_thieves.dot
index dfb21f4b..642e17d7 100644
--- a/weave/state_machines/handle_thieves.dot
+++ b/weave/state_machines/handle_thieves.dot
@@ -1,19 +1,30 @@ digraph handleThievesFSA{
   splines=ortho;
   node [shape = doublecircle]; InitialState IT_Exit;
-  node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; IT_CanSplit IT_CheckTheft IT_IncomingReq;
+  node [shape = circle, fontcolor=white, fillcolor=darkslategrey, style="filled"]; IT_ManagerCheckJob IT_CheckTheft IT_Split IT_IncomingReq IT_CheckSplit;
   InitialState -> IT_CheckTheft [color="black:invis:black", xlabel="entry point"];
-  node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; IT_CanSplit_ITE_ReqIsMine IT_CheckTheft_ITE_FoundReq IT_IncomingReq_ITE_NoTaskAndCanSplitCurrent ;
-  IT_CanSplit_ITE_ReqIsMine [label="ITE_ReqIsMine\nreq.thiefID == localCtx.worker.ID"];
+  node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; IT_ManagerCheckJob_ITE_FoundJob IT_CheckTheft_ITE_FoundReq IT_Split_ITE_ReqIsMine IT_IncomingReq_ITE_FoundTask IT_IncomingReq_ITE_Manager IT_CheckSplit_ITE_CanSplit ;
+  IT_ManagerCheckJob_ITE_FoundJob [label="ITE_FoundJob\nfoundJob"];
   IT_CheckTheft_ITE_FoundReq [label="ITE_FoundReq\nrecv(req)"];
-  IT_IncomingReq_ITE_NoTaskAndCanSplitCurrent [label="ITE_NoTaskAndCanSplitCurrent\nisEmpty(localCtx.worker.deque) and (not isNil(poppedTask) and poppedTask.isLoop and 1 < (poppedTask.stop - poppedTask.cur + poppedTask.stride - 1) div poppedTask.stride)"];
-  IT_CanSplit -> IT_CanSplit_ITE_ReqIsMine[style=bold, xlabel="always"];
-  IT_CanSplit_ITE_ReqIsMine -> IT_CheckTheft [style=dashed, xlabel="true"];
-  IT_CanSplit_ITE_ReqIsMine -> IT_CheckTheft [xlabel="default"];
+  IT_Split_ITE_ReqIsMine [label="ITE_ReqIsMine\nreq.thiefID == workerContext.worker.ID"];
+  IT_IncomingReq_ITE_FoundTask [label="ITE_FoundTask\nnot isEmpty(workerContext.worker.deque)"];
+  IT_IncomingReq_ITE_Manager [label="ITE_Manager\nworkerContext.worker.ID == ManagerID"];
+  IT_CheckSplit_ITE_CanSplit [label="ITE_CanSplit\nnot isNil(poppedTask) and poppedTask.isLoop and 1 < (poppedTask.stop - poppedTask.cur + poppedTask.stride - 1) div poppedTask.stride"];
+  IT_ManagerCheckJob -> IT_ManagerCheckJob_ITE_FoundJob[style=bold, xlabel="always"];
+  IT_ManagerCheckJob_ITE_FoundJob -> IT_CheckTheft [style=dashed, xlabel="true"];
+  IT_ManagerCheckJob_ITE_FoundJob -> IT_CheckSplit [xlabel="default"];
   IT_CheckTheft -> IT_CheckTheft_ITE_FoundReq[style=bold, xlabel="always"];
   IT_CheckTheft_ITE_FoundReq -> IT_IncomingReq [style=dashed, xlabel="true"];
   IT_CheckTheft_ITE_FoundReq -> IT_Exit [xlabel="default"];
-  IT_IncomingReq -> IT_IncomingReq_ITE_NoTaskAndCanSplitCurrent[style=bold, xlabel="always"];
-  IT_IncomingReq_ITE_NoTaskAndCanSplitCurrent -> IT_CanSplit [style=dashed, xlabel="true"];
-  IT_IncomingReq_ITE_NoTaskAndCanSplitCurrent -> IT_CheckTheft [xlabel="default"];
+  IT_IncomingReq -> IT_IncomingReq_ITE_FoundTask[style=bold, xlabel="always"];
+  IT_IncomingReq_ITE_FoundTask -> IT_CheckTheft [style=dashed, xlabel="true"];
+  IT_IncomingReq_ITE_FoundTask -> IT_IncomingReq_ITE_Manager[style=dotted, xlabel="false"];
+  IT_IncomingReq_ITE_Manager -> IT_ManagerCheckJob [style=dashed, xlabel="true"];
+  IT_IncomingReq_ITE_Manager -> IT_CheckSplit [xlabel="default"];
+  IT_CheckSplit -> IT_CheckSplit_ITE_CanSplit[style=bold, xlabel="always"];
+  IT_CheckSplit_ITE_CanSplit -> IT_Split [style=dashed, xlabel="true"];
+  IT_CheckSplit_ITE_CanSplit -> IT_CheckTheft [xlabel="default"];
 }
\ No newline at end of file
diff --git a/weave/state_machines/handle_thieves.nim b/weave/state_machines/handle_thieves.nim
index 25b82072..d2747469 100644
--- a/weave/state_machines/handle_thieves.nim
+++ b/weave/state_machines/handle_thieves.nim
@@ -10,9 +10,10 @@ import synthesis

 import
   ../instrumentation/[contracts, loggers],
   ../datatypes/[sync_types, prell_deques, context_thread_local],
-  ../contexts,
+  ../contexts, ../config,
   ../victims, ../loop_splitting,
-  ../thieves
+  ../thieves,
+  ../cross_thread_com/channels_mpsc_unbounded_batch

 # Scheduler - Finite Automaton rewrite
 # ----------------------------------------------------------------------------------

 type IncomingThievesState = enum
   IT_CheckTheft
   IT_IncomingReq
-  IT_CanSplit
+  IT_ManagerCheckJob
+  IT_CheckSplit
+  IT_Split

 type IT_Event = enum
   ITE_FoundReq
-  ITE_NoTaskAndCanSplitCurrent
+  ITE_FoundTask
+  ITE_Manager
+  ITE_FoundJob
+  ITE_CanSplit
   ITE_ReqIsMine

 declareAutomaton(handleThievesFSA, IncomingThievesState, IT_Event)

@@ -40,18 +46,12 @@ setPrologue(handleThievesFSA):
   ## split our popped task if possible
   var req: StealRequest

+# Theft
+# ------------------------------------------------------
+
 implEvent(handleThievesFSA, ITE_FoundReq):
   recv(req)

-implEvent(handleThievesFSA, ITE_NoTaskAndCanSplitCurrent):
-  # If we just popped a loop task, we may split it here
-  # It makes dispatching tasks simpler
-  # Don't send our popped task otherwise
-  myWorker().deque.isEmpty() and poppedTask.isSplittable()
-
-implEvent(handleThievesFSA, ITE_ReqIsMine):
-  req.thiefID == myID()
-
 behavior(handleThievesFSA):
   ini: IT_CheckTheft
   event: ITE_FoundReq
@@ -63,26 +63,94 @@ behavior(handleThievesFSA):
   transition: discard
   fin: IT_Exit

+# Task
+# ------------------------------------------------------
+
+implEvent(handleThievesFSA, ITE_FoundTask):
+  not myWorker.deque.isEmpty()
+
+implEvent(handleThievesFSA, ITE_Manager):
+  workerIsManager()
+
 # TODO, can we optimize by checking who the thief is before this step
 behavior(handleThievesFSA):
   ini: IT_IncomingReq
-  event: ITE_NoTaskAndCanSplitCurrent
+  event: ITE_FoundTask
+  transition: dispatchElseDecline(req)
+  fin: IT_CheckTheft
+
+behavior(handleThievesFSA):
+  # Fallback to check split
+  ini: IT_IncomingReq
+  event: ITE_Manager
   transition: discard
-  fin: IT_CanSplit
+  fin: IT_ManagerCheckJob

 behavior(handleThievesFSA):
+  # Fallback to check split
   ini: IT_IncomingReq
+  transition: discard
+  fin: IT_CheckSplit
+
+# Job
+# ------------------------------------------------------
+
+onEntry(handleThievesFSA, IT_ManagerCheckJob):
+  var job: Job
+  let foundJob = managerJobQueue.tryRecv(job)
+
+implEvent(handleThievesFSA, ITE_FoundJob):
+  foundJob
+
+behavior(handleThievesFSA):
+  ini: IT_ManagerCheckJob
+  event: ITE_FoundJob
+  transition:
+    # TODO: not pretty to enqueue, to dequeue just after in dispatchElseDecline
+    debugExecutor:
+      log("Manager %d: schedule a new job for execution.\n", myID)
+    myWorker().deque.addFirst cast[Task](job)
+    req.dispatchElseDecline()
+  fin: IT_CheckTheft
+
+behavior(handleThievesFSA):
+  # Fallback to check split
+  ini: IT_ManagerCheckJob
+  transition: discard
+  fin: IT_CheckSplit
+
+# Split
+# ------------------------------------------------------
+
+implEvent(handleThievesFSA, ITE_CanSplit):
+  # If we just popped a loop task, we may split it here
+  # It makes dispatching tasks simpler
+  # Don't send our popped task otherwise
+  poppedTask.isSplittable()
+
+implEvent(handleThievesFSA, ITE_ReqIsMine):
+  req.thiefID == myID()
+
+behavior(handleThievesFSA):
+  ini: IT_CheckSplit
+  event: ITE_CanSplit
+  transition: discard
+  fin: IT_Split
+
+behavior(handleThievesFSA):
+  # Fallback
+  ini: IT_CheckSplit
   transition: dispatchElseDecline(req)
   fin: IT_CheckTheft

 behavior(handleThievesFSA):
-  ini: IT_CanSplit
+  ini: IT_Split
   event: ITE_ReqIsMine
   transition: forget(req)
   fin: IT_CheckTheft

 behavior(handleThievesFSA):
-  ini: IT_CanSplit
+  ini: IT_Split
   transition: splitAndSend(poppedTask, req, workSharing = false)
   fin: IT_CheckTheft
diff --git a/weave/state_machines/handle_thieves.png b/weave/state_machines/handle_thieves.png
index 9c82bab5..72c310ee 100644
Binary files a/weave/state_machines/handle_thieves.png and b/weave/state_machines/handle_thieves.png differ
diff --git a/weave/state_machines/recv_task_else_steal.dot b/weave/state_machines/recv_task_else_steal.dot
index f5739e3d..5a702dfb 100644
--- a/weave/state_machines/recv_task_else_steal.dot
+++ b/weave/state_machines/recv_task_else_steal.dot
@@ -5,7 +5,7 @@ digraph recvTaskFSA{
   InitialState -> RT_CheckChannel [color="black:invis:black", xlabel="entry point"];
   node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; RT_FoundTask_RTE_isWaiting RT_CheckChannel_RTE_FoundTask ;
   node [shape = diamond, fontcolor=black, fillcolor=coral, style="rounded,filled"]; RT_CheckChannel_RTE_CheckedAllChannels ;
-  RT_FoundTask_RTE_isWaiting [label="RTE_isWaiting\nlocalCtx.worker.isWaiting"];
+  RT_FoundTask_RTE_isWaiting [label="RTE_isWaiting\nworkerContext.worker.isWaiting"];
   RT_CheckChannel_RTE_FoundTask [label="RTE_FoundTask\nresult"];
   RT_CheckChannel_RTE_CheckedAllChannels [label="RTE_CheckedAllChannels\ncurChanIdx == WV_MaxConcurrentStealPerWorker"];
   RT_FoundTask -> RT_FoundTask_RTE_isWaiting[style=bold, xlabel="always"];
@@ -16,4 +16,4 @@ digraph recvTaskFSA{
   RT_CheckChannel_RTE_CheckedAllChannels -> RT_CheckChannel_RTE_FoundTask[xlabel="normal flow"];
   RT_CheckChannel_RTE_FoundTask -> RT_FoundTask [style=dashed, xlabel="true"];
   RT_CheckChannel_RTE_FoundTask -> RT_CheckChannel [xlabel="default"];
-}
\ No newline at end of file
+}
diff --git a/weave/state_machines/recv_task_else_steal.nim b/weave/state_machines/recv_task_else_steal.nim
index b71c6120..454a0c73 100644
--- a/weave/state_machines/recv_task_else_steal.nim
+++ b/weave/state_machines/recv_task_else_steal.nim
@@ -78,7 +78,7 @@ behavior(recvTaskFSA):
   event: RTE_FoundTask
   transition:
     myTodoBoxes().nowAvailable(curChanIdx)
-    localCtx.stealCache.nowAvailable(curChanIdx)
+    workerContext.stealCache.nowAvailable(curChanIdx)
     debug: log("Worker %2d: received a task with function address 0x%.08x (Channel 0x%.08x)\n", myID(), task.fn, myTodoBoxes().access(curChanIdx).addr)
   fin: RT_FoundTask
@@ -117,7 +117,7 @@ behavior(recvTaskFSA):
 # -------------------------------------------

 synthesize(recvTaskFSA):
-  proc recvElseSteal*(task: var Task, isOutOfTasks: bool): bool
+  proc recvElseSteal*(task: var Task, isOutOfTasks: bool): bool {.gcsafe.}

 # Dump the graph
 # -------------------------------------------
diff --git a/weave/state_machines/sync.nim b/weave/state_machines/sync.nim
index 92910274..57fb4463 100644
--- a/weave/state_machines/sync.nim
+++ b/weave/state_machines/sync.nim
@@ -71,7 +71,7 @@ behavior(awaitFSA):
     profile(run_task):
       execute(task)
     profile(enq_deq_task):
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: AW_CheckTask

 behavior(awaitFSA):
@@ -155,43 +155,26 @@ behavior(awaitFSA):
       execute(task)
     profile(enq_deq_task):
       # The memory is reused but not zero-ed
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: AW_CheckTask

 # -------------------------------------------

 synthesize(awaitFSA):
-  proc forceFuture[T](fv: Flowvar[T], parentResult: var T)
-
-# -------------------------------------------
-
-EagerFV:
-  proc forceComplete[T](fv: Flowvar[T], parentResult: var T) {.inline.} =
-    ## From the parent thread awaiting on the result, force its computation
-    ## by eagerly processing only the child tasks spawned by the awaited task
-    fv.forceFuture(parentResult)
-    recycleChannel(fv)
-
-LazyFV:
-  template forceComplete[T](fv: Flowvar[T], parentResult: var T) =
-    forceFuture(fv, parentResult)
-    # Reclaim memory
-    if not fv.lfv.hasChannel:
-      ascertain: fv.lfv.isReady
-      parentResult = cast[ptr T](fv.lfv.lazy.buf.addr)[]
-    else:
-      ascertain: not fv.lfv.lazy.chan.isNil
-      recycleChannel(fv)
+  proc forceFuture[T](fv: Flowvar[T], parentResult: var T) {.gcsafe.}

 # Public
 # -------------------------------------------

-proc sync*[T](fv: FlowVar[T]): T {.inline.} =
+proc sync*[T](fv: FlowVar[T]): T {.inline, gcsafe.} =
   ## Blocks the current thread until the flowvar is available
   ## and returned.
   ## The thread is not idle and will complete pending tasks.
-  fv.forceComplete(result)
-
+  forceFuture(fv, result)
+  cleanup(fv)
+  LazyFV:
+    if not fv.lfv.hasChannel:
+      result = cast[ptr T](fv.lfv.lazy.buf.addr)[]

 # Dump the graph
 # -------------------------------------------
diff --git a/weave/state_machines/sync_root.dot b/weave/state_machines/sync_root.dot
index 3fe590a7..7537b681 100644
--- a/weave/state_machines/sync_root.dot
+++ b/weave/state_machines/sync_root.dot
@@ -6,10 +6,10 @@ digraph syncRootFSA{
   node [shape = octagon, fontcolor=black, fillcolor=lightsteelblue, style="rounded,filled"]; SY_OutOfTasks_SYE_SoleWorker SY_OutOfTasks_SYE_Quiescent SY_CheckTask_SYE_HasTask SY_Steal_SYE_ReceivedTask ;
   node [shape = diamond, fontcolor=black, fillcolor=coral, style="rounded,filled"]; SY_Steal_SYE_Quiescent ;
   SY_OutOfTasks_SYE_SoleWorker [label="SYE_SoleWorker\n globalCtx.numWorkers == 1"];
-  SY_OutOfTasks_SYE_Quiescent [label="SYE_Quiescent\nlocalCtx.runtimeIsQuiescent"];
+  SY_OutOfTasks_SYE_Quiescent [label="SYE_Quiescent\nworkerContext.runtimeIsQuiescent"];
   SY_CheckTask_SYE_HasTask [label="SYE_HasTask\nnot task.isNil"];
   SY_Steal_SYE_ReceivedTask [label="SYE_ReceivedTask\nlootedTask"];
-  SY_Steal_SYE_Quiescent [label="SYE_Quiescent\nlocalCtx.runtimeIsQuiescent"];
+  SY_Steal_SYE_Quiescent [label="SYE_Quiescent\nworkerContext.runtimeIsQuiescent"];
   SY_OutOfTasks -> SY_OutOfTasks_SYE_SoleWorker[style=bold, xlabel="always"];
   SY_OutOfTasks_SYE_SoleWorker -> SY_Exit [style=dashed, xlabel="true"];
   SY_OutOfTasks_SYE_SoleWorker -> SY_OutOfTasks_SYE_Quiescent[style=dotted, xlabel="false"];
diff --git a/weave/state_machines/sync_root.nim b/weave/state_machines/sync_root.nim
index d2a21496..c2031880 100644
--- a/weave/state_machines/sync_root.nim
+++ b/weave/state_machines/sync_root.nim
@@ -57,7 +57,7 @@ setPrologue(syncRootFSA):
 setEpilogue(syncRootFSA):
   # Execution continues but the runtime is quiescent until new tasks
   # are created
-  postCondition: localCtx.runtimeIsQuiescent
+  postCondition: workerContext.runtimeIsQuiescent
   debugTermination:
     log(">>> Worker %2d leaves barrier <<<\n", myID())

@@ -70,7 +70,7 @@ implEvent(syncRootFSA, SYE_HasTask):
   not task.isNil

 implEvent(syncRootFSA, SYE_Quiescent):
-  localCtx.runtimeIsQuiescent
+  workerContext.runtimeIsQuiescent

 implEvent(syncRootFSA, SYE_SoleWorker):
   workforce() == 1
@@ -87,7 +87,7 @@ behavior(syncRootFSA):
     profile(run_task):
       execute(task)
     profile(enq_deq_task):
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: SY_CheckTask

 behavior(syncRootFSA):
@@ -100,7 +100,7 @@ behavior(syncRootFSA):
 behavior(syncRootFSA):
   ini: SY_OutOfTasks
   event: SYE_SoleWorker
-  transition: localCtx.runtimeIsQuiescent = true
+  transition: workerContext.runtimeIsQuiescent = true
   fin: SY_Exit

 behavior(syncRootFSA):
@@ -180,7 +180,7 @@ behavior(syncRootFSA):
       execute(task)
     profile(enq_deq_task):
       # The memory is reused but not zero-ed
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: SY_CheckTask

 # -------------------------------------------
diff --git a/weave/state_machines/sync_root.png b/weave/state_machines/sync_root.png
index 3cd3bf82..085489c5 100644
Binary files a/weave/state_machines/sync_root.png and b/weave/state_machines/sync_root.png differ
diff --git a/weave/state_machines/sync_scope.nim b/weave/state_machines/sync_scope.nim
index abf19b3b..15602c88 100644
--- a/weave/state_machines/sync_scope.nim
+++ b/weave/state_machines/sync_scope.nim
@@ -89,7 +89,7 @@ behavior(syncScopeFSA):
     profile(run_task):
       execute(task)
     profile(enq_deq_task):
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: SB_CheckTask

 behavior(syncScopeFSA):
@@ -162,13 +162,13 @@ behavior(syncScopeFSA):
       execute(task)
     profile(enq_deq_task):
       # The memory is re-used but not zero-ed
-      localCtx.taskCache.add(task)
+      workerContext.taskCache.add(task)
   fin: SB_CheckTask

 # -------------------------------------------

 synthesize(syncScopeFSA):
-  proc wait(scopedBarrier: var ScopedBarrier)
+  proc wait(scopedBarrier: var ScopedBarrier) {.gcsafe.}

 # Public
 # -------------------------------------------
diff --git a/weave/targets.nim b/weave/targets.nim
index b270b95e..39aee6f9 100644
--- a/weave/targets.nim
+++ b/weave/targets.nim
@@ -11,6 +11,8 @@ import
   ./instrumentation/[contracts, loggers],
   ./config

+{.push gcsafe.}
+
 # Victim selection
 # ----------------------------------------------------------------------------------

@@ -84,7 +86,7 @@ proc findVictim*(req: var StealRequest): WorkerID =
   #      and I was the last possible target.
   #      or all threads but the main one are sleeping and it retrieved its own request
   #      from one of the sleeper queues
-  postCondition: result != myID() or (result == myID() and myID() == LeaderID)
+  postCondition: result != myID() or (result == myID() and myID() == RootID)
   postCondition: result in 0 ..< workforce()
   postCondition: req.retry in 0 .. WV_MaxRetriesPerSteal
diff --git a/weave/thieves.nim b/weave/thieves.nim
index 6b88b885..1ddab929 100644
--- a/weave/thieves.nim
+++ b/weave/thieves.nim
@@ -14,13 +14,15 @@ import
   ./config,
   std/atomics

+{.push gcsafe.}
+
 # Thief
 # ----------------------------------------------------------------------------------

 proc newStealRequest(): StealRequest =
   ## Create a new steal request
   ## This does not initialize the Thief state
-  result = localCtx.stealCache.borrow()
+  result = workerContext.stealCache.borrow()
   ascertain: result.victims.capacity.int32 == workforce()

   result.next.store(nil, moRelaxed)
@@ -135,7 +137,7 @@ proc trySteal*(isOutOfTasks: bool) =
     req.state = Working
   req.findVictimAndSteal()

-proc forget*(req: sink StealRequest) {.gcsafe.} =
+proc forget*(req: sink StealRequest) =
   ## Removes a steal request from circulation
   ## Re-increment the worker quota
@@ -144,7 +146,7 @@ proc forget*(req: sink StealRequest) =
   myThefts().outstanding -= 1
   myTodoBoxes().recycle(req.thiefAddr)
-  localCtx.stealCache.recycle(req)
+  workerContext.stealCache.recycle(req)

 proc drop*(req: sink StealRequest) =
   ## Removes a steal request from circulation
@@ -162,7 +164,7 @@ proc drop*(req: sink StealRequest) =
   # don't decrement the count so that no new theft is initiated
   myThefts().dropped += 1
   myTodoBoxes().recycle(req.thiefAddr)
-  localCtx.stealCache.recycle(req)
+  workerContext.stealCache.recycle(req)

 proc stealEarly*(){.inline.} =
   if workforce() == 1:
diff --git a/weave/victims.nim b/weave/victims.nim
index e2446611..760ffcf3 100644
--- a/weave/victims.nim
+++ b/weave/victims.nim
@@ -14,6 +14,8 @@ import
   ./thieves, ./loop_splitting,
   ./state_machines/decline_thief

+{.push gcsafe.}
+
 # Victims - Proxy handling on behalf of idle child workers
 # ----------------------------------------------------------------------------------

@@ -165,7 +167,7 @@ proc send(req: sink StealRequest, task: sink Task, numStolen: int32 = 1) {.inlin
   incCounter(stealHandled)
   incCounter(tasksSent, numStolen)

-proc dispatchElseDecline*(req: sink StealRequest) {.gcsafe.}=
+proc dispatchElseDecline*(req: sink StealRequest) =
   ## Send tasks in return of a steal request
   ## or decline and relay the steal request to another thread
@@ -299,7 +301,16 @@ proc distributeWork*(req: sink StealRequest, workSharing: bool): bool =
   #      the branch that leads to termination
   #      and would logically return true

-  # Otherwise try to split the current one
+  Manager:
+    # Introduce a pending job otherwise
+    var job: Job
+    if managerJobQueue.tryRecv(job):
+      # TODO: not pretty to enqueue, to dequeue just after in dispatchElseDecline
+      myWorker().deque.addFirst cast[Task](job)
+      req.dispatchElseDecline()
+      return true
+
+  # Otherwise try to split the current task
   if myTask().isSplittable():
     if req.thiefID != myID():
       myTask().splitAndSend(req, workSharing)
diff --git a/weave/workers.nim b/weave/workers.nim
index f53606c8..d8e5f298 100644
--- a/weave/workers.nim
+++ b/weave/workers.nim
@@ -12,6 +12,8 @@ import
   ./cross_thread_com/scoped_barriers,
   ./config

+{.push gcsafe.}
+
 # Worker - Tasks handling
 # ----------------------------------------------------------------------------------

@@ -28,7 +30,7 @@ proc restartWork*() =
   myWorker().isWaiting = false
   myThefts().dropped = 0

-proc execute*(task: Task) {.inline, gcsafe.} =
+proc execute*(task: Task) {.inline.} =
   preCondition: not task.fn.isNil

   let suspendedTask = myTask()