From 79bfebacdece969b944f85e0f97dd96160e670e6 Mon Sep 17 00:00:00 2001 From: ibell13 Date: Tue, 8 Oct 2024 16:52:59 -0500 Subject: [PATCH] initial threading - mostly works --- io/CopcReader.cpp | 48 +++++++++++++++++++++++----------------- io/CopcReader.hpp | 1 + kernels/TIndexKernel.cpp | 41 +++++++++++++++++++++------------- kernels/TIndexKernel.hpp | 1 + 4 files changed, 56 insertions(+), 35 deletions(-) diff --git a/io/CopcReader.cpp b/io/CopcReader.cpp index 544a7ad3cf..30a09ad85a 100644 --- a/io/CopcReader.cpp +++ b/io/CopcReader.cpp @@ -234,14 +234,14 @@ struct CopcReader::Args struct CopcReader::Private { public: + std::mutex mutex; + std::queue contents; std::unique_ptr pool; std::unique_ptr currentTile; std::unique_ptr connector; - std::queue contents; copc::Hierarchy hierarchy; las::LoaderDriver loader; - std::mutex mutex; std::condition_variable contentsCv; std::condition_variable consumedCv; std::vector polys; @@ -255,7 +255,7 @@ struct CopcReader::Private las::Header header; copc::Info copc_info; point_count_t hierarchyPointCount; - bool done; + std::atomic done; SrsTransform llToBcbfTransform; }; @@ -264,7 +264,12 @@ CopcReader::CopcReader() : m_args(new CopcReader::Args), m_p(new CopcReader::Pri CopcReader::~CopcReader() -{} +{ + // We join the pool rather than let the dtor do it because the workers use m_p + // which will get 0'ed in the dtor before access through m_p is complete in + // the workers. + done(); +} std::string CopcReader::getName() const @@ -333,11 +338,11 @@ void CopcReader::setForwards(StringMap& headers, StringMap& query) void CopcReader::initialize(PointTableRef table) { - const std::size_t threads(m_args->threads); - if (threads > 100) + if (m_args->threads > 100) log()->get(LogLevel::Warning) << "Using a large thread count: " << - threads << " threads" << std::endl; - m_p->pool.reset(new ThreadPool(threads)); + m_args->threads << " threads" << std::endl; + // Make sure we allow at least as many chunks as we have threads. + m_args->keepAliveChunkCount = (std::max)(m_args->threads, (size_t)m_args->keepAliveChunkCount); StringMap headers; StringMap query; @@ -406,7 +411,7 @@ void CopcReader::initialize(PointTableRef table) m_p->depthEnd = m_args->resolution ? (std::max)(1, (int)ceil(log2(m_p->copc_info.spacing / m_args->resolution)) + 1) : 0; - + if (m_args->resolution) log()->get(LogLevel::Debug) << "Maximum depth: " << m_p->depthEnd << std::endl; @@ -490,7 +495,7 @@ las::VlrList CopcReader::fetchSrsVlrs(const las::VlrCatalog& catalog) fetchVlr(las::TransformUserId, las::LASFWkt2recordId); fetchVlr(las::PdalUserId, las::PdalProjJsonRecordId); fetchVlr(las::TransformUserId, las::WktRecordId); - + // User told us to ditch them if (m_args->nosrs) vlrs.clear(); @@ -638,7 +643,6 @@ QuickInfo CopcReader::inspect() qi.m_bounds.clip(b); } qi.m_valid = true; - done(t); return qi; } @@ -672,6 +676,7 @@ void CopcReader::addDimensions(PointLayoutPtr layout) void CopcReader::ready(PointTableRef table) { + m_p->pool.reset(new ThreadPool(m_args->threads)); // Determine all overlapping data files we'll need to fetch. try { @@ -694,7 +699,6 @@ void CopcReader::ready(PointTableRef table) m_p->tileCount = m_p->hierarchy.size(); log()->get(LogLevel::Debug) << m_p->tileCount << " overlapping nodes" << std::endl; - m_p->pool.reset(new ThreadPool(m_p->pool->numThreads())); m_p->done = false; for (const copc::Entry& entry : m_p->hierarchy) load(entry); @@ -833,10 +837,9 @@ void CopcReader::load(const copc::Entry& entry) // Put the tile on the output queue. std::unique_lock l(m_p->mutex); - if (m_p->done) - return; - while (m_p->contents.size() >= (std::max)((size_t)m_args->keepAliveChunkCount, m_p->pool->numThreads())) - m_p->consumedCv.wait(l); + m_p->consumedCv.wait(l, [this] { + return (m_p->done || + m_p->contents.size() < (size_t)m_args->keepAliveChunkCount);}); m_p->contents.push(std::move(tile)); l.unlock(); m_p->contentsCv.notify_one(); @@ -1005,13 +1008,18 @@ bool CopcReader::processOne(PointRef& point) void CopcReader::done(PointTableRef) { + done(); +} + +void CopcReader::done() +{ + if (m_p->pool) { - std::unique_lock l(m_p->mutex); m_p->done = true; + m_p->consumedCv.notify_all(); + m_p->pool->stop(); + m_p->connector.reset(); } - m_p->consumedCv.notify_all(); - m_p->pool->stop(); - m_p->connector.reset(); } } // namespace pdal diff --git a/io/CopcReader.hpp b/io/CopcReader.hpp index ed52c82c4a..15c2cd6f70 100644 --- a/io/CopcReader.hpp +++ b/io/CopcReader.hpp @@ -93,6 +93,7 @@ class PDAL_DLL CopcReader : public Reader, public Streamable void validateVlrInfo(const las::Vlr& v, const copc::Info& i); void createSpatialFilters(); + void done(); void loadHierarchy(); void loadHierarchy(copc::Hierarchy& hierarchy, const copc::HierarchyPage& page, const copc::Entry& entry); diff --git a/kernels/TIndexKernel.cpp b/kernels/TIndexKernel.cpp index 0f7cb5fe36..adb2a0bcca 100644 --- a/kernels/TIndexKernel.cpp +++ b/kernels/TIndexKernel.cpp @@ -118,6 +118,8 @@ void TIndexKernel::addSubSwitches(ProgramArgs& args, "Write absolute rather than relative file paths", m_absPath); args.add("stdin,s", "Read filespec pattern from standard input", m_usestdin); + args.add("threads", "Number of threads to use for file processing", + m_threads); } else if (subcommand == "merge") { @@ -275,30 +277,39 @@ void TIndexKernel::createFile() size_t filecount(0); StageFactory factory(false); + ThreadPool pool(m_threads); + const std::chrono::time_point start = + std::chrono::steady_clock::now(); for (auto f : m_files) { - const std::chrono::time_point start = - std::chrono::steady_clock::now(); //ABELL - Not sure why we need to get absolute path here. f = FileUtils::toAbsolutePath(f); - FileInfo info; - if (getFileInfo(factory, f, info)) + + pool.add([this, f, &factory, &indexes]() { - filecount++; - if (!isFileIndexed(indexes, info)) + FileInfo info; + if (getFileInfo(factory, f, info)) { - if (createFeature(indexes, info)) - m_log->get(LogLevel::Info) << "Indexed file " << f << - std::endl; - else - m_log->get(LogLevel::Error) << "Failed to create feature " - "for file '" << f << "'" << std::endl; + if (!isFileIndexed(indexes, info)) + { + if (createFeature(indexes, info)) + m_log->get(LogLevel::Info) << "Indexed file " << f << + std::endl; + else + m_log->get(LogLevel::Error) << "Failed to create feature " + "for file '" << f << "'" << std::endl; + } } } - const std::chrono::time_point end = - std::chrono::steady_clock::now(); - std::cout << "file processing took " << std::chrono::duration_cast(end - start).count() << " milliseconds\n"; + ); + // this doesn't tell us anything useful - something needs to be incremented in + // the lambda, or we need another way to determine no files were indexed. + filecount++; } + pool.await(); + const std::chrono::time_point end = + std::chrono::steady_clock::now(); + std::cout << "file processing took " << std::chrono::duration_cast(end - start).count() << " milliseconds\n"; if (!filecount) throw pdal_error("Couldn't index any files."); OGR_DS_Destroy(m_dataset); diff --git a/kernels/TIndexKernel.hpp b/kernels/TIndexKernel.hpp index 85c734127e..b983209f8b 100644 --- a/kernels/TIndexKernel.hpp +++ b/kernels/TIndexKernel.hpp @@ -106,6 +106,7 @@ class PDAL_DLL TIndexKernel : public SubcommandKernel std::string m_wkt; BOX2D m_bounds; bool m_absPath; + int m_threads; void *m_dataset; void *m_layer;