From 8eeed46cfc1b9cc6fcf0accdb907cf3db9b8ef0a Mon Sep 17 00:00:00 2001 From: halx99 Date: Fri, 7 Jun 2024 00:33:01 +0800 Subject: [PATCH] Add JobSystem support (#1968) --- .github/workflows/build.yml | 13 - CHANGELOG.md | 10 + core/2d/RenderTexture.cpp | 4 +- core/3d/MeshRenderer.cpp | 69 +++-- core/audio/AudioEngine.cpp | 97 +------ core/audio/AudioEngine.h | 5 - core/audio/AudioPlayer.cpp | 6 +- core/base/CMakeLists.txt | 2 + core/base/Director.cpp | 42 +-- core/base/Director.h | 10 + core/base/JobSystem.cpp | 211 ++++++++++++++ core/base/JobSystem.h | 116 ++++++++ core/base/Utils.cpp | 36 ++- core/base/astc.cpp | 273 +++++++----------- core/media/WmfMediaEngine.cpp | 4 +- core/platform/FileUtils.h | 4 +- .../src/assets-manager/AssetsManagerEx.cpp | 35 +-- 17 files changed, 566 insertions(+), 371 deletions(-) create mode 100644 core/base/JobSystem.cpp create mode 100644 core/base/JobSystem.h diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 40f491d7c3ec..63a09ca70c7e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -23,10 +23,6 @@ jobs: - uses: actions/checkout@v4 with: submodules: 'recursive' - - uses: ilammy/msvc-dev-cmd@v1 - with: - toolset: '14.39' - arch: 'x64' - name: Build shell: pwsh run: | @@ -38,10 +34,6 @@ jobs: - uses: actions/checkout@v4 with: submodules: 'recursive' - - uses: ilammy/msvc-dev-cmd@v1 - with: - toolset: '14.39' - arch: 'x64' - name: Build shell: pwsh run: .\tools\cmdline\axmol -p win32 -a x64 -dll @@ -52,11 +44,6 @@ jobs: - uses: actions/checkout@v4 with: submodules: 'recursive' - - uses: ilammy/msvc-dev-cmd@v1 - with: - toolset: '14.39' - arch: 'x64' - uwp: true - name: Build shell: pwsh run: .\tools\cmdline\axmol -p winuwp -a x64 -O0 diff --git a/CHANGELOG.md b/CHANGELOG.md index ba5dd310d8d8..af40eff55800 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## axmol-2.1.4 ?? 2024 + +### Significant changes relative to 2.1.2: + +- Add JobSystem support + +### Mark as deprecated + +- `AsyncTaskPool`, use `JobSystem` instead. + ## axmol-2.1.3 May.26 2024 ### Significant changes relative to 2.1.2: diff --git a/core/2d/RenderTexture.cpp b/core/2d/RenderTexture.cpp index a297a49532f6..d72318db7f45 100644 --- a/core/2d/RenderTexture.cpp +++ b/core/2d/RenderTexture.cpp @@ -470,7 +470,7 @@ void RenderTexture::onSaveToFile(std::string filename, bool isRGBA, bool forceNo { if (forceNonPMA && image->hasPremultipliedAlpha()) { - std::thread([this, image, _filename, isRGBA, forceNonPMA]() { + _director->getJobSystem()->enqueue([this, image, _filename, isRGBA, forceNonPMA]() { image->reversePremultipliedAlpha(); Director::getInstance()->getScheduler()->runOnAxmolThread([this, image, _filename, isRGBA] { @@ -480,7 +480,7 @@ void RenderTexture::onSaveToFile(std::string filename, bool isRGBA, bool forceNo _saveFileCallback(this, _filename); } }); - }).detach(); + }); } else { diff --git a/core/3d/MeshRenderer.cpp b/core/3d/MeshRenderer.cpp index 2b1dedd4a899..a4ebe79dee11 100644 --- a/core/3d/MeshRenderer.cpp +++ b/core/3d/MeshRenderer.cpp @@ -33,7 +33,6 @@ #include "3d/Mesh.h" #include "base/Director.h" -#include "base/AsyncTaskPool.h" #include "base/UTF8.h" #include "base/Utils.h" #include "2d/Light.h" @@ -85,16 +84,16 @@ MeshRenderer* MeshRenderer::create(std::string_view modelPath, std::string_view } void MeshRenderer::createAsync(std::string_view modelPath, - const std::function& callback, - void* callbackparam) + const std::function& callback, + void* callbackparam) { createAsync(modelPath, "", callback, callbackparam); } void MeshRenderer::createAsync(std::string_view modelPath, - std::string_view texturePath, - const std::function& callback, - void* callbackparam) + std::string_view texturePath, + const std::function& callback, + void* callbackparam) { MeshRenderer* meshRenderer = new MeshRenderer(); if (meshRenderer->loadFromCache(modelPath)) @@ -106,20 +105,22 @@ void MeshRenderer::createAsync(std::string_view modelPath, } meshRenderer->_asyncLoadParam.afterLoadCallback = callback; - meshRenderer->_asyncLoadParam.texPath = texturePath; - meshRenderer->_asyncLoadParam.modelPath = modelPath; - meshRenderer->_asyncLoadParam.modelFullPath = FileUtils::getInstance()->fullPathForFilename(modelPath); - meshRenderer->_asyncLoadParam.callbackParam = callbackparam; - meshRenderer->_asyncLoadParam.materialdatas = new MaterialDatas(); - meshRenderer->_asyncLoadParam.meshdatas = new MeshDatas(); - meshRenderer->_asyncLoadParam.nodeDatas = new NodeDatas(); - AsyncTaskPool::getInstance()->enqueue( - AsyncTaskPool::TaskType::TASK_IO, AX_CALLBACK_1(MeshRenderer::afterAsyncLoad, meshRenderer), - (void*)(&meshRenderer->_asyncLoadParam), [meshRenderer]() { - auto& loadParam = meshRenderer->_asyncLoadParam; - loadParam.result = meshRenderer->loadFromFile(loadParam.modelFullPath, loadParam.nodeDatas, - loadParam.meshdatas, loadParam.materialdatas); - }); + meshRenderer->_asyncLoadParam.texPath = texturePath; + meshRenderer->_asyncLoadParam.modelPath = modelPath; + meshRenderer->_asyncLoadParam.modelFullPath = FileUtils::getInstance()->fullPathForFilename(modelPath); + meshRenderer->_asyncLoadParam.callbackParam = callbackparam; + meshRenderer->_asyncLoadParam.materialdatas = new MaterialDatas(); + meshRenderer->_asyncLoadParam.meshdatas = new MeshDatas(); + meshRenderer->_asyncLoadParam.nodeDatas = new NodeDatas(); + + auto director = Director::getInstance(); + director->getJobSystem()->enqueue( + [director, meshRenderer] { + auto& loadParam = meshRenderer->_asyncLoadParam; + loadParam.result = meshRenderer->loadFromFile(loadParam.modelFullPath, loadParam.nodeDatas, loadParam.meshdatas, + loadParam.materialdatas); + }, + [meshRenderer] { meshRenderer->afterAsyncLoad(&meshRenderer->_asyncLoadParam); }); } void MeshRenderer::afterAsyncLoad(void* param) @@ -232,9 +233,9 @@ bool MeshRenderer::loadFromCache(std::string_view path) } bool MeshRenderer::loadFromFile(std::string_view path, - NodeDatas* nodedatas, - MeshDatas* meshdatas, - MaterialDatas* materialdatas) + NodeDatas* nodedatas, + MeshDatas* meshdatas, + MaterialDatas* materialdatas) { std::string fullPath = FileUtils::getInstance()->fullPathForFilename(path); @@ -368,7 +369,9 @@ bool MeshRenderer::initFrom(const NodeDatas& nodeDatas, const MeshDatas& meshdat return true; } -MeshRenderer* MeshRenderer::createMeshRendererNode(NodeData* nodedata, ModelData* modeldata, const MaterialDatas& materialdatas) +MeshRenderer* MeshRenderer::createMeshRendererNode(NodeData* nodedata, + ModelData* modeldata, + const MaterialDatas& materialdatas) { auto meshRenderer = new MeshRenderer(); @@ -404,7 +407,8 @@ MeshRenderer* MeshRenderer::createMeshRendererNode(NodeData* nodedata, ModelData texParams.sAddressMode = textureData->wrapS; texParams.tAddressMode = textureData->wrapT; tex->setTexParameters(texParams); - _transparentMaterialHint = materialData->getTextureData(NTextureData::Usage::Transparency) != nullptr; + _transparentMaterialHint = + materialData->getTextureData(NTextureData::Usage::Transparency) != nullptr; } } textureData = materialData->getTextureData(NTextureData::Usage::Normal); @@ -570,7 +574,8 @@ void MeshRenderer::createNode(NodeData* nodedata, Node* root, const MaterialData texParams.sAddressMode = textureData->wrapS; texParams.tAddressMode = textureData->wrapT; tex->setTexParameters(texParams); - _transparentMaterialHint = materialData->getTextureData(NTextureData::Usage::Transparency) != nullptr; + _transparentMaterialHint = + materialData->getTextureData(NTextureData::Usage::Transparency) != nullptr; } } textureData = materialData->getTextureData(NTextureData::Usage::Normal); @@ -676,7 +681,8 @@ Texture2D* MeshRenderer::setMeshTexture(Mesh* mesh, std::string_view texPath, NT return tex; } -void MeshRenderer::setModelTexture(std::string_view modelPath, std::string_view texturePath) { +void MeshRenderer::setModelTexture(std::string_view modelPath, std::string_view texturePath) +{ if (!texturePath.empty()) setTexture(texturePath); else if (!_meshTextureHint) @@ -892,7 +898,7 @@ void MeshRenderer::draw(Renderer* renderer, const Mat4& transform, uint32_t flag } } -bool MeshRenderer::setProgramState(backend::ProgramState* programState, bool ownPS/* = false*/) +bool MeshRenderer::setProgramState(backend::ProgramState* programState, bool ownPS /* = false*/) { if (Node::setProgramState(programState, ownPS)) { @@ -1097,13 +1103,12 @@ static MeshMaterial* getMeshRendererMaterialForAttribs(MeshVertexData* meshVerte { if (hasTangentSpace) { - type = hasNormal && usesLight ? MeshMaterial::MaterialType::BUMPED_DIFFUSE - : MeshMaterial::MaterialType::UNLIT; + type = + hasNormal && usesLight ? MeshMaterial::MaterialType::BUMPED_DIFFUSE : MeshMaterial::MaterialType::UNLIT; } else { - type = hasNormal && usesLight ? MeshMaterial::MaterialType::DIFFUSE - : MeshMaterial::MaterialType::UNLIT; + type = hasNormal && usesLight ? MeshMaterial::MaterialType::DIFFUSE : MeshMaterial::MaterialType::UNLIT; } } else diff --git a/core/audio/AudioEngine.cpp b/core/audio/AudioEngine.cpp index cebec41e223d..314242fd4c65 100644 --- a/core/audio/AudioEngine.cpp +++ b/core/audio/AudioEngine.cpp @@ -54,10 +54,6 @@ AudioEngine::ProfileHelper* AudioEngine::_defaultProfileHelper = nullptr; std::unordered_map AudioEngine::_audioIDInfoMap; AudioEngineImpl* AudioEngine::_audioEngineImpl = nullptr; -#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) -AudioEngine::AudioEngineThreadPool* AudioEngine::s_threadPool = nullptr; -#endif - bool AudioEngine::_isEnabled = true; AudioEngine::AudioInfo::AudioInfo() @@ -66,90 +62,12 @@ AudioEngine::AudioInfo::AudioInfo() AudioEngine::AudioInfo::~AudioInfo() {} -#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) -class AudioEngine::AudioEngineThreadPool -{ -public: - AudioEngineThreadPool(int threads = 4) : _stop(false) - { - for (int index = 0; index < threads; ++index) - { - _workers.emplace_back(std::thread(std::bind(&AudioEngineThreadPool::threadFunc, this))); - } - } - - void addTask(const std::function& task) - { - std::unique_lock lk(_queueMutex); - _taskQueue.emplace(task); - _taskCondition.notify_one(); - } - - ~AudioEngineThreadPool() - { - { - std::unique_lock lk(_queueMutex); - _stop = true; - _taskCondition.notify_all(); - } - - for (auto&& worker : _workers) - { - worker.join(); - } - } - -private: - void threadFunc() - { - while (true) - { - std::function task = nullptr; - { - std::unique_lock lk(_queueMutex); - if (_stop) - { - break; - } - if (!_taskQueue.empty()) - { - task = std::move(_taskQueue.front()); - _taskQueue.pop(); - } - else - { - _taskCondition.wait(lk); - continue; - } - } - - task(); - } - } - - std::vector _workers; - std::queue> _taskQueue; - - std::mutex _queueMutex; - std::condition_variable _taskCondition; - bool _stop; -}; -#endif - void AudioEngine::end() { // make sure everythings cleanup before delete audio engine // fix #127 uncacheAll(); -#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) - if (s_threadPool) - { - delete s_threadPool; - s_threadPool = nullptr; - } -#endif - delete _audioEngineImpl; _audioEngineImpl = nullptr; @@ -170,13 +88,6 @@ bool AudioEngine::lazyInit() } } -#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) - if (s_threadPool == nullptr) - { - s_threadPool = new AudioEngineThreadPool(); - } -#endif - return true; } @@ -594,13 +505,7 @@ void AudioEngine::preload(std::string_view filePath, std::function& task) { lazyInit(); - -#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) - if (_audioEngineImpl && s_threadPool) - { - s_threadPool->addTask(task); - } -#endif + Director::getInstance()->getJobSystem()->enqueue(task); } int AudioEngine::getPlayingAudioCount() diff --git a/core/audio/AudioEngine.h b/core/audio/AudioEngine.h index c90a1df5421b..1876bd8fb7cf 100644 --- a/core/audio/AudioEngine.h +++ b/core/audio/AudioEngine.h @@ -382,11 +382,6 @@ class AX_DLL AudioEngine static AudioEngineImpl* _audioEngineImpl; -#if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) - class AudioEngineThreadPool; - static AudioEngineThreadPool* s_threadPool; -#endif - static bool _isEnabled; friend class AudioEngineImpl; diff --git a/core/audio/AudioPlayer.cpp b/core/audio/AudioPlayer.cpp index 064796c45b48..7bef8232e08a 100644 --- a/core/audio/AudioPlayer.cpp +++ b/core/audio/AudioPlayer.cpp @@ -33,6 +33,8 @@ #include "audio/AudioDecoder.h" #include "audio/AudioDecoderManager.h" +#include "yasio/thread_name.hpp" + #ifdef VERY_VERY_VERBOSE_LOGGING # define ALOGVV ALOGV #else @@ -284,9 +286,7 @@ bool AudioPlayer::play2d() // rotateBufferThread is used to rotate alBufferData for _alSource when playing big audio file void AudioPlayer::rotateBufferThread(int offsetFrame) { -#if defined(__APPLE__) - pthread_setname_np("ALStreaming"); -#endif + yasio::set_thread_name("axmol-audio"); char* tmpBuffer = nullptr; auto& fullPath = _audioCache->_fileFullPath; diff --git a/core/base/CMakeLists.txt b/core/base/CMakeLists.txt index e16296b0cc69..c3c752b79d82 100644 --- a/core/base/CMakeLists.txt +++ b/core/base/CMakeLists.txt @@ -81,10 +81,12 @@ set(_AX_BASE_HEADER base/IMEDispatcher.h base/PaddedString.h base/JsonWriter.h + base/JobSystem.h ) set(_AX_BASE_SRC base/AsyncTaskPool.cpp + base/JobSystem.cpp base/AutoreleasePool.cpp base/Configuration.cpp base/Logging.cpp diff --git a/core/base/Director.cpp b/core/base/Director.cpp index ce3f9a481534..410d939902b3 100644 --- a/core/base/Director.cpp +++ b/core/base/Director.cpp @@ -61,7 +61,7 @@ THE SOFTWARE. #include "base/ObjectFactory.h" #include "platform/Application.h" #if defined(AX_ENABLE_AUDIO) - #include "audio/AudioEngine.h" +# include "audio/AudioEngine.h" #endif #if AX_ENABLE_SCRIPT_BINDING @@ -115,8 +115,12 @@ bool Director::init() // FPS _lastUpdate = std::chrono::steady_clock::now(); + + auto concurrency = Configuration::getInstance()->getValue("axmol.concurrency", Value{-1}).asInt(); + _jobSystem = JobSystem::create(concurrency); + #ifdef AX_ENABLE_CONSOLE - _console = new Console; + _console = new Console(); #endif // scheduler _scheduler = new Scheduler(); @@ -151,10 +155,9 @@ bool Director::init() #if AX_ENABLE_CACHE_TEXTURE_DATA // listen the event that renderer was recreated on Android/WP8 - _rendererRecreatedListener = EventListenerCustom::create( - EVENT_RENDERER_RECREATED, [this](EventCustom*) { - _isStatusLabelUpdated = true; // Force recreation of textures - }); + _rendererRecreatedListener = EventListenerCustom::create(EVENT_RENDERER_RECREATED, [this](EventCustom*) { + _isStatusLabelUpdated = true; // Force recreation of textures + }); _eventDispatcher->addEventListenerWithFixedPriority(_rendererRecreatedListener, -1); #endif @@ -199,6 +202,8 @@ Director::~Director() /** clean auto release pool. */ PoolManager::destroyInstance(); + AX_SAFE_DELETE(_jobSystem); + s_SharedDirector = nullptr; } @@ -247,7 +252,8 @@ void Director::setDefaultValues() Image::setCompressedImagesHavePMA(Image::CompressedImagePMAFlag::ASTC, astc_alpha_premultiplied); // ETC2 has alpha premultiplied ? - // Note: no suitable tools(etc2comp, Mali Texture Compression Tool, PVRTexTool) support do PMA currently, so set etc2 PMA default to `false` + // Note: no suitable tools(etc2comp, Mali Texture Compression Tool, PVRTexTool) support do PMA currently, so set + // etc2 PMA default to `false` bool etc2_alpha_premultiplied = conf->getValue("axmol.texture.etc2_has_pma", Value{false}).asBool(); Image::setCompressedImagesHavePMA(Image::CompressedImagePMAFlag::ETC2, etc2_alpha_premultiplied); } @@ -299,7 +305,8 @@ void Director::drawScene() if (_runningScene) { -#if (defined(AX_ENABLE_PHYSICS) || (defined(AX_ENABLE_3D_PHYSICS) && AX_ENABLE_BULLET_INTEGRATION) || defined(AX_ENABLE_NAVMESH)) +#if (defined(AX_ENABLE_PHYSICS) || (defined(AX_ENABLE_3D_PHYSICS) && AX_ENABLE_BULLET_INTEGRATION) || \ + defined(AX_ENABLE_NAVMESH)) _runningScene->stepPhysicsAndNavigation(_deltaTime); #endif // clear draw stats @@ -827,7 +834,7 @@ void Director::replaceScene(Scene* scene) { _nextScene->onExit(); } - if(_nextScene) + if (_nextScene) { _nextScene->cleanup(); } @@ -864,7 +871,7 @@ void Director::pushScene(Scene* scene) } #endif // AX_ENABLE_GC_FOR_NATIVE_OBJECTS _scenesStack.pushBack(scene); - _nextScene = scene; + _nextScene = scene; } void Director::popScene() @@ -1113,10 +1120,9 @@ void Director::restartDirector() #if AX_ENABLE_CACHE_TEXTURE_DATA // listen the event that renderer was recreated on Android/WP8 - _rendererRecreatedListener = EventListenerCustom::create( - EVENT_RENDERER_RECREATED, [this](EventCustom*) { - _isStatusLabelUpdated = true; // Force recreation of textures - }); + _rendererRecreatedListener = EventListenerCustom::create(EVENT_RENDERER_RECREATED, [this](EventCustom*) { + _isStatusLabelUpdated = true; // Force recreation of textures + }); _eventDispatcher->addEventListenerWithFixedPriority(_rendererRecreatedListener, -1); #endif @@ -1151,7 +1157,7 @@ void Director::setNextScene() _runningScene->release(); } _runningScene = _nextScene; - if(_nextScene) + if (_nextScene) { _nextScene->retain(); } @@ -1359,9 +1365,9 @@ void Director::setStatsAnchor(AnchorPreset anchor) showStats(); { - static Vec2 _fpsPosition = {0, 0}; - auto safeOrigin = getSafeAreaRect().origin; - auto safeSize = getSafeAreaRect().size; + static Vec2 _fpsPosition = {0, 0}; + auto safeOrigin = getSafeAreaRect().origin; + auto safeSize = getSafeAreaRect().size; const int height_spacing = (int)(22 / AX_CONTENT_SCALE_FACTOR()); switch (anchor) diff --git a/core/base/Director.h b/core/base/Director.h index 93e2b97a4513..816d6189b5d6 100644 --- a/core/base/Director.h +++ b/core/base/Director.h @@ -45,6 +45,8 @@ THE SOFTWARE. #include "base/Console.h" #endif +#include "base/JobSystem.h" + NS_AX_BEGIN /** @@ -397,6 +399,12 @@ class AX_DLL Director */ float getContentScaleFactor() const { return _contentScaleFactor; } + + /** Gets the JobSystem associated with this director. + * @since axmol-2.1.4 + */ + JobSystem* getJobSystem() const { return _jobSystem; } + /** Gets the Scheduler associated with this director. * @since v2.0 */ @@ -595,6 +603,8 @@ class AX_DLL Director which inherit from it as default renderer context,you can have your own by inherit from it*/ GLView* _glView = nullptr; + JobSystem* _jobSystem = nullptr; + // texture cache belongs to this director TextureCache* _textureCache = nullptr; diff --git a/core/base/JobSystem.cpp b/core/base/JobSystem.cpp new file mode 100644 index 000000000000..b07fc3114512 --- /dev/null +++ b/core/base/JobSystem.cpp @@ -0,0 +1,211 @@ +#include "base/JobSystem.h" +#include "base/Director.h" +#include "yasio/thread_name.hpp" + +#include +#include +#include +#include +#include +#include +#include + +NS_AX_BEGIN + +#pragma region JobExecutor +class JobExecutor +{ +public: + explicit JobExecutor(std::span> tdds) : stop(false) + { + for (auto thread_data : tdds) + workers.emplace_back([this, thread_data] { + thread_data->init(); + yasio::set_thread_name(thread_data->name()); + for (;;) + { + std::function task; + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) + break; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + + task(thread_data.get()); + } + thread_data->finz(); + }); + } + template + auto enqueue(F&& f, Args&&... args) -> std::future::type> + { + using return_type = typename std::invoke_result::type; + + auto task = std::make_shared>( + std::bind(std::forward(f), std::placeholders::_1, std::forward(args)...)); + + std::future res = task->get_future(); + { + std::unique_lock lock(queue_mutex); + + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped executor"); + + tasks.emplace([task](JobThreadData* thread_data) { (*task)(thread_data); }); + } + condition.notify_one(); + return res; + } + + template + void enqueue_v(F&& f, Args&&... args) + { + auto task = std::bind(std::forward(f), std::placeholders::_1, std::forward(args)...); + + { + std::unique_lock lock(queue_mutex); + + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped executor"); + + tasks.emplace(std::move(task)); + } + condition.notify_one(); + } + ~JobExecutor() + { + { + std::unique_lock lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for (std::thread& worker : workers) + worker.join(); + } + +private: + // need to keep track of threads so we can join them + std::vector workers; + + // the task queue + std::queue> tasks; + + // synchronization + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; +}; + +#pragma endregion + +#pragma region JobSystem + +JobSystem* JobSystem::create(int nthread) +{ + auto inst = new JobSystem(); + inst->start(nthread); + return inst; +} + +void JobSystem::destroy(JobSystem* inst) +{ + if (inst) + { + inst->stop(); + delete inst; + } +} + +JobSystem* JobSystem::create(std::span> tdds) +{ + if (!tdds.empty()) + { + auto inst = new JobSystem(); + inst->start(tdds); + return inst; + } + return nullptr; +} + +JobSystem::~JobSystem() +{ + stop(); +} + +// Call at task collect thread +void JobSystem::start(int nThreads) +{ + if (_executor) + return; + + if (nThreads < 0) + { +#if !defined(__EMSCRIPTEN_PTHREADS__) + nThreads = (std::max)(static_cast(std::thread::hardware_concurrency() * 3 / 2), 1); +#else + nThreads = 4; +#endif + } + std::vector> tdds; + for (auto i = 0; i < nThreads; ++i) + tdds.emplace_back(std::make_shared()); + + _executor = new JobExecutor(tdds); +} + +void JobSystem::start(std::span> tdds) +{ + if (_executor) + return; + + _executor = new JobExecutor(tdds); +} + +// Call at task collect thread +void JobSystem::stop() +{ + if (_executor != nullptr) + { + delete _executor; + } +} + +void JobSystem::enqueue_v(std::function task) +{ + _executor->enqueue_v(std::move(task)); +} + +void JobSystem::enqueue(std::function task) +{ + _executor->enqueue_v([task_ = std::move(task)](JobThreadData*) { task_(); }); +} + +void JobSystem::enqueue(std::shared_ptr task) +{ + _executor->enqueue_v([task](JobThreadData* thread_data) { + if (!task->isRequestCancel()) + { + task->setThreadData(thread_data); + task->setState(JobThreadTask::State::Inprogress); + task->execute(); + task->setState(JobThreadTask::State::Idle); + } + }); +} + +void JobSystem::enqueue(std::function task, std::function done) +{ + _executor->enqueue_v([task_ = std::move(task), done_ = std::move(done)](JobThreadData*) { + task_(); + Director::getInstance()->getScheduler()->runOnAxmolThread(done_); + }); +} + +#pragma endregion + +NS_AX_END diff --git a/core/base/JobSystem.h b/core/base/JobSystem.h new file mode 100644 index 000000000000..bb82408a4b7e --- /dev/null +++ b/core/base/JobSystem.h @@ -0,0 +1,116 @@ +/**************************************************************************** + + Copyright (c) 2019-present Axmol Engine + * contributors (see AUTHORS.md). + + https://axmolengine.github.io/ + + Permission is hereby granted, free of charge, to + * any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the + * Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished + * to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be + * included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY + * OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + * ****************************************************************************/ + +#pragma once + +#include +#include +#include +#include +#include "base/Config.h" +#include "platform/PlatformDefine.h" + +NS_AX_BEGIN + +class JobExecutor; +class JobSystem; +class JobThreadData +{ +public: + virtual ~JobThreadData() {} + virtual void init() {} + virtual void finz() {} + virtual const char* name() { return "axmol"; } +}; + +class JobThreadTask +{ + friend class JobSystem; + +public: + enum class State + { + Idle, + Inprogress, + RequestCancel, + }; + virtual ~JobThreadTask() {} + bool isInprogress() const { return _taskState == State::Inprogress; } + bool isIdle() const { return _taskState == State::Idle; } + bool isRequestCancel() const { return _taskState == State::RequestCancel; } + void cancel() + { + if (isInprogress()) + _taskState = State::RequestCancel; + } + + JobThreadData* getThreadData() const { return _threadData; } + +protected: + void setState(State state) { _taskState = state; } + void setThreadData(JobThreadData* threadData) { _threadData = threadData; } + + virtual void execute() {} + + State _taskState{State::Idle}; + JobThreadData* _threadData{nullptr}; +}; + +class AX_API JobSystem +{ +public: + static JobSystem* create(int nthread = -1); + static JobSystem* create(std::span> tdds); + static void destroy(JobSystem* system); + + ~JobSystem(); + + void start(int nThreads = -1); + void start(std::span> tdds); + + void stop(); + + void enqueue_v(std::function task); + + void enqueue(std::function task); + void enqueue(std::function task, std::function done); + void enqueue(std::shared_ptr task); + +private: + JobExecutor* _executor{nullptr}; +}; + +NS_AX_END diff --git a/core/base/Utils.cpp b/core/base/Utils.cpp index 61aa842aa10e..cb1a04020be4 100644 --- a/core/base/Utils.cpp +++ b/core/base/Utils.cpp @@ -39,7 +39,6 @@ THE SOFTWARE. #include "openssl/evp.h" #include "base/Director.h" -#include "base/AsyncTaskPool.h" #include "base/EventDispatcher.h" #include "base/Constants.h" #include "base/UTF8.h" @@ -111,20 +110,20 @@ void captureScreen(std::function)> imageCallback) s_captureScreenListener = eventDispatcher->addCustomEventListener(Director::EVENT_AFTER_DRAW, [=](EventCustom* /*event*/) { #endif - eventDispatcher->removeEventListener(s_captureScreenListener); - s_captureScreenListener = nullptr; - // !!!GL: AFTER_DRAW and BEFORE_END_FRAME - renderer->readPixels(renderer->getDefaultRenderTarget(), [=](const backend::PixelBufferDescriptor& pbd) { - if (pbd) - { - auto image = utils::makeInstance(&Image::initWithRawData, pbd._data.getBytes(), - pbd._data.getSize(), pbd._width, pbd._height, 8, false); - imageCallback(image); - } - else - imageCallback(nullptr); - }); + eventDispatcher->removeEventListener(s_captureScreenListener); + s_captureScreenListener = nullptr; + // !!!GL: AFTER_DRAW and BEFORE_END_FRAME + renderer->readPixels(renderer->getDefaultRenderTarget(), [=](const backend::PixelBufferDescriptor& pbd) { + if (pbd) + { + auto image = utils::makeInstance(&Image::initWithRawData, pbd._data.getBytes(), + pbd._data.getSize(), pbd._width, pbd._height, 8, false); + imageCallback(image); + } + else + imageCallback(nullptr); }); + }); } static std::unordered_map s_captureNodeListener; @@ -201,13 +200,12 @@ void captureScreen(std::function afterCap, std::st outfile = FileUtils::getInstance()->getWritablePath().append(filename); captureScreen([_afterCap = std::move(afterCap), _outfile = std::move(outfile)](RefPtr image) mutable { - AsyncTaskPool::getInstance()->enqueue( - AsyncTaskPool::TaskType::TASK_IO, + Director::getInstance()->getJobSystem()->enqueue( [_afterCap = std::move(_afterCap), image = std::move(image), _outfile = std::move(_outfile)]() mutable { bool ok = image->saveToFile(_outfile); Director::getInstance()->getScheduler()->runOnAxmolThread( [ok, _afterCap = std::move(_afterCap), _outfile = std::move(_outfile)] { _afterCap(ok, _outfile); }); - }); + }); }); } @@ -408,8 +406,8 @@ std::string getFileMD5Hash(std::string_view filename, uint32_t bufferSize) std::string getDataMD5Hash(const Data& data) { - //if (data.isNull()) - // return std::string{}; + // if (data.isNull()) + // return std::string{}; return computeDigest(std::string_view{(const char*)data.getBytes(), (size_t)data.getSize()}, "md5"sv); } diff --git a/core/base/astc.cpp b/core/base/astc.cpp index 6db1f89989ea..379014a40931 100644 --- a/core/base/astc.cpp +++ b/core/base/astc.cpp @@ -22,47 +22,50 @@ #include "astcenc/astcenc_internal_entry.h" #include "yasio/utils.hpp" +#include "base/Director.h" + #if !defined(__EMSCRIPTEN__) || defined(__EMSCRIPTEN_PTHREADS__) -#define ASTCDEC_NO_CONTEXT 1 -#define ASTCDEC_PRINT_BENCHMARK 0 +# define ASTCDEC_NO_CONTEXT 1 +# define ASTCDEC_PRINT_BENCHMARK 0 +# define ASTCDEC_MAX_PARALLELS 8u typedef std::mutex astc_decompress_mutex_t; struct astc_decompress_task { - astc_decompress_task() {} - ~astc_decompress_task() + astc_decompress_task() noexcept {} + ~astc_decompress_task() noexcept { -#if ASTCDEC_NO_CONTEXT +# if ASTCDEC_NO_CONTEXT if (_bsd) aligned_free(_bsd); -#else +# else if (_context) astcenc_context_free(this->_context); -#endif +# endif } void wait_done() { -#if ASTCDEC_NO_CONTEXT +# if ASTCDEC_NO_CONTEXT _decompress_pm.wait(); -#else +# else _context->manage_decompress.wait(); -#endif +# endif } - const uint8_t* _in_texels = nullptr; + const uint8_t* _in_texels{nullptr}; void* _out_texels[1]{}; - unsigned int _xblocks, _yblocks; -#if ASTCDEC_NO_CONTEXT - unsigned int _block_x, _block_y; + unsigned int _xblocks{0}, _yblocks{0}; +# if ASTCDEC_NO_CONTEXT + unsigned int _block_x{0}, _block_y{0}; ParallelManager _decompress_pm{}; - block_size_descriptor* _bsd = nullptr; -#else + block_size_descriptor* _bsd{nullptr}; +# else astcenc_config _config{}; - astcenc_context* _context = nullptr; -#endif + astcenc_context* _context{nullptr}; +# endif astcenc_image _image_out{}; }; @@ -74,35 +77,9 @@ class astc_decompress_job_manager static astc_decompress_job_manager s_instance; return &s_instance; } - astc_decompress_job_manager() - { -#if !defined(__EMSCRIPTEN_PTHREADS__) - int thread_count = std::thread::hardware_concurrency(); -#else - constexpr int thread_count = 2; -#endif - for (int i = 0; i < thread_count; ++i) - { - _threads.emplace_back(std::thread{&astc_decompress_job_manager::run, this}); - } - } + astc_decompress_job_manager() {} - ~astc_decompress_job_manager() - { - _stopped = true; - - _task_queue_mtx.lock(); - _task_queue.clear(); - _task_queue_cv.notify_all(); - _task_queue_mtx.unlock(); - - for (auto&& t : _threads) - { - if (t.joinable()) - t.join(); - } - _threads.clear(); - } + ~astc_decompress_job_manager() {} int decompress_parallel_sync(const uint8_t* in, uint32_t inlen, @@ -117,23 +94,77 @@ class astc_decompress_job_manager if (!task) return ASTCENC_ERR_OUT_OF_MEM; - _task_queue_mtx.lock(); - _task_queue.emplace_back(task); - _task_queue_mtx.unlock(); - _task_queue_cv.notify_all(); // notify all thread to process the single decompress task parallel + auto jobSystem = ax::Director::getInstance()->getJobSystem(); - task->wait_done(); + const int PARALLELS = std::clamp(std::thread::hardware_concurrency(), 2u, ASTCDEC_MAX_PARALLELS); + for (int i = 0; i < PARALLELS; ++i) + jobSystem->enqueue([task] { execute(task); }); - _task_queue_mtx.lock(); - assert(!_task_queue.empty()); - auto t = _task_queue.front(); - assert(t.get() == task.get()); - _task_queue.pop_front(); - _task_queue_mtx.unlock(); + task->wait_done(); return ASTCENC_SUCCESS; } + static void execute(std::shared_ptr task) + { + constexpr astcenc_swizzle swz_decode{ASTCENC_SWZ_R, ASTCENC_SWZ_G, ASTCENC_SWZ_B, ASTCENC_SWZ_A}; + + auto& image_out = task->_image_out; + +# if ASTCDEC_NO_CONTEXT + unsigned int block_x = task->_block_x; + unsigned int block_y = task->_block_y; + unsigned int block_z = 1; // task->block_z; + auto& bsd = *task->_bsd; + auto& decompress_pm = task->_decompress_pm; +# else + unsigned int block_x = task->_config.block_x; + unsigned int block_y = task->_config.block_y; + unsigned int block_z = 1; // task->_config.block_z; + auto& bsd = *task->_context->context.bsd; + auto& decompress_pm = task->_context->manage_decompress; +# endif + unsigned int xblocks = task->_xblocks; + unsigned int yblocks = task->_yblocks; + unsigned int zblocks = 1; // (image_out.dim_z + block_z - 1) / block_z; + + int row_blocks = xblocks; + int plane_blocks = xblocks * yblocks; + + image_block blk; + auto data = task->_in_texels; + for (;;) + { // process the task + unsigned int count = 0; + unsigned int base = decompress_pm.get_task_assignment(128, count); + + bool no_task_count = !count; + if (no_task_count) + { // this thread will going to suspend until new task added + break; + } + + for (unsigned int i = base; i < base + count; i++) + { + // Decode i into x, y, z block indices + int z = i / plane_blocks; + unsigned int rem = i - (z * plane_blocks); + int y = rem / row_blocks; + int x = rem - (y * row_blocks); + + unsigned int offset = (((z * yblocks + y) * xblocks) + x) * 16; + symbolic_compressed_block scb; + physical_to_symbolic(bsd, data + offset, scb); + + decompress_symbolic_block(ASTCENC_PRF_LDR, bsd, x * block_x, y * block_y, z * block_z, scb, blk); + + store_image_block(image_out, blk, bsd, x * block_x, y * block_y, z * block_z, swz_decode); + } + + decompress_pm.complete_task_assignment(count); + } + } + private: std::shared_ptr make_task(const uint8_t* in, unsigned int inlen, @@ -160,7 +191,7 @@ class astc_decompress_job_manager task->_xblocks = xblocks; task->_yblocks = yblocks; -#if ASTCDEC_NO_CONTEXT +# if ASTCDEC_NO_CONTEXT // since astcenc-3.3, doesn't required // static std::once_flag once_flag; // std::call_once(once_flag, init_quant_mode_table); @@ -171,99 +202,27 @@ class astc_decompress_job_manager init_block_size_descriptor(block_x, block_y, 1, false, 0 /*unused for decompress*/, 0, *task->_bsd); // since astcenc-4.7.0, add second argument=nullptr task->_decompress_pm.init(total_blocks, nullptr); -#else +# else (void)astcenc_config_init(ASTCENC_PRF_LDR, block_x, block_y, 1, 0, ASTCENC_FLG_DECOMPRESS_ONLY, &task->_config); (void)astcenc_context_alloc(&task->_config, (unsigned int)_threads.size(), &task->_context); task->_context->manage_decompress.init(total_blocks); -#endif +# endif return task; } - void run() - { - const astcenc_swizzle swz_decode{ASTCENC_SWZ_R, ASTCENC_SWZ_G, ASTCENC_SWZ_B, ASTCENC_SWZ_A}; - - bool no_task_count = false; - - for (;;) - { - std::unique_lock lck(_task_queue_mtx); - if (!_stopped && (_task_queue.empty() || no_task_count)) - _task_queue_cv.wait(lck); - - if (_stopped) - break; - - if (_task_queue.empty()) - continue; - auto task = _task_queue.front(); - lck.unlock(); // unlock make sure other thread can work for the task - - auto& image_out = task->_image_out; - -#if ASTCDEC_NO_CONTEXT - unsigned int block_x = task->_block_x; - unsigned int block_y = task->_block_y; - unsigned int block_z = 1; // task->block_z; - auto& bsd = *task->_bsd; - auto& decompress_pm = task->_decompress_pm; -#else - unsigned int block_x = task->_config.block_x; - unsigned int block_y = task->_config.block_y; - unsigned int block_z = 1; // task->_config.block_z; - auto& bsd = *task->_context->context.bsd; - auto& decompress_pm = task->_context->manage_decompress; -#endif - unsigned int xblocks = task->_xblocks; - unsigned int yblocks = task->_yblocks; - unsigned int zblocks = 1; // (image_out.dim_z + block_z - 1) / block_z; - - int row_blocks = xblocks; - int plane_blocks = xblocks * yblocks; - - image_block blk; - auto data = task->_in_texels; - for (;;) - { // process the task - unsigned int count = 0; - unsigned int base = decompress_pm.get_task_assignment(128, count); - - no_task_count = !count; - if (no_task_count) - { // this thread will going to suspend until new task added - break; - } - - for (unsigned int i = base; i < base + count; i++) - { - // Decode i into x, y, z block indices - int z = i / plane_blocks; - unsigned int rem = i - (z * plane_blocks); - int y = rem / row_blocks; - int x = rem - (y * row_blocks); - - unsigned int offset = (((z * yblocks + y) * xblocks) + x) * 16; - symbolic_compressed_block scb; - physical_to_symbolic(bsd, data + offset, scb); - - decompress_symbolic_block(ASTCENC_PRF_LDR, bsd, x * block_x, y * block_y, z * block_z, scb, blk); - - store_image_block(image_out, blk, bsd, x * block_x, y * block_y, z * block_z, swz_decode); - } - - decompress_pm.complete_task_assignment(count); - } - } - } - - std::vector _threads; - - std::deque> _task_queue; - astc_decompress_mutex_t _task_queue_mtx; - std::condition_variable_any _task_queue_cv; - - bool _stopped = false; }; +template +struct benchmark_printer +{ + benchmark_printer(_FMT&& fmt, int w, int h, float den) + : _fmt(fmt), _w(w), _h(h), _den(den), _start(yasio::highp_clock()) + {} + ~benchmark_printer() { AXLOGI("{}", fmt::format(_fmt, _w, _h, (yasio::highp_clock() - _start) / _den)); } + _FMT _fmt; + int _w, _h; + float _den; + yasio::highp_time_t _start; +}; int astc_decompress_image(const uint8_t* in, uint32_t inlen, uint8_t* out, @@ -272,20 +231,10 @@ int astc_decompress_image(const uint8_t* in, uint32_t block_x, uint32_t block_y) { -#if ASTCDEC_PRINT_BENCHMARK - struct benchmark_printer - { - benchmark_printer(const char* fmt, int w, int h, float den) - : _fmt(fmt), _w(w), _h(h), _den(den), _start(yasio::highp_clock()) - {} - ~benchmark_printer() { AXLOGI(_fmt, _w, _h, (yasio::highp_clock() - _start) / _den); } - const char* _fmt; - int _w, _h; - float _den; - yasio::highp_time_t _start; - }; - benchmark_printer __printer("decompress astc image ({}x{}) cost: {}(ms)", dim_x, dim_y, (float)std::milli::den); -#endif +# if ASTCDEC_PRINT_BENCHMARK + benchmark_printer __printer(FMT_COMPILE("decompress astc image ({}x{}) cost: {}(ms)"), dim_x, dim_y, + (float)std::milli::den); +# endif return astc_decompress_job_manager::get_instance()->decompress_parallel_sync(in, inlen, out, dim_x, dim_y, block_x, block_y); } @@ -310,7 +259,8 @@ int astc_decompress_image(const uint8_t* in, // Check we have enough output space (16 bytes per block) size_t size_needed = xblocks * yblocks * zblocks * 16; - if (inlen < size_needed) { + if (inlen < size_needed) + { return ASTCENC_ERR_OUT_OF_MEM; } @@ -322,14 +272,15 @@ int astc_decompress_image(const uint8_t* in, astcenc_image image_out{dim_x, dim_y, 1, ASTCENC_TYPE_U8, data}; const auto total_blocks = zblocks * yblocks * xblocks; const astcenc_swizzle swz_decode{ASTCENC_SWZ_R, ASTCENC_SWZ_G, ASTCENC_SWZ_B, ASTCENC_SWZ_A}; - for (unsigned int i = 0; i < total_blocks; ++i) { + for (unsigned int i = 0; i < total_blocks; ++i) + { // Decode i into x, y, z block indices int z = i / plane_blocks; unsigned int rem = i - (z * plane_blocks); int y = rem / row_blocks; int x = rem - (y * row_blocks); - unsigned int offset = (((z * yblocks + y) * xblocks) + x) * 16; + unsigned int offset = (((z * yblocks + y) * xblocks) + x) * 16; symbolic_compressed_block scb; physical_to_symbolic(*bsd, in + offset, scb); diff --git a/core/media/WmfMediaEngine.cpp b/core/media/WmfMediaEngine.cpp index b4db6245a2fb..3eef666733ab 100644 --- a/core/media/WmfMediaEngine.cpp +++ b/core/media/WmfMediaEngine.cpp @@ -361,7 +361,8 @@ bool WmfMediaEngine::open(std::string_view sourceUri) this->QueryInterface(IID_IUnknown, &sharedFromThis); m_bOpenPending = true; - std::thread t([this, sharedFromThis, wsourceUri = ntcvt::from_chars(sourceUri)] { + + Director::getInstance()->getJobSystem()->enqueue([this, sharedFromThis, wsourceUri = ntcvt::from_chars(sourceUri)] { TComPtr pTopology; TComPtr pClock; @@ -415,7 +416,6 @@ bool WmfMediaEngine::open(std::string_view sourceUri) m_bOpenPending = false; SetEvent(m_hOpenEvent); }); - t.detach(); return true; } diff --git a/core/platform/FileUtils.h b/core/platform/FileUtils.h index 9d87346d2e9e..8f8867539ba9 100644 --- a/core/platform/FileUtils.h +++ b/core/platform/FileUtils.h @@ -39,7 +39,6 @@ THE SOFTWARE. #include "base/Types.h" #include "base/Value.h" #include "base/Data.h" -#include "base/AsyncTaskPool.h" #include "base/Scheduler.h" #include "base/Director.h" @@ -899,8 +898,7 @@ class AX_DLL FileUtils }, std::forward(action), std::forward(callback), std::forward(args)...); - AsyncTaskPool::getInstance()->enqueue( - AsyncTaskPool::TaskType::TASK_IO, [](void*) {}, nullptr, std::move(lambda)); + Director::getInstance()->getJobSystem()->enqueue(std::move(lambda)); } }; diff --git a/extensions/assets-manager/src/assets-manager/AssetsManagerEx.cpp b/extensions/assets-manager/src/assets-manager/AssetsManagerEx.cpp index 4f8364dda8b8..07836816d28d 100644 --- a/extensions/assets-manager/src/assets-manager/AssetsManagerEx.cpp +++ b/extensions/assets-manager/src/assets-manager/AssetsManagerEx.cpp @@ -35,21 +35,20 @@ # include "unzip.h" #endif #include -#include "base/AsyncTaskPool.h" NS_AX_EXT_BEGIN -#define TEMP_FOLDERNAME "_temp" -#define VERSION_FILENAME "version.manifest" -#define TEMP_MANIFEST_FILENAME "project.manifest.temp" -#define MANIFEST_FILENAME "project.manifest" +#define TEMP_FOLDERNAME "_temp" +#define VERSION_FILENAME "version.manifest" +#define TEMP_MANIFEST_FILENAME "project.manifest.temp" +#define MANIFEST_FILENAME "project.manifest" -#define BUFFER_SIZE 8192 -#define MAX_FILENAME 512 +#define BUFFER_SIZE 8192 +#define MAX_FILENAME 512 #define DEFAULT_CONNECTION_TIMEOUT 45 -#define SAVE_POINT_INTERVAL 0.1 +#define SAVE_POINT_INTERVAL 0.1 const std::string AssetsManagerEx::VERSION_ID = "@version"; const std::string AssetsManagerEx::MANIFEST_ID = "@manifest"; @@ -561,15 +560,17 @@ void AssetsManagerEx::decompressDownloadedZip(std::string_view customId, std::st } delete dataInner; }; - AsyncTaskPool::getInstance()->enqueue(AsyncTaskPool::TaskType::TASK_OTHER, std::move(decompressFinished), - (void*)asyncData, [this, asyncData]() { - // Decompress all compressed files - if (decompress(asyncData->zipFile)) - { - asyncData->succeed = true; - } - _fileUtils->removeFile(asyncData->zipFile); - }); + + Director::getInstance()->getJobSystem()->enqueue( + [this, asyncData]() { + // Decompress all compressed files + if (decompress(asyncData->zipFile)) + { + asyncData->succeed = true; + } + _fileUtils->removeFile(asyncData->zipFile); + }, + [decompressFinished, asyncData]() { decompressFinished(asyncData); }); } void AssetsManagerEx::dispatchUpdateEvent(EventAssetsManagerEx::EventCode code,