Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE856] optimize binary buffer allocation #953

Open · wants to merge 3 commits into main
172 changes: 113 additions & 59 deletions native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
@@ -29,6 +29,7 @@
#include <parquet/file_reader.h>
#include <sched.h>
#include <shuffle/splitter.h>
#include <stdlib.h>
#include <sys/mman.h>

#include <chrono>
@@ -159,7 +160,13 @@ class LargePageMemoryPool : public MemoryPool {

class BenchmarkShuffleSplit {
public:
BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); }
BenchmarkShuffleSplit(const std::string& file_name,
const std::string& spark_local_dirs = "") {
GetRecordBatchReader(file_name);
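// The --local_dirs argument is exported as NATIVESQL_SPARK_LOCAL_DIRS so that
// GetConfiguredLocalDirs() (assumed to parse that variable into a list of
// shuffle directories) picks it up for this process.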
setenv("NATIVESQL_SPARK_LOCAL_DIRS", spark_local_dirs.c_str(), 1);

ARROW_ASSIGN_OR_THROW(spark_local_dirs_, GetConfiguredLocalDirs());
}

void GetRecordBatchReader(const std::string& input_file) {
std::unique_ptr<::parquet::arrow::FileReader> parquet_reader;
@@ -213,19 +220,24 @@ class BenchmarkShuffleSplit {
options.compression_type = compression_type;
options.buffer_size = split_buffer_size;
options.buffered_write = true;
options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
options.offheap_per_task = 7 * 1024 * 1024 * 1024L;
options.prefer_spill = true;
options.write_schema = false;
options.memory_pool = pool.get();

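// Each benchmark thread writes its shuffle temp file into one of the configured
// local dirs, chosen round-robin by thread index; the UUID suffix keeps the
// file names unique across threads.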
options.data_file = arrow::fs::internal::ConcatAbstractPath(
spark_local_dirs_[state.thread_index() % spark_local_dirs_.size()],
"temp_shuffle_" + GenerateUUID());

std::shared_ptr<Splitter> splitter;
int64_t elapse_read = 0;
int64_t num_batches = 0;
int64_t num_rows = 0;
int64_t split_time = 0;
int64_t dur_time = 0;

Do_Split(splitter, elapse_read, num_batches, num_rows, split_time, num_partitions,
options, state);
Do_Split(splitter, elapse_read, num_batches, num_rows, split_time, dur_time,
num_partitions, options, state);

auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
fs->DeleteFile(splitter->DataFile());
@@ -285,6 +297,8 @@
splitter->TotalWriteTime();
state.counters["split_time"] = benchmark::Counter(
split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
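// dur_time is the wall-clock duration of the read + split loop measured inside
// Do_Split, reported here as the elapsed_time counter alongside split_time.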
state.counters["elapsed_time"] = benchmark::Counter(
dur_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
splitter.reset();
}

@@ -297,7 +311,7 @@
}
virtual void Do_Split(std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
const int num_partitions, SplitOptions options,
int64_t& dur_time, const int num_partitions, SplitOptions options,
benchmark::State& state) {}

protected:
@@ -308,39 +322,42 @@
std::shared_ptr<arrow::Schema> schema;
std::vector<std::shared_ptr<::gandiva::Expression>> expr_vector;
parquet::ArrowReaderProperties properties;
std::vector<std::string> spark_local_dirs_;
};

class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
public:
BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename)
: BenchmarkShuffleSplit(filename) {}
BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename,
const std::string& spark_local_dirs = "")
: BenchmarkShuffleSplit(filename, spark_local_dirs) {}

protected:
void Do_Split(std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
const int num_partitions, SplitOptions options, benchmark::State& state) {
int64_t& dur_time, const int num_partitions, SplitOptions options,
benchmark::State& state) {
std::vector<int> local_column_indices;
// local_column_indices.push_back(0);
/* local_column_indices.push_back(0);
local_column_indices.push_back(1);
local_column_indices.push_back(2);
local_column_indices.push_back(4);
local_column_indices.push_back(5);
local_column_indices.push_back(6);
local_column_indices.push_back(7);
*/

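// Column subset to read from the parquet file; the field list built below must
// mirror the same indices so the local schema stays aligned with the data.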
local_column_indices.push_back(0);
local_column_indices.push_back(1);
local_column_indices.push_back(2);
local_column_indices.push_back(4);
local_column_indices.push_back(5);
local_column_indices.push_back(6);
local_column_indices.push_back(8);
local_column_indices.push_back(9);
local_column_indices.push_back(13);
local_column_indices.push_back(14);
local_column_indices.push_back(15);

std::shared_ptr<arrow::Schema> local_schema;
arrow::FieldVector fields;
fields.push_back(schema->field(0));
fields.push_back(schema->field(1));
fields.push_back(schema->field(2));
fields.push_back(schema->field(4));
fields.push_back(schema->field(5));
fields.push_back(schema->field(6));
fields.push_back(schema->field(8));
fields.push_back(schema->field(9));
fields.push_back(schema->field(13));
fields.push_back(schema->field(14));
fields.push_back(schema->field(15));
local_schema = std::make_shared<arrow::Schema>(fields);

@@ -360,6 +377,8 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
row_group_indices, local_column_indices, &record_batch_reader));

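// Wall-clock timing of the read + split loop (through splitter->Stop()),
// reported via the elapsed_time counter.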
auto start_time = std::chrono::steady_clock::now();
do {
TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));

@@ -384,26 +403,52 @@
}

TIME_NANO_OR_THROW(split_time, splitter->Stop());
auto end_time = std::chrono::steady_clock::now();
dur_time = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time)
.count();
}
};

class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit {
public:
BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename)
: BenchmarkShuffleSplit(filename) {}
BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename,
const std::string& spark_local_dirs = "")
: BenchmarkShuffleSplit(filename, spark_local_dirs) {}

protected:
void Do_Split(std::shared_ptr<Splitter>& splitter, int64_t& elapse_read,
int64_t& num_batches, int64_t& num_rows, int64_t& split_time,
const int num_partitions, SplitOptions options, benchmark::State& state) {
if (state.thread_index() == 0) std::cout << schema->ToString() << std::endl;
int64_t& dur_time, const int num_partitions, SplitOptions options,
benchmark::State& state) {
// if (state.thread_index() == 0) std::cout << schema->ToString() << std::endl;

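// Only a subset of the parquet columns is read; local_schema below mirrors the
// same field indices so the splitter schema matches the record batches passed
// to Split().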
std::vector<int> local_column_indices;

local_column_indices.push_back(0);
local_column_indices.push_back(1);
local_column_indices.push_back(2);
local_column_indices.push_back(4);
local_column_indices.push_back(5);
local_column_indices.push_back(6);

std::shared_ptr<arrow::Schema> local_schema;
arrow::FieldVector fields;
fields.push_back(schema->field(0));
fields.push_back(schema->field(1));
fields.push_back(schema->field(2));
fields.push_back(schema->field(4));
fields.push_back(schema->field(5));
fields.push_back(schema->field(6));
local_schema = std::make_shared<arrow::Schema>(fields);

if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl;

if (!expr_vector.empty()) {
ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions,
ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", local_schema, num_partitions,
expr_vector, std::move(options)));
} else {
ARROW_ASSIGN_OR_THROW(
splitter, Splitter::Make("rr", schema, num_partitions, std::move(options)));
ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("rr", local_schema, num_partitions,
std::move(options)));
}

std::shared_ptr<arrow::RecordBatch> record_batch;
@@ -414,10 +459,11 @@ class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit
arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file),
properties, &parquet_reader));

auto start_time = std::chrono::steady_clock::now();
for (auto _ : state) {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(
row_group_indices, column_indices, &record_batch_reader));
row_group_indices, local_column_indices, &record_batch_reader));
TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
while (record_batch) {
num_batches += 1;
@@ -427,6 +473,9 @@ class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit
}
}
TIME_NANO_OR_THROW(split_time, splitter->Stop());
auto end_time = std::chrono::steady_clock::now();
dur_time = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time)
.count();
}
};

@@ -472,6 +521,7 @@ int main(int argc, char** argv) {
uint32_t partitions = 512;
uint32_t threads = 1;
std::string datafile;
std::string spark_local_dirs = "";

for (int i = 0; i < argc; i++) {
if (strcmp(argv[i], "--iterations") == 0) {
@@ -482,48 +532,52 @@
threads = atol(argv[i + 1]);
} else if (strcmp(argv[i], "--file") == 0) {
datafile = argv[i + 1];
} else if (strcmp(argv[i], "--local_dirs") == 0) {
spark_local_dirs = argv[i + 1];
}
}
std::cout << "iterations = " << iterations << std::endl;
std::cout << "partitions = " << partitions << std::endl;
std::cout << "threads = " << threads << std::endl;
std::cout << "datafile = " << datafile << std::endl;
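// Example invocation (binary name and paths are placeholders; --local_dirs is
// assumed to take a comma-separated list, following Spark's local-dirs
// convention):
//   ./shuffle_split_benchmark --file /data/lineitem.parquet --iterations 10 \
//       --partitions 512 --threads 8 --local_dirs /mnt/nvme0/tmp,/mnt/nvme1/tmp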

sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_CacheScan_Benchmark bck(datafile);
/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_CacheScan_Benchmark
bck(datafile, spark_local_dirs);
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::CacheScan", bck)
->Iterations(iterations)
->Args({partitions, arrow::Compression::FASTPFOR})
->Threads(threads)
->ReportAggregatesOnly(false)
->MeasureProcessCPUTime()
->Unit(benchmark::kSecond);
*/
sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark bck(
datafile, spark_local_dirs);

benchmark::RegisterBenchmark("BenchmarkShuffleSplit::CacheScan", bck)
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(iterations)
->Args({partitions, arrow::Compression::FASTPFOR})
->Threads(threads)
->ReportAggregatesOnly(false)
->Unit(benchmark::kSecond)
->MeasureProcessCPUTime()
->Unit(benchmark::kSecond);

/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark
bck(datafile);

benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({96*2, arrow::Compression::FASTPFOR})
->Args({96*4, arrow::Compression::FASTPFOR})
->Args({96*8, arrow::Compression::FASTPFOR})
->Args({96*16, arrow::Compression::FASTPFOR})
->Args({96*32, arrow::Compression::FASTPFOR})
->Threads(24)
->Unit(benchmark::kSecond);

benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({4096, arrow::Compression::FASTPFOR})
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(8)
->Threads(16)
->Threads(24)
->Unit(benchmark::kSecond);
*/
->UseRealTime();
/*
->Args({96*4, arrow::Compression::FASTPFOR})
->Args({96*8, arrow::Compression::FASTPFOR})
->Args({96*16, arrow::Compression::FASTPFOR})
->Args({96*32, arrow::Compression::FASTPFOR})
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({4096, arrow::Compression::FASTPFOR})
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(8)
->Threads(16)
->Threads(24)
->Unit(benchmark::kSecond);
*/
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
}
}