Weave as a background service (#136)
* Create base types for composability with other execution engines #132

* Renamings

* Prepare for job support

* Weave as an executor service - high-level API

* Update the state machine to support executor mode

* Wait for the runtime to be ready (next step, wakeup of worker tree on submission)

* Have a manager thread handle job submissions to avoid race in termination detection

* Fix parallel jobs example to properly wait

* add job spawning tasks test

* Fix awaiting delayed computations

* Implement jobs with multiple dependencies

* add runInBackground to start Weave as a service

* Update tests and documentation

* Workaround upstream regression on GC-safe createThread nim-lang/Nim#14370

* cleanup can have side effects on Windows. Also bump nimble version

* cleanup for LazyFlowvar can also have side-effects

* Threads cannot be copied ¯\\\_(ツ)\_/¯
mratsim authored May 16, 2020
1 parent 46cf323 commit 42360c3
Showing 38 changed files with 1,199 additions and 193 deletions.
103 changes: 95 additions & 8 deletions README.md
@@ -152,14 +152,28 @@ exit(Weave)

### Complete list

- `init(Weave)`, `exit(Weave)` to start and stop the runtime. Forgetting this will give you nil pointer exceptions on spawn.
- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable Flowvar handle.
We separate the list depending on the threading context.

#### Root thread

The root thread is the thread that started the Weave runtime. It has special privileges.

- `init(Weave)`, `exit(Weave)` to start and stop the runtime. Forgetting `init(Weave)` will give you nil pointer exceptions on spawn.\
  The thread that calls `init` will become the root thread.
- `syncRoot(Weave)` is a global barrier. The root thread will not continue beyond it
  until all tasks in the runtime are finished.
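
As a minimal sketch of the root thread lifecycle described above (the `square` proc, the `main` wrapper and the `answer` variable are illustrative names, not part of Weave; the program is assumed to be compiled with `--threads:on`):

```Nim
import weave

proc square(x: int): int = x * x

proc main(): int =
  init(Weave)                # the calling thread becomes the root thread
  let fv = spawn square(7)   # the task may run on any worker thread
  syncRoot(Weave)            # root-only barrier: wait for all tasks in the runtime
  result = sync(fv)          # the value is already available at this point
  exit(Weave)                # stop the runtime

let answer = main()
echo answer
```

Because `syncRoot(Weave)` can only be called from the root thread, keeping it in the same proc as `init(Weave)` and `exit(Weave)` makes that constraint easy to see.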

#### Weave worker thread

A worker thread is automatically created per (logical) core on the machine.
The root thread is also a worker thread.
Worker threads are tuned to maximize throughput of computational **tasks**.

- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable `Flowvar` handle.
- `newPledge`, `fulfill` and `spawnDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `sync(Flowvar)` will await a Flowvar and block until you receive a result.
- `isReady(Flowvar)` will check if `sync` will actually block or return the result immediately.
- `syncRoot(Weave)` is a global barrier for the main thread on the main task.
Using syncRoot in a proc means that the proc can only be called from the main thread.
`syncRoot(Weave)` is implicitly called by `exit(Weave)`

- `syncScope` is a scope barrier. The thread will not move beyond the scope until
all tasks and parallel loops spawned and their descendants are finished.
`syncScope` is composable, it can be called by any thread, it can be nested.
@@ -177,13 +191,83 @@ exit(Weave)
may be created by the current tasks.
- `parallelFor`, `parallelForStrided`, `parallelForStaged`, `parallelForStagedStrided` are described above and in the experimental section.
- `loadBalance(Weave)` gives the runtime the opportunity to distribute work. Insert this within long computation as due to Weave design, it's the busy workers that are also in charge of load balancing. This is done automatically when using `parallelFor`.
- `isSpawned` allows you to build speculative algorithm where a thread is spawned only if certain conditions are valid. See the `nqueens` benchmark for an example.
- `getThreadId` returns a unique thread ID. The thread ID is in the range 0 ..< number of threads.
- `isSpawned(Flowvar)` allows you to build speculative algorithms where a thread is spawned only if certain conditions are valid. See the `nqueens` benchmark for an example.
- `getThreadId(Weave)` returns a unique thread ID. The thread ID is in the range 0 ..< number of threads.
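
The task primitives above can be sketched with a naive parallel Fibonacci. This is only an illustration under the assumption that the program is compiled with `--threads:on`; `fib`, `main` and `answer` are names chosen for the example:

```Nim
import weave

proc fib(n: int): int =
  ## Naive parallel Fibonacci: one branch is spawned, the other runs inline.
  if n < 2:
    return n
  let x = spawn fib(n - 1)   # awaitable Flowvar, may be picked up by another worker
  let y = fib(n - 2)         # computed on the current worker
  result = sync(x) + y       # block until the spawned branch has a result

proc main(): int =
  init(Weave)
  let f = spawn fib(20)
  # isReady only peeks at the Flowvar; sync is what actually waits for the value
  echo "ready before sync? ", isReady(f)
  result = sync(f)
  exit(Weave)

let answer = main()
echo answer
```

Spawning only one of the two branches keeps the current worker busy with useful work instead of immediately blocking on `sync`.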

The max number of threads can be configured by the environment variable WEAVE_NUM_THREADS
The max number of worker threads can be configured by the environment variable `WEAVE_NUM_THREADS`
and defaults to your number of logical cores (including HyperThreading).
Weave uses Nim's `countProcessors()` in `std/cpuinfo`.
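
To preview the worker count this rule would give on a machine, the lookup can be approximated with the standard library alone. `plannedWorkerCount` is a hypothetical helper written for this sketch, not a Weave API, and it assumes the variable holds a plain integer:

```Nim
import std/[os, cpuinfo, strutils]

proc plannedWorkerCount(): int =
  ## Hypothetical helper (not part of Weave) mirroring the documented rule:
  ## WEAVE_NUM_THREADS if set, otherwise the number of logical cores.
  if existsEnv("WEAVE_NUM_THREADS"):
    parseInt(getEnv("WEAVE_NUM_THREADS"))
  else:
    countProcessors()

echo "Expected worker threads: ", plannedWorkerCount()
```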

#### Foreign thread & Background service (experimental)

Weave can also be run as a background service and process `jobs` similar to the `Executor` concept in C++.
Jobs will be processed in FIFO order.

> **Experimental**:
> The distinction between spawn/sync on a Weave thread
> and submit/waitFor on a foreign thread may be removed in the future.

A background service can be started with either:
- `thr.runInBackground(Weave)`
- or `thr.runInBackground(Weave, signalShutdown: ptr Atomic[bool])`

with `thr` an uninitialized `Thread[void]` or `Thread[ptr Atomic[bool]]`

Then the foreign thread should call:
- `setupSubmitterThread(Weave)`: Configure a thread so that it can send jobs to a background Weave service.
- `waitUntilReady(Weave)`: Block the foreign thread until the Weave runtime is ready to accept jobs.

and for shutdown:
- `teardownSubmitterThread(Weave)`: Clean up Weave resources allocated on the thread.

Once setup, a foreign thread can submit jobs via:

- `submit fnCall(args)` which submits a function to the Weave runtime and gives you an awaitable `Pending` handle.
- `newPledge`, `fulfill` and `submitDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `waitFor(Pending)` which awaits a `Pending` job result and blocks the current thread.
- `isReady(Pending)` will check if `waitFor` will actually block or return the result immediately.
- `isSubmitted(job)` allows you to build speculative algorithms where a job is submitted only if certain conditions are valid.

Within a job, tasks can be spawned and parallel for constructs can be used.
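
Putting the steps above together, a foreign thread might drive a background Weave service roughly as follows. This is a sketch based only on the calls listed in this section; it assumes the main thread may act as the submitter thread and that the program is compiled with `--threads:on`. `square`, `main` and `answer` are illustrative names:

```Nim
import std/atomics
import weave

proc square(x: int): int = x * x

proc main(): int =
  var shutdown: Atomic[bool]
  shutdown.store(false, moRelaxed)

  # Start Weave on a dedicated thread; it runs until `shutdown` becomes true.
  var weaveThread: Thread[ptr Atomic[bool]]
  weaveThread.runInBackground(Weave, shutdown.addr)

  # The current thread is foreign to Weave: configure it as a job submitter.
  setupSubmitterThread(Weave)
  waitUntilReady(Weave)

  let pending = submit square(12)   # awaitable Pending handle
  result = waitFor(pending)         # blocks the current thread until the job is done

  # Release per-thread resources, then signal the service to stop.
  teardownSubmitterThread(Weave)
  shutdown.store(true, moRelaxed)
  joinThread(weaveThread)

let answer = main()
echo answer
```

The shutdown flag is passed by pointer, so it has to outlive the background thread; joining the thread before `main` returns keeps that guarantee.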

If `runInBackground()` does not provide fine enough control, a Weave background event loop
can be customized using the following primitives:
- at a very low level:
  - The root thread primitives: `init(Weave)` and `exit(Weave)`
  - `processAllandTryPark(Weave)`: Process all pending jobs and try sleeping. To avoid deadlocks, the sleep may fail
    if a job is submitted concurrently. This should be used in a `while true` event loop.
- at a medium level:
  - `runForever(Weave)`: Start a never-ending event loop that processes all pending jobs and sleeps until new work arrives.
  - `runUntil(Weave, signalShutdown: ptr Atomic[bool])`: Start an event loop that quits on signal.

For example:
```Nim
proc runUntil*(_: typedesc[Weave], signal: ptr Atomic[bool]) =
  ## Start a Weave event loop until signal is true on the current thread.
  ## It wakes-up on job submission, handles multithreaded load balancing,
  ## help process tasks
  ## and spin down when there is no work anymore.
  preCondition: not signal.isNil
  while not signal[].load(moRelaxed):
    processAllandTryPark(Weave)
  syncRoot(Weave)

proc runInBackground*(
       _: typedesc[Weave],
       signalShutdown: ptr Atomic[bool]
     ): Thread[ptr Atomic[bool]] =
  ## Start the Weave runtime on a background thread.
  ## It wakes-up on job submissions, handles multithreaded load balancing,
  ## help process tasks
  ## and spin down when there is no work anymore.
  proc eventLoop(shutdown: ptr Atomic[bool]) {.thread.} =
    init(Weave)
    Weave.runUntil(shutdown)
    exit(Weave)
  result.createThread(eventLoop, signalShutdown)
```

## Table of Contents

- [Weave, a state-of-the-art multithreading runtime](#weave-a-state-of-the-art-multithreading-runtime)
@@ -194,6 +278,9 @@ Weave uses Nim's `countProcessors()` in `std/cpuinfo`
- [Data parallelism](#data-parallelism)
- [Strided loops](#strided-loops)
- [Complete list](#complete-list)
- [Root thread](#root-thread)
- [Weave worker thread](#weave-worker-thread)
- [Foreign thread & Background service (experimental)](#foreign-thread--background-service-experimental)
- [Table of Contents](#table-of-contents)
- [Platforms supported](#platforms-supported)
- [C++ compilation](#c-compilation)
12 changes: 12 additions & 0 deletions changelog.md
@@ -5,6 +5,18 @@
#### Features

- Added `isReady(Flowvar)` which checks whether `sync` would block on that Flowvar or return the result immediately.
- `syncScope:` to block until all tasks and their (recursive)
  descendants are completed.
- Dataflow parallelism can now be used with the C++ target.
- Weave as a background service (experimental).
  Weave can now be started on a dedicated thread
  and handle **jobs** from any thread.
  To do this, Weave can be started with `thr.runInBackground(Weave)`.
  Job-providing threads should call `setupSubmitterThread(Weave)`,
  and can now use `submit function(args...)` and `waitFor(Pending)`
  to have Weave work as a job system.
  Jobs are handled in FIFO order.
  Within a job, tasks can be spawned.

### v0.4.0 - April 2020 - "Bespoke"

13 changes: 11 additions & 2 deletions weave.nim
@@ -12,7 +12,8 @@ import
  weave/state_machines/[sync_root, sync, sync_scope],
  weave/datatypes/flowvars,
  weave/cross_thread_com/pledges,
  weave/contexts
  weave/contexts,
  weave/[executor, parallel_jobs]

export
  Flowvar, Weave,
@@ -28,4 +29,12 @@ export
  syncScope,
  # Experimental dataflow parallelism
  spawnDelayed, Pledge,
  fulfill, newPledge
  fulfill, newPledge,
  # Experimental background service
  Pending,
  submit, submitDelayed,
  runInBackground, waitUntilReady,
  setupSubmitterThread, teardownSubmitterThread,
  waitFor, isSubmitted,
  processAllandTryPark,
  runForever, runUntil
10 changes: 7 additions & 3 deletions weave.nimble
@@ -1,6 +1,6 @@
# Package

version = "0.4.0"
version = "0.4.9"
author = "Mamy André-Ratsimbazafy"
description = "a state-of-the-art multithreading runtime"
license = "MIT or Apache License 2.0"
@@ -45,11 +45,13 @@ task test, "Run Weave tests":
test "", "weave/parallel_for.nim"
test "", "weave/parallel_for_staged.nim"
test "", "weave/parallel_reduce.nim"
test "", "weave/parallel_jobs.nim"

test "-d:WV_LazyFlowvar", "weave/parallel_tasks.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_for.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_for_staged.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_reduce.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_jobs.nim"

test "", "benchmarks/dfs/weave_dfs.nim"
test "", "benchmarks/fibonacci/weave_fib.nim"
@@ -70,7 +72,7 @@ task test, "Run Weave tests":
test "-d:WV_LazyFlowvar", "benchmarks/heat/weave_heat.nim"
test "-d:WV_LazyFlowvar", "benchmarks/matrix_transposition/weave_transposes.nim"
test "-d:WV_LazyFlowvar", "benchmarks/nqueens/weave_nqueens.nim"
when not defined(windows):
when not defined(windows): # Timer impl missing
test "-d:WV_LazyFlowvar", "benchmarks/single_task_producer/weave_spc.nim"
test "-d:WV_LazyFlowvar", "benchmarks/bouncing_producer_consumer/weave_bpc.nim"
when defined(i386) or defined(amd64):
@@ -107,11 +109,13 @@ task test_gc_arc, "Run Weave tests with --gc:arc":
test "--gc:arc", "weave/parallel_for.nim"
test "--gc:arc", "weave/parallel_for_staged.nim"
test "--gc:arc", "weave/parallel_reduce.nim"
test "--gc:arc", "weave/parallel_jobs.nim"

test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_tasks.nim"
test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_for.nim"
test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_for_staged.nim"
test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_reduce.nim"
test "--gc:arc -d:WV_LazyFlowvar", "weave/parallel_jobs.nim"

test "--gc:arc", "benchmarks/dfs/weave_dfs.nim"
test "--gc:arc", "benchmarks/fibonacci/weave_fib.nim"
@@ -132,7 +136,7 @@ task test_gc_arc, "Run Weave tests with --gc:arc":
test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/heat/weave_heat.nim"
test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/matrix_transposition/weave_transposes.nim"
test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/nqueens/weave_nqueens.nim"
when not defined(windows):
when not defined(windows): # Timer impl missing
test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/single_task_producer/weave_spc.nim"
test "--gc:arc -d:WV_LazyFlowvar", "benchmarks/bouncing_producer_consumer/weave_bpc.nim"
when defined(i386) or defined(amd64):
6 changes: 6 additions & 0 deletions weave/config.nim
@@ -7,6 +7,8 @@

import strutils

{.push gcsafe.}

# Platform support
# ----------------------------------------------------------------------------------

@@ -100,6 +102,10 @@ template debugTermination*(body: untyped): untyped =
  when defined(WV_DebugTermination) or defined(WV_Debug):
    block: {.noSideEffect, gcsafe.}: body

template debugExecutor*(body: untyped): untyped =
  when defined(WV_DebugExecutor) or defined(WV_Debug):
    block: {.noSideEffect, gcsafe.}: body

template debug*(body: untyped): untyped =
  when defined(WV_Debug):
    block: {.noSideEffect, gcsafe.}: body