From 5f45cfe1bcda90467cba59f20303f56f8050bbf1 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 3 May 2024 09:55:15 -0400 Subject: [PATCH 01/15] test: add new test for wait_for_tasks() --- test/source/thread_pool.cpp | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index aeb6b0b..6e3ff84 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -467,6 +467,33 @@ TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); + + class counter_wrapper { + public: + counter_wrapper() = default; + std::atomic_int counter = 0; + + void increment_counter() { counter.fetch_add(1); } + }; + + dp::thread_pool local_pool{}; + std::vector counts(17); + for (size_t i = 0; i < 17; i++) { + counter_wrapper cnt_wrp{}; + + for (size_t var1 = 0; var1 < 17; var1++) { + for (int var2 = 0; var2 < 12; var2++) { + local_pool.enqueue_detach([&cnt_wrp]() { cnt_wrp.increment_counter(); }); + } + } + local_pool.wait_for_tasks(); + // std::cout << cnt_wrp.counter << std::endl; + counts[i] = cnt_wrp.counter; + } + + auto all_correct_count = + std::ranges::all_of(counts, [](int count) { return count == 17 * 12; }); + CHECK(all_correct_count); } TEST_CASE("Initialization function is called") { From 1396af71ec2bde567584cd51ba3171a156c6a759 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 3 May 2024 10:12:03 -0400 Subject: [PATCH 02/15] docs: rename variable and add comments --- include/thread_pool/thread_pool.h | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 8ff6e3e..69cd884 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -56,9 +56,15 @@ namespace dp { // invoke the task while (auto task = tasks_[id].tasks.pop_front()) { try { + // decrement the unassigned tasks as the task is now going + // to be executed unassigned_tasks_.fetch_sub(1, std::memory_order_release); + // invoke the task std::invoke(std::move(task.value())); - completed_tasks_.fetch_sub(1, std::memory_order_release); + // the above task can push more work onto the pool, so we + // only decrement the in flights once the task has been + // executed because now it's now longer "in flight" + in_flight_tasks_.fetch_sub(1, std::memory_order_release); } catch (...) { } } @@ -70,7 +76,7 @@ namespace dp { // steal a task unassigned_tasks_.fetch_sub(1, std::memory_order_release); std::invoke(std::move(task.value())); - completed_tasks_.fetch_sub(1, std::memory_order_release); + in_flight_tasks_.fetch_sub(1, std::memory_order_release); // stop stealing once we have invoked a stolen task break; } @@ -82,7 +88,7 @@ namespace dp { priority_queue_.rotate_to_front(id); // check if all tasks are completed and release the barrier (binary // semaphore) - if (completed_tasks_.load(std::memory_order_acquire) == 0) { + if (in_flight_tasks_.load(std::memory_order_acquire) == 0) { threads_done_.release(); } @@ -221,7 +227,7 @@ namespace dp { * @details This function will block until all tasks have been completed. */ void wait_for_tasks() { - if (completed_tasks_.load(std::memory_order_acquire) > 0) { + if (in_flight_tasks_.load(std::memory_order_acquire) > 0) { // wait for all tasks to finish threads_done_.acquire(); } @@ -237,7 +243,7 @@ namespace dp { } auto i = *(i_opt); unassigned_tasks_.fetch_add(1, std::memory_order_relaxed); - completed_tasks_.fetch_add(1, std::memory_order_relaxed); + in_flight_tasks_.fetch_add(1, std::memory_order_relaxed); tasks_[i].tasks.push_back(std::forward(f)); tasks_[i].signal.release(); } @@ -250,7 +256,7 @@ namespace dp { std::vector threads_; std::deque tasks_; dp::thread_safe_queue priority_queue_; - std::atomic_int_fast64_t unassigned_tasks_{}, completed_tasks_{}; + std::atomic_int_fast64_t unassigned_tasks_{}, in_flight_tasks_{}; std::binary_semaphore threads_done_{0}; }; From ff9cd7f1c0e11d41a0b07d3f518aa3b38679325b Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 3 May 2024 10:51:33 -0400 Subject: [PATCH 03/15] test: split test to two separate ones --- test/source/thread_pool.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 6e3ff84..535d863 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -467,18 +467,21 @@ TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { pool.wait_for_tasks(); CHECK_EQ(counter.load(), total_tasks); +} +TEST_CASE("Ensure wait_for_tasks() properly waits for tasks to fully complete") { class counter_wrapper { public: counter_wrapper() = default; std::atomic_int counter = 0; - void increment_counter() { counter.fetch_add(1); } + void increment_counter() { counter.fetch_add(1, std::memory_order_release); } }; dp::thread_pool local_pool{}; - std::vector counts(17); - for (size_t i = 0; i < 17; i++) { + constexpr auto task_count = 10; + std::vector counts(task_count); + for (size_t i = 0; i < task_count; i++) { counter_wrapper cnt_wrp{}; for (size_t var1 = 0; var1 < 17; var1++) { @@ -488,11 +491,13 @@ TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { } local_pool.wait_for_tasks(); // std::cout << cnt_wrp.counter << std::endl; - counts[i] = cnt_wrp.counter; + counts[i] = cnt_wrp.counter.load(std::memory_order_acquire); } auto all_correct_count = std::ranges::all_of(counts, [](int count) { return count == 17 * 12; }); + const auto sum = std::accumulate(counts.begin(), counts.end(), 0); + CHECK_EQ(sum, 17 * 12 * task_count); CHECK(all_correct_count); } From e6aa0c44f5457a2cc4da6bfc130a8d3d1e19406f Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 3 May 2024 10:52:22 -0400 Subject: [PATCH 04/15] chore: update memory ordering of atomics --- include/thread_pool/thread_pool.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 69cd884..e2db2e1 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -242,8 +242,8 @@ namespace dp { return; } auto i = *(i_opt); - unassigned_tasks_.fetch_add(1, std::memory_order_relaxed); - in_flight_tasks_.fetch_add(1, std::memory_order_relaxed); + unassigned_tasks_.fetch_add(1, std::memory_order_release); + in_flight_tasks_.fetch_add(1, std::memory_order_release); tasks_[i].tasks.push_back(std::forward(f)); tasks_[i].signal.release(); } From fe90c5511ffded1a014c1d23746d1535b445079c Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 3 May 2024 10:53:36 -0400 Subject: [PATCH 05/15] wip: use std::barrier in wait_for_tasks() Use std::barrier with wait for tasks. This requires std::move_only_function to be available. --- include/thread_pool/thread_pool.h | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index e2db2e1..085a9f8 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -23,8 +23,10 @@ namespace dp { #ifdef __cpp_lib_move_only_function using default_function_type = std::move_only_function; + using barrier_function_type = std::move_only_function; #else using default_function_type = std::function; + using barrier_function_type = []() noexcept {}; #endif } // namespace details @@ -36,11 +38,14 @@ namespace dp { public: template > requires std::invocable && - std::is_same_v> + std::is_same_v> explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency(), InitializationFunction init = [](std::size_t) {}) - : tasks_(number_of_threads) { + : tasks_(number_of_threads), threads_done_(number_of_threads, [this]() noexcept { + threads_complete_signal_.release(); + }) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { priority_queue_.push_back(size_t(current_id)); @@ -89,7 +94,7 @@ namespace dp { // check if all tasks are completed and release the barrier (binary // semaphore) if (in_flight_tasks_.load(std::memory_order_acquire) == 0) { - threads_done_.release(); + threads_complete_signal_.release(); } } while (!stop_tok.stop_requested()); @@ -105,6 +110,9 @@ namespace dp { // remove our thread from the priority queue std::ignore = priority_queue_.pop_back(); + + // remove one item from the barrier + threads_done_.arrive_and_drop(); } } } @@ -229,7 +237,7 @@ namespace dp { void wait_for_tasks() { if (in_flight_tasks_.load(std::memory_order_acquire) > 0) { // wait for all tasks to finish - threads_done_.acquire(); + threads_complete_signal_.acquire(); } } @@ -257,7 +265,8 @@ namespace dp { std::deque tasks_; dp::thread_safe_queue priority_queue_; std::atomic_int_fast64_t unassigned_tasks_{}, in_flight_tasks_{}; - std::binary_semaphore threads_done_{0}; + std::barrier threads_done_; + std::binary_semaphore threads_complete_signal_{0}; }; /** From d586238d193ee183bfa4a139739156a280092435 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 3 May 2024 11:24:57 -0400 Subject: [PATCH 06/15] chore: use std::atomic_bool instead of a barrier --- include/thread_pool/thread_pool.h | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 085a9f8..29b2fa9 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -23,10 +22,8 @@ namespace dp { #ifdef __cpp_lib_move_only_function using default_function_type = std::move_only_function; - using barrier_function_type = std::move_only_function; #else using default_function_type = std::function; - using barrier_function_type = []() noexcept {}; #endif } // namespace details @@ -38,14 +35,11 @@ namespace dp { public: template > requires std::invocable && - std::is_same_v> + std::is_same_v> explicit thread_pool( const unsigned int &number_of_threads = std::thread::hardware_concurrency(), InitializationFunction init = [](std::size_t) {}) - : tasks_(number_of_threads), threads_done_(number_of_threads, [this]() noexcept { - threads_complete_signal_.release(); - }) { + : tasks_(number_of_threads) { std::size_t current_id = 0; for (std::size_t i = 0; i < number_of_threads; ++i) { priority_queue_.push_back(size_t(current_id)); @@ -94,7 +88,8 @@ namespace dp { // check if all tasks are completed and release the barrier (binary // semaphore) if (in_flight_tasks_.load(std::memory_order_acquire) == 0) { - threads_complete_signal_.release(); + threads_complete_signal_ = true; + threads_complete_signal_.notify_one(); } } while (!stop_tok.stop_requested()); @@ -110,9 +105,6 @@ namespace dp { // remove our thread from the priority queue std::ignore = priority_queue_.pop_back(); - - // remove one item from the barrier - threads_done_.arrive_and_drop(); } } } @@ -237,7 +229,7 @@ namespace dp { void wait_for_tasks() { if (in_flight_tasks_.load(std::memory_order_acquire) > 0) { // wait for all tasks to finish - threads_complete_signal_.acquire(); + threads_complete_signal_.wait(false); } } @@ -265,8 +257,7 @@ namespace dp { std::deque tasks_; dp::thread_safe_queue priority_queue_; std::atomic_int_fast64_t unassigned_tasks_{}, in_flight_tasks_{}; - std::barrier threads_done_; - std::binary_semaphore threads_complete_signal_{0}; + std::atomic_bool threads_complete_signal_{false}; }; /** From 4a4ead70f1d8505a15b76caec0548ff9734d6488 Mon Sep 17 00:00:00 2001 From: Justin Davis Date: Wed, 3 Jul 2024 16:19:17 -0400 Subject: [PATCH 07/15] simplify test to remove extra variables --- test/source/thread_pool.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 535d863..0d8925a 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -472,7 +472,6 @@ TEST_CASE("Ensure wait_for_tasks() properly blocks current execution.") { TEST_CASE("Ensure wait_for_tasks() properly waits for tasks to fully complete") { class counter_wrapper { public: - counter_wrapper() = default; std::atomic_int counter = 0; void increment_counter() { counter.fetch_add(1, std::memory_order_release); } @@ -480,7 +479,7 @@ TEST_CASE("Ensure wait_for_tasks() properly waits for tasks to fully complete") dp::thread_pool local_pool{}; constexpr auto task_count = 10; - std::vector counts(task_count); + std::array counts{{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}; for (size_t i = 0; i < task_count; i++) { counter_wrapper cnt_wrp{}; From ef43577300491790c61beafca7cc395f4be0f248 Mon Sep 17 00:00:00 2001 From: Justin Davis Date: Wed, 3 Jul 2024 16:21:00 -0400 Subject: [PATCH 08/15] reset the thread complete signal any time we add new tasks to an empty queue. --- include/thread_pool/thread_pool.h | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 29b2fa9..6da0da7 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -243,7 +243,13 @@ namespace dp { } auto i = *(i_opt); unassigned_tasks_.fetch_add(1, std::memory_order_release); - in_flight_tasks_.fetch_add(1, std::memory_order_release); + const auto prev_in_flight = in_flight_tasks_.fetch_add(1, std::memory_order_release); + + // reset the in flight signal if the list was previously empty + if (prev_in_flight == 0) { + threads_complete_signal_.store(false, std::memory_order_release); + } + tasks_[i].tasks.push_back(std::forward(f)); tasks_[i].signal.release(); } @@ -256,7 +262,8 @@ namespace dp { std::vector threads_; std::deque tasks_; dp::thread_safe_queue priority_queue_; - std::atomic_int_fast64_t unassigned_tasks_{}, in_flight_tasks_{}; + // guarantee these get zero-initialized + std::atomic_int_fast64_t unassigned_tasks_{0}, in_flight_tasks_{0}; std::atomic_bool threads_complete_signal_{false}; }; From 214ac70aad3fcb850f27610461d66b614d5ba2a0 Mon Sep 17 00:00:00 2001 From: Justin Davis Date: Wed, 3 Jul 2024 16:23:41 -0400 Subject: [PATCH 09/15] add extra test --- test/source/thread_pool.cpp | 49 +++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 0d8925a..15350c4 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -500,6 +500,55 @@ TEST_CASE("Ensure wait_for_tasks() properly waits for tasks to fully complete") CHECK(all_correct_count); } +TEST_CASE("Ensure wait_for_tasks() can be called multiple times on the same pool") { + class counter_wrapper { + public: + std::atomic_int counter = 0; + + void increment_counter() { counter.fetch_add(1, std::memory_order_release); } + }; + + dp::thread_pool local_pool{}; + constexpr auto task_count = 10; + std::array counts{{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}; + for (size_t i = 0; i < task_count; i++) { + counter_wrapper cnt_wrp{}; + + for (size_t var1 = 0; var1 < 17; var1++) { + for (int var2 = 0; var2 < 12; var2++) { + local_pool.enqueue_detach([&cnt_wrp]() { cnt_wrp.increment_counter(); }); + } + } + local_pool.wait_for_tasks(); + // std::cout << cnt_wrp.counter << std::endl; + counts[i] = cnt_wrp.counter.load(std::memory_order_acquire); + } + + auto all_correct_count = + std::ranges::all_of(counts, [](int count) { return count == 17 * 12; }); + auto sum = std::accumulate(counts.begin(), counts.end(), 0); + CHECK_EQ(sum, 17 * 12 * task_count); + CHECK(all_correct_count); + + for (size_t i = 0; i < task_count; i++) { + counter_wrapper cnt_wrp{}; + + for (size_t var1 = 0; var1 < 17; var1++) { + for (int var2 = 0; var2 < 12; var2++) { + local_pool.enqueue_detach([&cnt_wrp]() { cnt_wrp.increment_counter(); }); + } + } + local_pool.wait_for_tasks(); + // std::cout << cnt_wrp.counter << std::endl; + counts[i] = cnt_wrp.counter.load(std::memory_order_acquire); + } + + all_correct_count = std::ranges::all_of(counts, [](int count) { return count == 17 * 12; }); + sum = std::accumulate(counts.begin(), counts.end(), 0); + CHECK_EQ(sum, 17 * 12 * task_count); + CHECK(all_correct_count); +} + TEST_CASE("Initialization function is called") { std::atomic_int counter = 0; { From 798ad5e877de0e7cce3db919bdac73fe1250625c Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 3 Jul 2024 17:22:38 -0400 Subject: [PATCH 10/15] chore: remove unecessary try/catch We suppress exceptions before enquing the task so it doesn't seem necessary to have try/catch when we invoke the enqueued task --- include/thread_pool/thread_pool.h | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 6da0da7..8b6840c 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -54,18 +54,15 @@ namespace dp { do { // invoke the task while (auto task = tasks_[id].tasks.pop_front()) { - try { - // decrement the unassigned tasks as the task is now going - // to be executed - unassigned_tasks_.fetch_sub(1, std::memory_order_release); - // invoke the task - std::invoke(std::move(task.value())); - // the above task can push more work onto the pool, so we - // only decrement the in flights once the task has been - // executed because now it's now longer "in flight" - in_flight_tasks_.fetch_sub(1, std::memory_order_release); - } catch (...) { - } + // decrement the unassigned tasks as the task is now going + // to be executed + unassigned_tasks_.fetch_sub(1, std::memory_order_release); + // invoke the task + std::invoke(std::move(task.value())); + // the above task can push more work onto the pool, so we + // only decrement the in flights once the task has been + // executed because now it's now longer "in flight" + in_flight_tasks_.fetch_sub(1, std::memory_order_release); } // try to steal a task @@ -241,6 +238,7 @@ namespace dp { // would only be a problem if there are zero threads return; } + // get the index auto i = *(i_opt); unassigned_tasks_.fetch_add(1, std::memory_order_release); const auto prev_in_flight = in_flight_tasks_.fetch_add(1, std::memory_order_release); @@ -250,6 +248,7 @@ namespace dp { threads_complete_signal_.store(false, std::memory_order_release); } + // assign work tasks_[i].tasks.push_back(std::forward(f)); tasks_[i].signal.release(); } From dcd047cf18631d0000b14e50f26865c7947210c5 Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 3 Jul 2024 17:23:01 -0400 Subject: [PATCH 11/15] test: add variety to tests and added missing include --- test/source/thread_pool.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 15350c4..af311bf 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -9,6 +9,7 @@ #include #include #include +#include auto multiply(int a, int b) { return a * b; } @@ -514,8 +515,8 @@ TEST_CASE("Ensure wait_for_tasks() can be called multiple times on the same pool for (size_t i = 0; i < task_count; i++) { counter_wrapper cnt_wrp{}; - for (size_t var1 = 0; var1 < 17; var1++) { - for (int var2 = 0; var2 < 12; var2++) { + for (size_t var1 = 0; var1 < 16; var1++) { + for (int var2 = 0; var2 < 13; var2++) { local_pool.enqueue_detach([&cnt_wrp]() { cnt_wrp.increment_counter(); }); } } @@ -525,9 +526,9 @@ TEST_CASE("Ensure wait_for_tasks() can be called multiple times on the same pool } auto all_correct_count = - std::ranges::all_of(counts, [](int count) { return count == 17 * 12; }); + std::ranges::all_of(counts, [](int count) { return count == 16 * 13; }); auto sum = std::accumulate(counts.begin(), counts.end(), 0); - CHECK_EQ(sum, 17 * 12 * task_count); + CHECK_EQ(sum, 16 * 13 * task_count); CHECK(all_correct_count); for (size_t i = 0; i < task_count; i++) { From c97f6389d461e05e694c88624017edcf111944b3 Mon Sep 17 00:00:00 2001 From: Paul T Date: Wed, 3 Jul 2024 17:26:49 -0400 Subject: [PATCH 12/15] format: auto formatting fixes --- test/source/thread_pool.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index af311bf..dc16faa 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -4,12 +4,12 @@ #include #include +#include #include #include #include #include #include -#include auto multiply(int a, int b) { return a * b; } From 71eefd71888b03323d1a8ad5550e9666c51b4449 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 4 Jul 2024 07:41:53 -0400 Subject: [PATCH 13/15] docs: add more doc strings and comments --- include/thread_pool/thread_pool.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 8b6840c..f761e4b 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -217,6 +217,11 @@ namespace dp { })); } + /** + * @brief Returns the number of threads in the pool. + * + * @return std::size_t The number of threads in the pool. + */ [[nodiscard]] auto size() const { return threads_.size(); } /** @@ -240,6 +245,8 @@ namespace dp { } // get the index auto i = *(i_opt); + + // increment the unassigned tasks and in flight tasks unassigned_tasks_.fetch_add(1, std::memory_order_release); const auto prev_in_flight = in_flight_tasks_.fetch_add(1, std::memory_order_release); From 5bce59ec93e7fea949b938492f6f3a76d52e86b4 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 4 Jul 2024 07:42:15 -0400 Subject: [PATCH 14/15] fix: catch exceptions in the thread init function --- include/thread_pool/thread_pool.h | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index f761e4b..cbcfd70 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -46,7 +46,13 @@ namespace dp { try { threads_.emplace_back([&, id = current_id, init](const std::stop_token &stop_tok) { - init(id); + // invoke the init function on the thread + try { + std::invoke(init, id); + } catch (...) { + // suppress exceptions + } + do { // wait until signaled tasks_[id].signal.acquire(); From 9b5486da451faebc34b44a396915365b95b8b772 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 4 Jul 2024 07:42:33 -0400 Subject: [PATCH 15/15] chore: use `store` with atomic bool --- include/thread_pool/thread_pool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index cbcfd70..48b00e6 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -91,7 +91,7 @@ namespace dp { // check if all tasks are completed and release the barrier (binary // semaphore) if (in_flight_tasks_.load(std::memory_order_acquire) == 0) { - threads_complete_signal_ = true; + threads_complete_signal_.store(true, std::memory_order_release); threads_complete_signal_.notify_one(); }