Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Oct 12, 2024
1 parent 19092cb commit 31c91a3
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 150 deletions.
1 change: 1 addition & 0 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), profile));
spill_streams_.emplace_back(spilling_stream_);
spill_stream = spilling_stream_;
spill_stream->set_write_counters(profile);
return Status::OK();
}
void AggSpillPartition::close() {
Expand Down
91 changes: 23 additions & 68 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,12 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info));

_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);
init_spill_read_counters();

return Status::OK();
}

void init_spill_write_counters() {
_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1);

_spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
Expand All @@ -307,19 +312,19 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_write_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1);
_spill_write_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
_spill_write_file_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_write_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1);
_spill_file_total_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
_spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
_spill_file_total_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);
}

void init_spill_read_counters() {
// Spill read counters
_spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1);

Expand All @@ -344,12 +349,11 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", TUnit::UNIT, 1);
_spill_read_file_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", TUnit::UNIT, 1);
return Status::OK();
}

// Total time of spill, including serialize block time, write disk file,
// read disk file, deserialize block time,
// and wait in queue time, etc.
// Total time of spill, including spill task scheduling time,
// serialize block time, write disk file time,
// and read disk file time, deserialize block time etc.
RuntimeProfile::Counter* _spill_total_timer = nullptr;

// Spill write counters
Expand All @@ -370,14 +374,14 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
// Total bytes of spill data in Block format(in memory format)
RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_data_size = nullptr;
RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
RuntimeProfile::Counter* _spill_file_current_count = nullptr;
// Spilled file total size
RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Current spilled file size
RuntimeProfile::Counter* _spill_file_current_size = nullptr;
RuntimeProfile::Counter* _spill_write_file_current_size = nullptr;

// Spill read counters
// Total time of recovring spilled data, including read file time, deserialize time, etc.
Expand All @@ -392,7 +396,7 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
RuntimeProfile::Counter* _spill_read_block_count = nullptr;
// Total bytes of read data in Block format(in memory format)
RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
// Total bytes of spill data read from disk file
RuntimeProfile::Counter* _spill_read_file_size = nullptr;
RuntimeProfile::Counter* _spill_read_rows_count = nullptr;
RuntimeProfile::Counter* _spill_read_file_count = nullptr;
Expand Down Expand Up @@ -695,43 +699,14 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_write_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockDataSize", TUnit::BYTES, 1);
_spill_write_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", TUnit::BYTES, 1);
_spill_write_file_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1);
_spill_file_total_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
_spill_file_current_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
_spill_file_total_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_file_current_size = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillWriteFileCurrentSize", TUnit::BYTES, 1);

// Spill read counters
_spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1);

_spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL(
Base::profile(), "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1);
_spill_reading_task_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadTaskCount", TUnit::UNIT, 1);
_spill_read_wait_in_queue_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadTaskWaitInQueueTime", 1);

_spill_read_file_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadFileTime", 1);
_spill_read_derialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDerializeBlockTime", 1);

_spill_read_block_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount", TUnit::UNIT, 1);
_spill_read_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockDataSize", TUnit::BYTES, 1);
_spill_read_file_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileSize", TUnit::BYTES, 1);
_spill_read_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", TUnit::UNIT, 1);
_spill_read_file_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", TUnit::UNIT, 1);

_spill_max_rows_of_partition =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1);
Expand Down Expand Up @@ -764,9 +739,9 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA

std::vector<int64_t> _rows_in_partitions;

// Total time of spill, including serialize block time, write disk file,
// read disk file, deserialize block time,
// and wait in queue time, etc.
// Total time of spill, including spill task scheduling time,
// serialize block time, write disk file time,
// and read disk file time, deserialize block time etc.
RuntimeProfile::Counter* _spill_total_timer = nullptr;

// Spill write counters
Expand All @@ -787,32 +762,12 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
// Total bytes of spill data in Block format(in memory format)
RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_write_data_size = nullptr;
RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
RuntimeProfile::Counter* _spill_file_current_count = nullptr;
// Spilled file total size
RuntimeProfile::Counter* _spill_file_total_size = nullptr;
// Current spilled file size
RuntimeProfile::Counter* _spill_file_current_size = nullptr;

// Spill read counters
// Total time of recovring spilled data, including read file time, deserialize time, etc.
RuntimeProfile::Counter* _spill_recover_time = nullptr;

RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr;
RuntimeProfile::Counter* _spill_reading_task_count = nullptr;
RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;

RuntimeProfile::Counter* _spill_read_file_time = nullptr;
RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_read_block_count = nullptr;
// Total bytes of read data in Block format(in memory format)
RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
RuntimeProfile::Counter* _spill_read_file_size = nullptr;
RuntimeProfile::Counter* _spill_read_rows_count = nullptr;
RuntimeProfile::Counter* _spill_read_file_count = nullptr;

RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(

state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, true, _shared_state->shared_from_this(),
state, _profile, true, _shared_state->shared_from_this(),
[this, &parent, state, query_id, size_to_revoke, spill_context, submit_timer] {
auto submit_elapsed_time = submit_timer.elapsed_time();
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
while (!state->is_cancelled() && !has_agg_data &&
!_shared_state->spill_partitions.empty()) {
for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) {
stream->set_read_counters(profile());
vectorized::Block block;
bool eos = false;
while (!eos && !state->is_cancelled()) {
{
SCOPED_TIMER(Base::_spill_recover_time);
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
{
_status = Status::Error<INTERNAL_ERROR>(
Expand Down Expand Up @@ -323,7 +323,8 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
_spill_dependency->block();
return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
std::make_shared<SpillRunnable>(state, false, _shared_state->shared_from_this(),
std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false,
_shared_state->shared_from_this(),
exception_catch_func));
}
} // namespace doris::pipeline
19 changes: 14 additions & 5 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeSt

Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info));
init_spill_write_counters();

SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
Expand Down Expand Up @@ -203,6 +205,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
state, spilling_stream, print_id(state->query_id()), "hash_probe",
_parent->node_id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _runtime_profile.get()));
spilling_stream->set_write_counters(_runtime_profile.get());
}

auto merged_block = vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
Expand Down Expand Up @@ -281,8 +284,9 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
"fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed");
});

auto spill_runnable = std::make_shared<SpillRunnable>(
state, true, _shared_state->shared_from_this(), exception_catch_func);
auto spill_runnable = std::make_shared<SpillRunnable>(state, _runtime_profile.get(), true,
_shared_state->shared_from_this(),
exception_catch_func);
return spill_io_pool->submit(std::move(spill_runnable));
}

Expand All @@ -291,6 +295,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in

if (probe_spilling_stream) {
RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
probe_spilling_stream->set_read_counters(profile());
}

return Status::OK();
Expand All @@ -307,6 +312,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (!spilled_stream) {
return Status::OK();
}
spilled_stream->set_read_counters(profile());

std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
_shared_state->shared_from_this();
Expand Down Expand Up @@ -434,8 +440,9 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
"fault_inject partitioned_hash_join_probe "
"recovery_build_blocks submit_func failed");
});
auto spill_runnable = std::make_shared<SpillRunnable>(
state, false, _shared_state->shared_from_this(), exception_catch_func);
auto spill_runnable = std::make_shared<SpillRunnable>(state, _runtime_profile.get(), false,
_shared_state->shared_from_this(),
exception_catch_func);
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk submit func";
Expand Down Expand Up @@ -473,6 +480,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
return Status::OK();
}

spilled_stream->set_read_counters(profile());
auto& blocks = _probe_blocks[partition_index];

auto query_id = state->query_id();
Expand Down Expand Up @@ -555,7 +563,8 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
"recovery_probe_blocks submit_func failed");
});
return spill_io_pool->submit(std::make_shared<SpillRunnable>(
state, false, _shared_state->shared_from_this(), exception_catch_func));
state, _runtime_profile.get(), false, _shared_state->shared_from_this(),
exception_catch_func));
}

PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool* pool,
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
state, spilling_stream, print_id(state->query_id()),
fmt::format("hash_build_sink_{}", i), _parent->node_id(),
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _profile));
spilling_stream->set_write_counters(_profile);
}
return p._partitioner->clone(state, _partitioner);
}
Expand Down Expand Up @@ -318,7 +319,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
};

auto spill_runnable = std::make_shared<SpillRunnable>(
state, true, _shared_state->shared_from_this(), exception_catch_func);
state, _profile, true, _shared_state->shared_from_this(), exception_catch_func);

auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();

Expand Down Expand Up @@ -378,7 +379,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
// so that when a stream finished, it should desc -1
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, true, _shared_state->shared_from_this(),
state, _profile, true, _shared_state->shared_from_this(),
[this, query_id, spilling_stream, i, submit_timer, spill_context] {
auto submit_elapsed_time = submit_timer.elapsed_time();
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
_shared_state->spill_block_batch_row_count,
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
RETURN_IF_ERROR(status);
_spilling_stream->set_write_counters(_profile);

_shared_state->sorted_streams.emplace_back(_spilling_stream);

Expand Down Expand Up @@ -303,7 +304,8 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
if (status.ok()) {
state->get_query_ctx()->increase_revoking_tasks_count();
status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
std::make_shared<SpillRunnable>(state, true, _shared_state->shared_from_this(),
std::make_shared<SpillRunnable>(state, _profile, true,
_shared_state->shared_from_this(),
exception_catch_func));
}
if (!status.ok()) {
Expand Down
Loading

0 comments on commit 31c91a3

Please sign in to comment.