From 4eb8a8b6e0701e41ff87f5548d01835e79a077ab Mon Sep 17 00:00:00 2001 From: Anatoly Volkov Date: Wed, 3 Jul 2024 02:33:12 -0700 Subject: [PATCH] Add init to env constructor --- .../src/externals/core_threading_win_dll.cpp | 18 ++--- cpp/daal/src/services/env_detect.cpp | 16 ++++- cpp/daal/src/threading/threading.cpp | 65 ++++++++++++------- cpp/daal/src/threading/threading.h | 12 ++-- 4 files changed, 71 insertions(+), 40 deletions(-) diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index bfd7ac01a32..4de140b8236 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -142,8 +142,8 @@ typedef void (*_daal_run_task_group_t)(void * taskGroupPtr, daal::task * t); typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr); typedef bool (*_daal_is_in_parallel_t)(); -typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl); -typedef size_t (*_setNumberOfThreads_t)(const size_t, void **); +// typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl); +typedef size_t (*_setNumberOfThreads_t)(const size_t); //, void **); typedef void * (*_daal_threader_env_t)(); typedef void (*_daal_parallel_sort_int32_t)(int *, int *); @@ -205,10 +205,10 @@ static _daal_del_task_group_t _daal_del_task_group_ptr = NULL; static _daal_run_task_group_t _daal_run_task_group_ptr = NULL; static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL; -static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; -static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; -static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; -static _daal_threader_env_t _daal_threader_env_ptr = NULL; +static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; +// static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; +static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; +static _daal_threader_env_t _daal_threader_env_ptr = NULL; static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL; static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL; @@ -636,6 +636,7 @@ DAAL_EXPORT bool _daal_is_in_parallel() return _daal_is_in_parallel_ptr(); } +/* DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init) { if (init == NULL) @@ -656,15 +657,16 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init) } return _daal_tbb_task_scheduler_free_ptr(init); } +*/ -DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init) +DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads /*, void ** init*/) { load_daal_thr_dll(); if (_setNumberOfThreads_ptr == NULL) { _setNumberOfThreads_ptr = (_setNumberOfThreads_t)load_daal_thr_func("_setNumberOfThreads"); } - return _setNumberOfThreads_ptr(numThreads, init); + return _setNumberOfThreads_ptr(numThreads /*, init*/); } DAAL_EXPORT void * _daal_threader_env() diff --git a/cpp/daal/src/services/env_detect.cpp b/cpp/daal/src/services/env_detect.cpp index cf88b8a385a..036f2801702 100644 --- a/cpp/daal/src/services/env_detect.cpp +++ b/cpp/daal/src/services/env_detect.cpp @@ -28,6 +28,7 @@ #include "src/externals/service_service.h" #include "src/threading/threading.h" #include "services/error_indexes.h" +#include #include "src/services/service_topo.h" #include "src/threading/service_thread_pinner.h" @@ -127,9 +128,11 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi DAAL_EXPORT daal::services::Environment::Environment() /* : _globalControl {}*/ { + std::cerr << "Environment constructor" << std::endl; _env.cpuid_init_flag = false; _env.cpuid = -1; this->setDefaultExecutionContext(internal::CpuExecutionContext()); + daal::services::Environment::initNumberOfThreads(); } DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : daal::services::Environment::Environment() {} @@ -137,6 +140,7 @@ DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : da DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() { if (isInit) return; + std::cerr << "Inside init" << std::endl; /* if HT enabled - set _numThreads to physical cores num */ if (daal::internal::ServiceInst::serv_get_ht()) @@ -145,16 +149,24 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() int ncores = daal::internal::ServiceInst::serv_get_ncpus() * daal::internal::ServiceInst::serv_get_ncorespercpu(); /* Re-set number of threads if ncores is valid and different to _numThreads */ - if ((ncores > 0) && (ncores < _daal_threader_get_max_threads())) + + std::cerr << "Init with " << ncores << std::endl; + if (ncores > 0) { daal::services::Environment::setNumberOfThreads(ncores); } } + else + { + std::cerr << "Init with " << (_daal_threader_get_max_threads()) << std::endl; + daal::services::Environment::setNumberOfThreads(_daal_threader_get_max_threads()); + } isInit = true; } DAAL_EXPORT daal::services::Environment::~Environment() { + std::cerr << "Env destructor" << std::endl; daal::services::daal_free_buffers(); // _daal_tbb_task_scheduler_free(_globalControl); } @@ -171,7 +183,7 @@ void daal::services::Environment::_cpu_detect(int enable) DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads) { isInit = true; - daal::setNumberOfThreads(numThreads /*, &_globalControl*/); + daal::setNumberOfThreads(numThreads); } DAAL_EXPORT size_t daal::services::Environment::getNumberOfThreads() const diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index a26298b36ec..73109fae214 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -35,11 +35,43 @@ #include #include #include "services/daal_atomic_int.h" +#include #if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002 #include #endif +namespace daal +{ +ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr) +{ + std::cout << "ThreaderEnv constructor" << std::endl; +} +ThreaderEnvironment::~ThreaderEnvironment() +{ + std::cerr << "ThreaderEnv destructor" << std::endl; + if (_taskArena) + { + delete reinterpret_cast(_taskArena); + _taskArena = nullptr; + } +} +void ThreaderEnvironment::setNumberOfThreads(size_t value) +{ + std::cerr << "setNumberOfThreads from " << (_numberOfThreads) << " to " << value << std::endl; + if (_taskArena) + { + delete reinterpret_cast(_taskArena); + _taskArena = nullptr; + } + if (value > 1) + { + _taskArena = reinterpret_cast(new tbb::task_arena(value)); + } + _numberOfThreads = value; +} +} // namespace daal + using namespace daal::services; DAAL_EXPORT void * _threaded_scalable_malloc(const size_t size, const size_t alignment) @@ -52,24 +84,14 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr) scalable_aligned_free(ptr); } -DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena) -{ - // void* taskArena = daal::threader_env()->getTaskArena(); - if (taskArena) - { - delete reinterpret_cast(taskArena); - taskArena = nullptr; - } -} - -DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) -{ - if (globalControl) - { - delete reinterpret_cast(globalControl); - globalControl = nullptr; - } -} +// DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) +// { +// if (globalControl) +// { +// delete reinterpret_cast(globalControl); +// globalControl = nullptr; +// } +// } DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { @@ -79,13 +101,11 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads) { const size_t maxNumThreads = _daal_threader_get_max_threads(); const size_t limitedNumThreads = numThreads < maxNumThreads ? numThreads : maxNumThreads; - void *& taskArena = daal::threader_env()->getTaskArena(); - _daal_tbb_task_arena_free(taskArena); - taskArena = reinterpret_cast(new tbb::task_arena(limitedNumThreads)); + std::cerr << "_set nthreads " << numThreads << "(max " << maxNumThreads << ")" << std::endl; daal::threader_env()->setNumberOfThreads(limitedNumThreads); return limitedNumThreads; } - _daal_tbb_task_arena_free(daal::threader_env()->getTaskArena()); + std::cerr << "_set nthreads 1" << std::endl; daal::threader_env()->setNumberOfThreads(1); return 1; } @@ -215,7 +235,6 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c if (daal::threader_env()->getNumberOfThreads() > 1) { tbb::task_arena * taskArena = reinterpret_cast(daal::threader_env()->getTaskArena()); - // ????? return taskArena->execute([&] { return tbb::parallel_reduce( tbb::blocked_range(0, n), init, diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index f5ec7d3b882..1bbe05f2de6 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -100,9 +100,7 @@ extern "C" DAAL_EXPORT void _daal_del_task_group(void * taskGroupPtr); DAAL_EXPORT void _daal_run_task_group(void * taskGroupPtr, daal::task * t); DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr); - - DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena); - DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); + // DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads); DAAL_EXPORT void * _daal_threader_env(); @@ -166,11 +164,11 @@ inline void threaded_scalable_free(void * ptr) class ThreaderEnvironment { public: - ThreaderEnvironment() : _numberOfThreads(1 /*_daal_threader_get_max_threads()*/) {} - ~ThreaderEnvironment() { _daal_tbb_task_arena_free(_taskArena); } + ThreaderEnvironment(); + ~ThreaderEnvironment(); size_t getNumberOfThreads() const { return _numberOfThreads; } - void setNumberOfThreads(size_t value) { _numberOfThreads = value; } - void *& getTaskArena() { return _taskArena; } + void * getTaskArena() const { return _taskArena; }; + void setNumberOfThreads(size_t value); private: size_t _numberOfThreads;