Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement periodic concurrency manager/worker and inference profiler workflow #401

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/c++/perf_analyzer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ set(
sequence_manager.cc
profile_data_collector.cc
profile_data_exporter.cc
periodic_concurrency_manager.cc
periodic_concurrency_worker.cc
)

set(
Expand Down Expand Up @@ -109,6 +111,8 @@ set(
request_record.h
profile_data_collector.h
profile_data_exporter.h
periodic_concurrency_manager.h
periodic_concurrency_worker.h
)

add_executable(
Expand Down
8 changes: 7 additions & 1 deletion src/c++/perf_analyzer/command_line_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ struct PerfAnalyzerParameters {
{
return (
using_concurrency_range || using_old_options ||
!(using_request_rate_range || using_custom_intervals));
!(using_request_rate_range || using_custom_intervals ||
is_using_periodic_concurrency_mode));
}

// Sets the threshold for PA client overhead.
Expand All @@ -148,6 +149,11 @@ struct PerfAnalyzerParameters {

// The profile export file path.
std::string profile_export_file{""};

bool is_using_periodic_concurrency_mode{false};

Range<uint64_t> periodic_concurrency_range{1, 1, 1};
nv-braf marked this conversation as resolved.
Show resolved Hide resolved
uint64_t periodic_concurrency_request_period{10};
};

using PAParamsPtr = std::shared_ptr<PerfAnalyzerParameters>;
Expand Down
19 changes: 10 additions & 9 deletions src/c++/perf_analyzer/concurrency_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,23 @@ class ConcurrencyManager : public LoadManager {
std::shared_ptr<ThreadStat>,
std::shared_ptr<ConcurrencyWorker::ThreadConfig>);

private:
ConcurrencyManager(
const bool async, const bool streaming, const int32_t batch_size,
const size_t max_threads, const size_t max_concurrency,
const SharedMemoryType shared_memory_type, const size_t output_shm_size,
const std::shared_ptr<ModelParser>& parser,
const std::shared_ptr<cb::ClientBackendFactory>& factory);

// The number of worker threads with non-zero concurrencies
size_t active_threads_;

bool execute_;
nv-braf marked this conversation as resolved.
Show resolved Hide resolved

size_t max_concurrency_;

std::vector<std::shared_ptr<ConcurrencyWorker::ThreadConfig>> threads_config_;

private:
void InitManagerFinalize() override;

// Pause all worker threads that are working on sequences
Expand All @@ -118,14 +127,6 @@ class ConcurrencyManager : public LoadManager {
//
void ResumeSequenceWorkers();

// The number of worker threads with non-zero concurrencies
size_t active_threads_;

bool execute_;

size_t max_concurrency_;
std::vector<std::shared_ptr<ConcurrencyWorker::ThreadConfig>> threads_config_;

#ifndef DOCTEST_CONFIG_DISABLE
friend TestConcurrencyManager;

Expand Down
47 changes: 24 additions & 23 deletions src/c++/perf_analyzer/concurrency_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,34 @@ ConcurrencyWorker::Infer()

// run inferencing until receiving exit signal to maintain server load.
do {
HandleExecuteOff();

if (HandleNoConcurrency()) {
return;
}

CreateContextsAsNecessary();

if (HandleExitConditions()) {
return;
}

SendInferRequests();

if (HandleExitConditions()) {
return;
}

WaitForResponses();

if (HandleExitConditions()) {
return;
if (RunInference()) {
break;
}

} while (true);
}

bool
ConcurrencyWorker::RunInference()
{
HandleExecuteOff();
if (HandleNoConcurrency()) {
return true;
}
CreateContextsAsNecessary();
if (HandleExitConditions()) {
return true;
}
SendInferRequests();
if (HandleExitConditions()) {
return true;
}
WaitForResponses();
if (HandleExitConditions()) {
return true;
}
return false;
}

void
ConcurrencyWorker::CreateCtxIdTracker()
{
Expand Down
23 changes: 14 additions & 9 deletions src/c++/perf_analyzer/concurrency_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ class NaggyMockConcurrencyWorker;
class ConcurrencyWorker : public LoadWorker {
public:
struct ThreadConfig {
ThreadConfig(size_t thread_id)
: thread_id_(thread_id), concurrency_(0), seq_stat_index_offset_(0),
is_paused_(false)
ThreadConfig(
size_t thread_id, size_t concurrency = 0,
size_t seq_stat_index_offset = 0)
: thread_id_(thread_id), concurrency_(concurrency),
seq_stat_index_offset_(seq_stat_index_offset), is_paused_(false)
{
}

Expand Down Expand Up @@ -91,7 +93,15 @@ class ConcurrencyWorker : public LoadWorker {
{
}

void Infer() override;
virtual void Infer() override;

protected:
bool RunInference();

void CreateCtxIdTracker();
nv-braf marked this conversation as resolved.
Show resolved Hide resolved

// Reserve vector size for contexts
void ReserveContexts();

private:
const size_t max_concurrency_;
Expand All @@ -101,11 +111,6 @@ class ConcurrencyWorker : public LoadWorker {

std::shared_ptr<ThreadConfig> thread_config_;

void CreateCtxIdTracker();

// Reserve vector size for contexts
void ReserveContexts();

// Handle the case where execute_ is false
void HandleExecuteOff();

Expand Down
7 changes: 7 additions & 0 deletions src/c++/perf_analyzer/infer_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
return;
}
it->second.response_times_.push_back(std::chrono::system_clock::now());
num_responses_++;
debermudez marked this conversation as resolved.
Show resolved Hide resolved
if (is_null_response == true) {
it->second.has_null_last_response_ = true;
}
Expand All @@ -267,6 +268,7 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
return;
}
if (is_final_response) {
has_received_final_response_ = is_final_response;
thread_stat_->request_records_.emplace_back(
it->second.start_time_, it->second.response_times_,
it->second.sequence_end_, it->second.delayed_,
Expand All @@ -279,8 +281,13 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
}
}

if (worker_callback_) {
worker_callback_(id_);
}

if (is_final_response) {
total_ongoing_requests_--;
num_responses_ = 0;

if (async_callback_finalize_func_ != nullptr) {
async_callback_finalize_func_(id_);
Expand Down
13 changes: 13 additions & 0 deletions src/c++/perf_analyzer/infer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,28 @@ class InferContext {
// object and have not returned
uint GetNumOngoingRequests() { return total_ongoing_requests_; }

// Returns the number of responses for the current request
uint64_t GetNumResponsesForCurrentRequest() { return num_responses_; }

// Register a function that will get called after every async request returns
void RegisterAsyncCallbackFinalize(std::function<void(uint32_t)> callback)
{
async_callback_finalize_func_ = callback;
}

void RegisterWorkerCallback(std::function<void(uint32_t)> worker_callback)
{
worker_callback_ = worker_callback;
}

// TODO REFACTOR TMA-1043 this should be in memory class
void SetNumActiveThreads(size_t num_threads)
{
num_active_threads_ = num_threads;
}

bool HasReceivedFinalResponse() { return has_received_final_response_; }

protected:
/// A helper function to issue inference request to the server.
/// \param request_id The unique id to be associated with the request.
Expand Down Expand Up @@ -191,6 +201,9 @@ class InferContext {
std::reference_wrapper<const bool> execute_{execute_placeholder_};

std::shared_ptr<SequenceManager> sequence_manager_{nullptr};
uint64_t num_responses_{0};
std::function<void(uint32_t)> worker_callback_{nullptr};
bool has_received_final_response_{false};

#ifndef DOCTEST_CONFIG_DISABLE
friend NaggyMockInferContext;
Expand Down
13 changes: 13 additions & 0 deletions src/c++/perf_analyzer/inference_profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "metrics_manager.h"
#include "model_parser.h"
#include "mpi_utils.h"
#include "periodic_concurrency_manager.h"
#include "profile_data_collector.h"
#include "request_rate_manager.h"

Expand Down Expand Up @@ -306,6 +307,18 @@ class InferenceProfiler {
return cb::Error::Success;
}

cb::Error ProfilePeriodicConcurrencyMode()
{
auto& manager{dynamic_cast<PeriodicConcurrencyManager&>(*manager_)};
std::vector<RequestRecord> request_records{manager.RunExperiment()};
// FIXME - Refactor collector class to not need ID or window in the case of
// periodic concurrency mode
InferenceLoadMode id{1, 0.0};
nv-braf marked this conversation as resolved.
Show resolved Hide resolved
collector_->AddWindow(id, 0, UINT64_MAX);
collector_->AddData(id, std::move(request_records));
return cb::Error::Success;
}

bool IncludeServerStats() { return include_server_stats_; }

private:
Expand Down
18 changes: 17 additions & 1 deletion src/c++/perf_analyzer/perf_analyzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "perf_analyzer.h"

#include "perf_analyzer_exception.h"
#include "periodic_concurrency_manager.h"
#include "report_writer.h"
#include "request_rate_manager.h"

Expand Down Expand Up @@ -159,6 +160,12 @@ PerfAnalyzer::CreateAnalyzerObjects()
}

std::unique_ptr<pa::LoadManager> manager;
params_->is_using_periodic_concurrency_mode = true;
params_->periodic_concurrency_range = {
std::stoi(std::getenv("MY_START")), std::stoi(std::getenv("MY_END")),
debermudez marked this conversation as resolved.
Show resolved Hide resolved
std::stoi(std::getenv("MY_STEP"))};
params_->periodic_concurrency_request_period =
std::stoi(std::getenv("MY_REQUEST_PERIOD"));

if (params_->targeting_concurrency()) {
if ((parser_->SchedulerType() == pa::ModelParser::SEQUENCE) ||
Expand Down Expand Up @@ -209,6 +216,13 @@ PerfAnalyzer::CreateAnalyzerObjects()
factory, &manager),
"failed to create concurrency manager");

} else if (params_->is_using_periodic_concurrency_mode) {
manager = std::make_unique<pa::PeriodicConcurrencyManager>(
params_->async, params_->streaming, params_->batch_size,
params_->max_threads, params_->max_concurrency,
params_->shared_memory_type, params_->output_shm_size, parser_, factory,
params_->periodic_concurrency_range,
params_->periodic_concurrency_request_period);
} else if (params_->using_request_rate_range) {
if ((params_->sequence_id_range != 0) &&
(params_->sequence_id_range < params_->num_of_sequences)) {
Expand Down Expand Up @@ -370,6 +384,8 @@ PerfAnalyzer::Profile()
err = profiler_->Profile<size_t>(
params_->concurrency_range.start, params_->concurrency_range.end,
params_->concurrency_range.step, params_->search_mode, perf_statuses_);
} else if (params_->is_using_periodic_concurrency_mode) {
err = profiler_->ProfilePeriodicConcurrencyMode();
} else {
err = profiler_->Profile<double>(
params_->request_rate_range[pa::SEARCH_RANGE::kSTART],
Expand All @@ -393,7 +409,7 @@ PerfAnalyzer::Profile()
void
PerfAnalyzer::WriteReport()
{
if (!perf_statuses_.size()) {
if (!perf_statuses_.size() || params_->is_using_periodic_concurrency_mode) {
return;
}

Expand Down
Loading
Loading