Skip to content

Commit

Permalink
Storages: Add statistical data of TableScanning in ScanContext (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin authored and JaySon-Huang committed May 7, 2024
1 parent a92744f commit 06fef86
Show file tree
Hide file tree
Showing 32 changed files with 636 additions and 178 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
Expand Down Expand Up @@ -1250,7 +1250,7 @@ void DAGStorageInterpreter::buildLocalExec(
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ
(left.num_iterations == right.num_iterations) && //
(left.num_produced_rows == right.num_produced_rows) && //
(left.time_processed_ns == right.time_processed_ns) && //
(left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && //
(left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows);
(left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && //
(left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows);
}

struct MockWriter
Expand All @@ -73,10 +73,12 @@ struct MockWriter
summary.concurrency = 1;
summary.scan_context = std::make_unique<DM::ScanContext>();

summary.scan_context->total_dmfile_scanned_packs = 1;
summary.scan_context->total_dmfile_skipped_packs = 2;
summary.scan_context->total_dmfile_scanned_rows = 8000;
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->dmfile_data_scanned_rows = 8000;
summary.scan_context->dmfile_data_skipped_rows = 15000;
summary.scan_context->dmfile_mvcc_scanned_rows = 8000;
summary.scan_context->dmfile_mvcc_skipped_rows = 15000;
summary.scan_context->dmfile_lm_filter_scanned_rows = 8000;
summary.scan_context->dmfile_lm_filter_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->create_snapshot_time_ns = 5;
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ void TableScanStatistics::collectExtraRuntimeDetail()
});
break;
}

if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end())
{
it->second->setStreamCost(
std::max(local_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(local_table_scan_detail.max_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0));
}
}

TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Statistics/TableScanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ namespace DB
{
struct TableScanDetail : public ConnectionProfileInfo
{
bool is_local;
const bool is_local;
double min_stream_cost_ns = -1.0;
double max_stream_cost_ns = -1.0;

explicit TableScanDetail(bool is_local_)
: is_local(is_local_)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct Settings
* but we are not going to do it, because settings is used everywhere as static struct fields.
*/

// clang-format off
// clang-format off
#define APPLY_FOR_SETTINGS(M) \
M(SettingString, regions, "", "Deprecated. the region need to be read.") \
M(SettingBool, resolve_locks, false, "resolve locks for TiDB transaction") \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

TiFlashErrorRegistry::instance(); // This invocation is for initializing

DM::ScanContext::initCurrentInstanceId(config(), log);

const auto disaggregated_mode = getDisaggregatedMode(config());
const auto use_autoscaler = useAutoScaler(config());
const bool use_autoscaler_without_s3 = useAutoScalerWithoutS3(config());
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include <Core/Block.h>
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>
Expand Down Expand Up @@ -178,6 +180,8 @@ class ColumnFileReader

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;

virtual void setReadTag(ReadTag /*read_tag*/) {}
};

std::pair<size_t, size_t> copyColumnsData(
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ void ColumnFileBigReader::initStream()
DMFileBlockInputStreamBuilder builder(context.db_context);
file_stream
= builder.setTracingID(context.tracing_id)
.setReadTag(read_tag)
.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}, context.scan_context);

header = file_stream->getHeader();
Expand Down Expand Up @@ -386,5 +387,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr
return std::make_shared<ColumnFileBigReader>(context, column_file, new_col_defs);
}

/// Records the read tag that `initStream()` hands to the DMFile stream builder.
/// Must be called while `file_stream` is still unset; otherwise the runtime
/// check below fails.
void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
{
    // The tag is consumed when `file_stream` is built, so changing it after
    // that point would have no effect on the already-created stream.
    RUNTIME_CHECK(file_stream == nullptr);
    this->read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class ColumnFileBigReader : public ColumnFileReader
Block cur_block;
Columns cur_block_data; // The references to columns in cur_block, for faster access.

ReadTag read_tag;

private:
void initStream();
std::pair<size_t, size_t> readRowsRepeatedly(
Expand Down Expand Up @@ -190,6 +192,8 @@ class ColumnFileBigReader : public ColumnFileReader
size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;

void setReadTag(ReadTag read_tag_) override;
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
Expand All @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader(
column_file_rows.push_back(f->getRows());
column_file_rows_end.push_back(total_rows);
column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs));
column_file_readers.back()->setReadTag(read_tag_);
}
}

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
Expand All @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP
new_reader->column_file_rows_end = column_file_rows_end;

for (auto & fr : column_file_readers)
{
new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs));
new_reader->column_file_readers.back()->setReadTag(read_tag);
}

return std::shared_ptr<ColumnFileSetReader>(new_reader);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ class ColumnFileSetReader
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once.
// This method creates a new reader based on the current one. It will reuse some caches in the current reader.
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

// Used by DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layers.
// This method will check whether offset and limit are valid. It only returns the valid rows.
Expand Down Expand Up @@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream
const DMContext & context_,
const ColumnFileSetSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: reader(context_, delta_snap_, col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_)
, column_files(reader.snapshot->getColumnFiles())
, column_files_count(column_files.size())
{}
Expand Down
17 changes: 12 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,12 @@ class DeltaValueReader
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once.
// This method creates a new reader based on the current one. It will reuse some caches in the current reader.
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; }

Expand Down Expand Up @@ -473,9 +474,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)
, persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_)
, persisted_files_input_stream(
context_,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_,
read_tag_)
{}

String getName() const override { return "DeltaValue"; }
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader(
const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: delta_snap(delta_snap_)
, mem_table_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getMemTableSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, persisted_files_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, col_defs(col_defs_)
, segment_range(segment_range_)
{}

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
new_reader->compacted_delta_index = compacted_delta_index;
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs);
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag);
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
read_one_pack_every_time,
tracing_id,
max_sharing_column_bytes_for_all,
scan_context);
scan_context,
read_tag);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_bytes_for_all > 0);
}
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ class DMFileBlockInputStreamBuilder
return *this;
}

/// Sets the read tag that `build()` passes on when constructing the reader.
/// If never called, the builder keeps its default of `ReadTag::Internal`.
/// Returns `*this` so that calls can be chained with the other setters.
DMFileBlockInputStreamBuilder & setReadTag(ReadTag read_tag_)
{
    this->read_tag = read_tag_;
    return *this;
}

private:
// These methods are called by the ctor

Expand Down Expand Up @@ -196,6 +202,7 @@ class DMFileBlockInputStreamBuilder
bool read_one_pack_every_time = false;
size_t max_sharing_column_bytes_for_all = 0;
String tracing_id;
ReadTag read_tag = ReadTag::Internal;
};

/**
Expand Down
Loading

0 comments on commit 06fef86

Please sign in to comment.