Skip to content

Commit

Permalink
initial threading - mostly works
Browse files Browse the repository at this point in the history
  • Loading branch information
ibell13 committed Oct 8, 2024
1 parent 0fa3995 commit 79bfeba
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 35 deletions.
48 changes: 28 additions & 20 deletions io/CopcReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ struct CopcReader::Args
struct CopcReader::Private
{
public:
std::mutex mutex;
std::queue<copc::Tile> contents;
std::unique_ptr<ThreadPool> pool;
std::unique_ptr<copc::Tile> currentTile;

std::unique_ptr<connector::Connector> connector;
std::queue<copc::Tile> contents;
copc::Hierarchy hierarchy;
las::LoaderDriver loader;
std::mutex mutex;
std::condition_variable contentsCv;
std::condition_variable consumedCv;
std::vector<PolyXform> polys;
Expand All @@ -255,7 +255,7 @@ struct CopcReader::Private
las::Header header;
copc::Info copc_info;
point_count_t hierarchyPointCount;
bool done;
std::atomic<bool> done;
SrsTransform llToBcbfTransform;
};

Expand All @@ -264,7 +264,12 @@ CopcReader::CopcReader() : m_args(new CopcReader::Args), m_p(new CopcReader::Pri


CopcReader::~CopcReader()
{}
{
// We join the pool rather than let the dtor do it because the workers use m_p
// which will get 0'ed in the dtor before access through m_p is complete in
// the workers.
done();
}


std::string CopcReader::getName() const
Expand Down Expand Up @@ -333,11 +338,11 @@ void CopcReader::setForwards(StringMap& headers, StringMap& query)

void CopcReader::initialize(PointTableRef table)
{
const std::size_t threads(m_args->threads);
if (threads > 100)
if (m_args->threads > 100)
log()->get(LogLevel::Warning) << "Using a large thread count: " <<
threads << " threads" << std::endl;
m_p->pool.reset(new ThreadPool(threads));
m_args->threads << " threads" << std::endl;
// Make sure we allow at least as many chunks as we have threads.
m_args->keepAliveChunkCount = (std::max)(m_args->threads, (size_t)m_args->keepAliveChunkCount);

StringMap headers;
StringMap query;
Expand Down Expand Up @@ -406,7 +411,7 @@ void CopcReader::initialize(PointTableRef table)
m_p->depthEnd = m_args->resolution ?
(std::max)(1, (int)ceil(log2(m_p->copc_info.spacing / m_args->resolution)) + 1) :
0;

if (m_args->resolution)
log()->get(LogLevel::Debug) << "Maximum depth: " << m_p->depthEnd <<
std::endl;
Expand Down Expand Up @@ -490,7 +495,7 @@ las::VlrList CopcReader::fetchSrsVlrs(const las::VlrCatalog& catalog)
fetchVlr(las::TransformUserId, las::LASFWkt2recordId);
fetchVlr(las::PdalUserId, las::PdalProjJsonRecordId);
fetchVlr(las::TransformUserId, las::WktRecordId);

// User told us to ditch them
if (m_args->nosrs)
vlrs.clear();
Expand Down Expand Up @@ -638,7 +643,6 @@ QuickInfo CopcReader::inspect()
qi.m_bounds.clip(b);
}
qi.m_valid = true;
done(t);

return qi;
}
Expand Down Expand Up @@ -672,6 +676,7 @@ void CopcReader::addDimensions(PointLayoutPtr layout)

void CopcReader::ready(PointTableRef table)
{
m_p->pool.reset(new ThreadPool(m_args->threads));
// Determine all overlapping data files we'll need to fetch.
try
{
Expand All @@ -694,7 +699,6 @@ void CopcReader::ready(PointTableRef table)
m_p->tileCount = m_p->hierarchy.size();
log()->get(LogLevel::Debug) << m_p->tileCount << " overlapping nodes" << std::endl;

m_p->pool.reset(new ThreadPool(m_p->pool->numThreads()));
m_p->done = false;
for (const copc::Entry& entry : m_p->hierarchy)
load(entry);
Expand Down Expand Up @@ -833,10 +837,9 @@ void CopcReader::load(const copc::Entry& entry)

// Put the tile on the output queue.
std::unique_lock<std::mutex> l(m_p->mutex);
if (m_p->done)
return;
while (m_p->contents.size() >= (std::max)((size_t)m_args->keepAliveChunkCount, m_p->pool->numThreads()))
m_p->consumedCv.wait(l);
m_p->consumedCv.wait(l, [this] {
return (m_p->done ||
m_p->contents.size() < (size_t)m_args->keepAliveChunkCount);});
m_p->contents.push(std::move(tile));
l.unlock();
m_p->contentsCv.notify_one();
Expand Down Expand Up @@ -1005,13 +1008,18 @@ bool CopcReader::processOne(PointRef& point)

void CopcReader::done(PointTableRef)
{
done();
}

void CopcReader::done()
{
if (m_p->pool)
{
std::unique_lock<std::mutex> l(m_p->mutex);
m_p->done = true;
m_p->consumedCv.notify_all();
m_p->pool->stop();
m_p->connector.reset();
}
m_p->consumedCv.notify_all();
m_p->pool->stop();
m_p->connector.reset();
}

} // namespace pdal
1 change: 1 addition & 0 deletions io/CopcReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class PDAL_DLL CopcReader : public Reader, public Streamable
void validateVlrInfo(const las::Vlr& v, const copc::Info& i);
void createSpatialFilters();

void done();
void loadHierarchy();
void loadHierarchy(copc::Hierarchy& hierarchy, const copc::HierarchyPage& page,
const copc::Entry& entry);
Expand Down
41 changes: 26 additions & 15 deletions kernels/TIndexKernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ void TIndexKernel::addSubSwitches(ProgramArgs& args,
"Write absolute rather than relative file paths", m_absPath);
args.add("stdin,s", "Read filespec pattern from standard input",
m_usestdin);
args.add("threads", "Number of threads to use for file processing",
m_threads);
}
else if (subcommand == "merge")
{
Expand Down Expand Up @@ -275,30 +277,39 @@ void TIndexKernel::createFile()

size_t filecount(0);
StageFactory factory(false);
ThreadPool pool(m_threads);
const std::chrono::time_point<std::chrono::steady_clock> start =
std::chrono::steady_clock::now();
for (auto f : m_files)
{
const std::chrono::time_point<std::chrono::steady_clock> start =
std::chrono::steady_clock::now();
//ABELL - Not sure why we need to get absolute path here.
f = FileUtils::toAbsolutePath(f);
FileInfo info;
if (getFileInfo(factory, f, info))

pool.add([this, f, &factory, &indexes]()
{
filecount++;
if (!isFileIndexed(indexes, info))
FileInfo info;
if (getFileInfo(factory, f, info))
{
if (createFeature(indexes, info))
m_log->get(LogLevel::Info) << "Indexed file " << f <<
std::endl;
else
m_log->get(LogLevel::Error) << "Failed to create feature "
"for file '" << f << "'" << std::endl;
if (!isFileIndexed(indexes, info))
{
if (createFeature(indexes, info))
m_log->get(LogLevel::Info) << "Indexed file " << f <<
std::endl;
else
m_log->get(LogLevel::Error) << "Failed to create feature "
"for file '" << f << "'" << std::endl;
}
}
}
const std::chrono::time_point<std::chrono::steady_clock> end =
std::chrono::steady_clock::now();
std::cout << "file processing took " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << " milliseconds\n";
);
// this doesn't tell us anything useful - something needs to be incremented in
// the lambda, or we need another way to determine no files were indexed.
filecount++;
}
pool.await();
const std::chrono::time_point<std::chrono::steady_clock> end =
std::chrono::steady_clock::now();
std::cout << "file processing took " << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << " milliseconds\n";
if (!filecount)
throw pdal_error("Couldn't index any files.");
OGR_DS_Destroy(m_dataset);
Expand Down
1 change: 1 addition & 0 deletions kernels/TIndexKernel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class PDAL_DLL TIndexKernel : public SubcommandKernel
std::string m_wkt;
BOX2D m_bounds;
bool m_absPath;
int m_threads;

void *m_dataset;
void *m_layer;
Expand Down

0 comments on commit 79bfeba

Please sign in to comment.