Skip to content

Commit

Permalink
Storages: Add statistical data of TableScanning in ScanContext (#8842)
Browse files Browse the repository at this point in the history
ref #8675
  • Loading branch information
JinheLin authored Mar 18, 2024
1 parent 1b3e193 commit 596a222
Show file tree
Hide file tree
Showing 29 changed files with 611 additions and 159 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
Expand Down Expand Up @@ -1253,7 +1253,7 @@ void DAGStorageInterpreter::buildLocalExec(
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ
(left.num_iterations == right.num_iterations) && //
(left.num_produced_rows == right.num_produced_rows) && //
(left.time_processed_ns == right.time_processed_ns) && //
(left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && //
(left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows);
(left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && //
(left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows);
}

struct MockWriter
Expand All @@ -72,10 +72,12 @@ struct MockWriter
summary.concurrency = 1;
summary.scan_context = std::make_unique<DM::ScanContext>();

summary.scan_context->total_dmfile_scanned_packs = 1;
summary.scan_context->total_dmfile_skipped_packs = 2;
summary.scan_context->total_dmfile_scanned_rows = 8000;
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->dmfile_data_scanned_rows = 8000;
summary.scan_context->dmfile_data_skipped_rows = 15000;
summary.scan_context->dmfile_mvcc_scanned_rows = 8000;
summary.scan_context->dmfile_mvcc_skipped_rows = 15000;
summary.scan_context->dmfile_lm_filter_scanned_rows = 8000;
summary.scan_context->dmfile_lm_filter_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->create_snapshot_time_ns = 5;
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ void TableScanStatistics::collectExtraRuntimeDetail()
});
break;
}

if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end())
{
it->second->setStreamCost(
std::max(local_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(local_table_scan_detail.max_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0));
}
}

TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

TiFlashErrorRegistry::instance(); // This invocation is for initializing

DM::ScanContext::initCurrentInstanceId(config(), log);

const auto disaggregated_mode = getDisaggregatedMode(config());
const auto use_autoscaler = useAutoScaler(config());

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>
Expand Down Expand Up @@ -177,6 +178,8 @@ class ColumnFileReader

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;

virtual void setReadTag(ReadTag /*read_tag*/) {}
};

std::pair<size_t, size_t> copyColumnsData(
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ void ColumnFileBigReader::initStream()

DMFileBlockInputStreamBuilder builder(dm_context.global_context);
file_stream = builder.setTracingID(dm_context.tracing_id)
.setReadTag(read_tag)
.build(
column_file.getFile(),
*col_defs,
Expand Down Expand Up @@ -394,5 +395,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr
return std::make_shared<ColumnFileBigReader>(dm_context, column_file, new_col_defs);
}

void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
{
// `read_tag` should be set before `file_stream` is initialized.
RUNTIME_CHECK(file_stream == nullptr);
read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class ColumnFileBigReader : public ColumnFileReader
Block cur_block;
Columns cur_block_data; // The references to columns in cur_block, for faster access.

ReadTag read_tag;

private:
void initStream();
std::pair<size_t, size_t> readRowsRepeatedly(
Expand Down Expand Up @@ -192,6 +194,8 @@ class ColumnFileBigReader : public ColumnFileReader
size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;

void setReadTag(ReadTag read_tag_) override;
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
Expand All @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader(
column_file_rows.push_back(f->getRows());
column_file_rows_end.push_back(total_rows);
column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs));
column_file_readers.back()->setReadTag(read_tag_);
}
}

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
Expand All @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP
new_reader->column_file_rows_end = column_file_rows_end;

for (auto & fr : column_file_readers)
{
new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs));
new_reader->column_file_readers.back()->setReadTag(read_tag);
}

return std::shared_ptr<ColumnFileSetReader>(new_reader);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ class ColumnFileSetReader
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once.
// This method create a new reader based on the current one. It will reuse some caches in the current reader.
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

// Use for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layer.
// This method will check whether offset and limit are valid. It only return those valid rows.
Expand Down Expand Up @@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream
const DMContext & context_,
const ColumnFileSetSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: reader(context_, delta_snap_, col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_)
, column_files(reader.snapshot->getColumnFiles())
, column_files_count(column_files.size())
{}
Expand Down
17 changes: 12 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,12 @@ class DeltaValueReader
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once.
// This method create a new reader based on then current one. It will reuse some caches in the current reader.
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; }

Expand Down Expand Up @@ -492,9 +493,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)
, persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_)
, persisted_files_input_stream(
context_,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_,
read_tag_)
{}

String getName() const override { return "DeltaValue"; }
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader(
const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: delta_snap(delta_snap_)
, mem_table_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getMemTableSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, persisted_files_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, col_defs(col_defs_)
, segment_range(segment_range_)
{}

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
new_reader->compacted_delta_index = compacted_delta_index;
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs);
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag);
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
read_one_pack_every_time,
tracing_id,
max_sharing_column_count,
scan_context);
scan_context,
read_tag);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_count > 0);
}
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder & setReadTag(ReadTag read_tag_)
{
read_tag = read_tag_;
return *this;
}

private:
// These methods are called by the ctor

Expand Down Expand Up @@ -194,6 +200,7 @@ class DMFileBlockInputStreamBuilder
size_t max_sharing_column_bytes_for_all = 0;
size_t max_sharing_column_count = 0;
String tracing_id;
ReadTag read_tag = ReadTag::Internal;
};

/**
Expand Down
49 changes: 42 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ DMFileReader::DMFileReader(
bool read_one_pack_every_time_,
const String & tracing_id_,
size_t max_sharing_column_count,
const ScanContextPtr & scan_context_)
const ScanContextPtr & scan_context_,
const ReadTag read_tag_)
: dmfile(dmfile_)
, read_columns(read_columns_)
, is_common_handle(is_common_handle_)
Expand All @@ -81,6 +82,7 @@ DMFileReader::DMFileReader(
, mark_cache(mark_cache_)
, column_cache(column_cache_)
, scan_context(scan_context_)
, read_tag(read_tag_)
, rows_threshold_per_read(rows_threshold_per_read_)
, file_provider(file_provider_)
, log(Logger::get(tracing_id_))
Expand Down Expand Up @@ -130,8 +132,7 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows)
for (; next_pack_id < use_packs.size() && !use_packs[next_pack_id]; ++next_pack_id)
{
skip_rows += pack_stats[next_pack_id].rows;
scan_context->total_dmfile_skipped_packs += 1;
scan_context->total_dmfile_skipped_rows += pack_stats[next_pack_id].rows;
addSkippedRows(pack_stats[next_pack_id].rows);
}
next_row_offset += skip_rows;
// return false if it is the end of stream.
Expand Down Expand Up @@ -165,10 +166,9 @@ size_t DMFileReader::skipNextBlock()
break;

read_rows += pack_stats[next_pack_id].rows;
scan_context->total_dmfile_skipped_packs += 1;
}

scan_context->total_dmfile_skipped_rows += read_rows;
addSkippedRows(read_rows);
next_row_offset += read_rows;

// When we read dmfile, if the previous pack is not read,
Expand Down Expand Up @@ -357,8 +357,7 @@ Block DMFileReader::read()

size_t read_packs = next_pack_id - start_pack_id;

scan_context->total_dmfile_scanned_packs += read_packs;
scan_context->total_dmfile_scanned_rows += read_rows;
addScannedRows(read_rows);

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read_on_normal_mode
Expand Down Expand Up @@ -639,4 +638,40 @@ bool DMFileReader::getCachedPacks(
col_data_cache->del(col_id, next_pack_id);
return found;
}

void DMFileReader::addScannedRows(UInt64 rows)
{
switch (read_tag)
{
case ReadTag::Query:
scan_context->dmfile_data_scanned_rows += rows;
break;
case ReadTag::MVCC:
scan_context->dmfile_mvcc_scanned_rows += rows;
break;
case ReadTag::LMFilter:
scan_context->dmfile_lm_filter_scanned_rows += rows;
break;
default:
break;
}
}

void DMFileReader::addSkippedRows(UInt64 rows)
{
switch (read_tag)
{
case ReadTag::Query:
scan_context->dmfile_data_skipped_rows += rows;
break;
case ReadTag::MVCC:
scan_context->dmfile_mvcc_skipped_rows += rows;
break;
case ReadTag::LMFilter:
scan_context->dmfile_lm_filter_skipped_rows += rows;
break;
default:
break;
}
}
} // namespace DB::DM
Loading

0 comments on commit 596a222

Please sign in to comment.