Skip to content

Commit

Permalink
improve spill counters
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Oct 11, 2024
1 parent 9f4c269 commit 625dfa2
Show file tree
Hide file tree
Showing 22 changed files with 566 additions and 289 deletions.
1 change: 0 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {

_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_merge_timer = ADD_TIMER(Base::profile(), "MergeTime");
_expr_timer = ADD_TIMER(Base::profile(), "ExprTime");
_serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ Status DistinctStreamingAggLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_init_timer);
_build_timer = ADD_TIMER(Base::profile(), "BuildTime");
_exec_timer = ADD_TIMER(Base::profile(), "ExecTime");
_hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime");
_hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT);
Expand Down
249 changes: 211 additions & 38 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,28 +287,115 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {

// Initializes the base PipelineXLocalState, then registers the spill-related timers
// and counters on this operator's profile. Every registration below passes level 1.
// The base-class init is wrapped in RETURN_IF_ERROR (presumably propagating its
// failure to the caller); on the normal path the function returns Status::OK().
// NOTE(review): this listing appears to be a rendered diff without +/- markers, so
// removed and added registrations may be interleaved below (e.g. "SpillTime" next to
// "SpillTotalTime") -- confirm against the real header before relying on the exact set.
Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info));
_spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1);

// Overall spill timer.
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);

// Spill write timers and counters.
_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1);

// Write-task bookkeeping: queued task count, task count, and time spent queued.
_spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1);
_spill_writing_task_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", TUnit::UNIT, 1);
_spill_write_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTaskWaitInQueueTime", 1);

_spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteFileTime", 1);

// Write-side block/row/file statistics (sizes use TUnit::BYTES, counts use TUnit::UNIT).
_spill_write_serialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteSerializeBlockTime", 1);
_spill_write_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_write_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1);
_spill_write_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1);
_spill_file_total_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
_spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
_spill_file_total_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);

// Spill read counters
_spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1);
_spill_read_data_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime", 1);
_spill_deserialize_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", 1);
_spill_read_bytes =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", TUnit::BYTES, 1);
_spill_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 1);
_spill_write_wait_io_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1);
_spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1);

// Read-task bookkeeping: queued task count, task count, and time spent queued.
_spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1);
_spill_reading_task_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadTaskCount", TUnit::UNIT, 1);
_spill_read_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadTaskWaitInQueueTime", 1);

_spill_read_file_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadFileTime", 1);
// NOTE(review): "Derialize" in the member name and profile key looks like a typo of
// "Deserialize". The key is emitted at runtime, so it is left unchanged here.
_spill_read_derialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDerializeBlockTime", 1);

// Read-side block/row/file statistics (sizes use TUnit::BYTES, counts use TUnit::UNIT).
_spill_read_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount", TUnit::UNIT, 1);
_spill_read_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockDataSize", TUnit::BYTES, 1);
_spill_read_file_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileSize", TUnit::BYTES, 1);
_spill_read_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", TUnit::UNIT, 1);
_spill_read_file_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", TUnit::UNIT, 1);
return Status::OK();
}

RuntimeProfile::Counter* _spill_timer = nullptr;
RuntimeProfile::Counter* _spill_recover_time;
RuntimeProfile::Counter* _spill_read_data_time;
RuntimeProfile::Counter* _spill_deserialize_time;
RuntimeProfile::Counter* _spill_read_bytes;
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
// Total time of spill, including serialize block time, write disk file,
// read disk file, deserialize block time,
// and wait in queue time, etc.
RuntimeProfile::Counter* _spill_total_timer = nullptr;

// Spill write counters
// Total time of spill write, including serialize block time, write disk file,
// and wait in queue time, etc.
RuntimeProfile::Counter* _spill_write_timer = nullptr;

RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr;
RuntimeProfile::Counter* _spill_writing_task_count = nullptr;
RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;

// Total time of writing file
RuntimeProfile::Counter* _spill_write_file_timer = nullptr;
RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr;
// Original count of spilled Blocks
// One big Block may be split into multiple smaller Blocks when actually written to the disk file.
RuntimeProfile::Counter* _spill_write_block_count = nullptr;
// Total bytes of spill data in Block format(in memory format)
RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_data_size = nullptr;
RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
RuntimeProfile::Counter* _spill_file_current_count = nullptr;
// Spilled file total size
RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Current spilled file size
RuntimeProfile::Counter* _spill_file_current_size = nullptr;

// Spill read counters
// Total time of recovering spilled data, including read file time, deserialize time, etc.
RuntimeProfile::Counter* _spill_recover_time = nullptr;

RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr;
RuntimeProfile::Counter* _spill_reading_task_count = nullptr;
RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;

RuntimeProfile::Counter* _spill_read_file_time = nullptr;
RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_read_block_count = nullptr;
// Total bytes of read data in Block format(in memory format)
RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
// Total bytes of spill data read from disk file (serialized format)
RuntimeProfile::Counter* _spill_read_file_size = nullptr;
RuntimeProfile::Counter* _spill_read_rows_count = nullptr;
RuntimeProfile::Counter* _spill_read_file_count = nullptr;
};

class DataSinkOperatorXBase;
Expand Down Expand Up @@ -587,19 +674,63 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
RETURN_IF_ERROR(Base::init(state, info));

_spill_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTime", 1);
_spill_serialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1);
_spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1);
_spill_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
_spill_block_count =
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);

_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1);

_spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1);
_spill_writing_task_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", TUnit::UNIT, 1);
_spill_write_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTaskWaitInQueueTime", 1);

_spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteFileTime", 1);

_spill_write_serialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteSerializeBlockTime", 1);
_spill_write_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", 1);
_spill_write_wait_io_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1);
_spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1);
_spill_write_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1);
_spill_write_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1);
_spill_file_total_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
_spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
_spill_file_total_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);

// Spill read counters
_spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1);

_spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1);
_spill_reading_task_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadTaskCount", TUnit::UNIT, 1);
_spill_read_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadTaskWaitInQueueTime", 1);

_spill_read_file_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadFileTime", 1);
_spill_read_derialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDerializeBlockTime", 1);

_spill_read_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount", TUnit::UNIT, 1);
_spill_read_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockDataSize", TUnit::BYTES, 1);
_spill_read_file_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileSize", TUnit::BYTES, 1);
_spill_read_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", TUnit::UNIT, 1);
_spill_read_file_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", TUnit::UNIT, 1);

_spill_max_rows_of_partition =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1);
_spill_min_rows_of_partition =
Expand Down Expand Up @@ -631,14 +762,56 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA

std::vector<int64_t> _rows_in_partitions;

RuntimeProfile::Counter* _spill_timer = nullptr;
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
// Total time of spill, including serialize block time, write disk file,
// read disk file, deserialize block time,
// and wait in queue time, etc.
RuntimeProfile::Counter* _spill_total_timer = nullptr;

// Spill write counters
// Total time of spill write, including serialize block time, write disk file,
// and wait in queue time, etc.
RuntimeProfile::Counter* _spill_write_timer = nullptr;

RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr;
RuntimeProfile::Counter* _spill_writing_task_count = nullptr;
RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;

// Total time of writing file
RuntimeProfile::Counter* _spill_write_file_timer = nullptr;
RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr;
// Original count of spilled Blocks
// One big Block may be split into multiple smaller Blocks when actually written to the disk file.
RuntimeProfile::Counter* _spill_write_block_count = nullptr;
// Total bytes of spill data in Block format(in memory format)
RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_data_size = nullptr;
RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
RuntimeProfile::Counter* _spill_file_current_count = nullptr;
// Spilled file total size
RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Current spilled file size
RuntimeProfile::Counter* _spill_file_current_size = nullptr;

// Spill read counters
// Total time of recovering spilled data, including read file time, deserialize time, etc.
RuntimeProfile::Counter* _spill_recover_time = nullptr;

RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr;
RuntimeProfile::Counter* _spill_reading_task_count = nullptr;
RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;

RuntimeProfile::Counter* _spill_read_file_time = nullptr;
RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_read_block_count = nullptr;
// Total bytes of read data in Block format(in memory format)
RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
// Total bytes of spill data read from disk file (serialized format)
RuntimeProfile::Counter* _spill_read_file_size = nullptr;
RuntimeProfile::Counter* _spill_read_rows_count = nullptr;
RuntimeProfile::Counter* _spill_read_file_count = nullptr;

RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
};
Expand Down
13 changes: 10 additions & 3 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,24 @@ Status PartitionedAggSinkLocalState::revoke_memory(

state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, _shared_state->shared_from_this(),
state, true, _shared_state->shared_from_this(),
[this, &parent, state, query_id, size_to_revoke, spill_context, submit_timer] {
auto submit_elapsed_time = submit_timer.elapsed_time();
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
exec_time_counter()->update(submit_elapsed_time);
_spill_total_timer->update(submit_elapsed_time);

SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_spill_total_timer);
SCOPED_TIMER(_spill_write_timer);

DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_sink "
"revoke_memory canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
return st;
});
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(Base::_spill_timer);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
Expand Down
14 changes: 4 additions & 10 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,6 @@ class PartitionedAggSinkLocalState
auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(),
Base::profile(), spill_stream);
RETURN_IF_ERROR(status);
spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
Base::_spill_block_count, Base::_spill_data_size,
Base::_spill_write_disk_timer,
Base::_spill_write_wait_io_timer, memory_used_counter());

status = to_block(context, keys, values, null_key_data);
RETURN_IF_ERROR(status);
Expand All @@ -168,15 +164,13 @@ class PartitionedAggSinkLocalState
keys.clear();
values.clear();
}
status = spill_stream->prepare_spill();
RETURN_IF_ERROR(status);

{
SCOPED_TIMER(_spill_write_disk_timer);
SCOPED_TIMER(_spill_total_timer);
status = spill_stream->spill_block(state, block_, false);

RETURN_IF_ERROR(status);
status = spill_partition->flush_if_full();
}
RETURN_IF_ERROR(status);
status = spill_partition->flush_if_full();
_reset_tmp_data();
return status;
}
Expand Down
Loading

0 comments on commit 625dfa2

Please sign in to comment.