Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Optimize the root path choosing logic on tablet creation (backport #26238) #26382

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,17 +529,21 @@ CONF_mInt32(max_consumer_num_per_group, "3");
// Max pulsar consumer num in one data consumer group, for routine load.
CONF_mInt32(max_pulsar_consumer_num_per_group, "10");

<<<<<<< HEAD
// The size of thread pool for routine load task.
// this should be larger than FE config 'max_concurrent_task_num_per_be' (default 5).
CONF_Int32(routine_load_thread_pool_size, "10");

// kafka reqeust timeout
=======
// kafka request timeout
>>>>>>> dbf5fc61e ([Enhancement] Optimize the root path choosing logic on tablet creation (#26238))
CONF_Int32(routine_load_kafka_timeout_second, "10");

// pulsar reqeust timeout
// pulsar request timeout
CONF_Int32(routine_load_pulsar_timeout_second, "10");

// Is set to true, index loading failure will not causing BE exit,
// Is set to true, index loading failure will not cause BE exit,
// and the tablet will be marked as bad, so that FE will try to repair it.
// CONF_Bool(auto_recover_index_loading_failure, "false");

Expand Down Expand Up @@ -583,6 +587,10 @@ CONF_mInt32(path_scan_interval_second, "86400");
CONF_mInt32(storage_flood_stage_usage_percent, "95"); // 95%
// The min bytes that should be left of a data dir
CONF_mInt64(storage_flood_stage_left_capacity_bytes, "107374182400"); // 100GB
// When choosing storage root path for tablet creation, disks with usage larger than the
// average value by `storage_high_usage_disk_protect_ratio` won't be chosen at first.
CONF_mDouble(storage_high_usage_disk_protect_ratio, "0.1"); // 10%

// Number of thread for flushing memtable per store.
CONF_mInt32(flush_thread_num_per_store, "2");

Expand All @@ -596,21 +604,21 @@ CONF_Int64(brpc_max_body_size, "2147483648");
CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");

// Max number of txns for every txn_partition_map in txn manager.
// this is a self protection to avoid too many txns saving in manager.
// this is a self-protection to avoid too many txns saving in manager.
CONF_mInt64(max_runnings_transactions_per_txn_map, "100");

// The tablet map shard size, the value must be power of two.
// this is a an enhancement for better performance to manage tablet.
// this is an enhancement for better performance to manage tablet.
CONF_Int32(tablet_map_shard_size, "32");

CONF_String(plugin_path, "${STARROCKS_HOME}/plugin");

// txn_map_lock shard size, the value is 2^n, n=0,1,2,3,4
// this is a an enhancement for better performance to manage txn.
// this is an enhancement for better performance to manage txn.
CONF_Int32(txn_map_shard_size, "128");

// txn_lock shard size, the value is 2^n, n=0,1,2,3,4
// this is a an enhancement for better performance to commit and publish txn.
// this is an enhancement for better performance to commit and publish txn.
CONF_Int32(txn_shard_size, "1024");

// Whether to continue to start be when load tablet from header failed.
Expand Down Expand Up @@ -656,7 +664,7 @@ CONF_mInt16(storage_format_version, "2");
// 1 for LZ4_NULL
CONF_mInt16(null_encoding, "0");

// Do pre-aggregate if effect great than the factor, factor range:[1-100].
// Do pre-aggregate if effect greater than the factor, factor range:[1-100].
CONF_Int16(pre_aggregate_factor, "80");

#ifdef __x86_64__
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ Status DataDir::update_capacity() {
}

bool DataDir::capacity_limit_reached(int64_t incoming_data_size) {
double used_pct = (_disk_capacity_bytes - _available_bytes + incoming_data_size) / (double)_disk_capacity_bytes;
double used_pct = disk_usage(incoming_data_size);
int64_t left_bytes = _available_bytes - incoming_data_size;

if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class DataDir {

int64_t available_bytes() const { return _available_bytes; }
int64_t disk_capacity_bytes() const { return _disk_capacity_bytes; }
double disk_usage(int64_t incoming_data_size) const {
return (double)(_disk_capacity_bytes - _available_bytes + incoming_data_size) / (double)_disk_capacity_bytes;
}

// save a cluster_id file under data path to prevent
// invalid be config for example two be use the same
Expand Down Expand Up @@ -118,7 +121,7 @@ class DataDir {
// TODO(cmy): for now we can not precisely calculate the capacity StarRocks used,
// so in order to avoid running out of disk capacity, we currently use the actual
// disk available capacity and total capacity to do the calculation.
// So that the capacity StarRocks actually used may exceeds the user specified capacity.
// So that the capacity StarRocks actually used may exceed the user specified capacity.
bool capacity_limit_reached(int64_t incoming_data_size);

Status update_capacity();
Expand Down
26 changes: 22 additions & 4 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,31 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(TStorageMedium
}
}

// sort by disk usage in asc order
std::sort(stores.begin(), stores.end(),
[](const auto& a, const auto& b) { return a->available_bytes() > b->available_bytes(); });
[](const auto& a, const auto& b) { return a->disk_usage(0) < b->disk_usage(0); });

// compute average usage of all disks
double avg_disk_usage = 0.0;
double usage_sum = 0.0;
for (const auto& v : stores) {
usage_sum += v->disk_usage(0);
}
avg_disk_usage = usage_sum / stores.size();

// find the last root path which will participate in vector shuffle so that all the paths
// before and included can be chosen to create tablet on preferentially
size_t last_candidate_idx = 0;
for (const auto v : stores) {
if (v->disk_usage(0) > avg_disk_usage + config::storage_high_usage_disk_protect_ratio) {
break;
}
last_candidate_idx++;
}

const int mid = stores.size() / 2 + 1;
// TODO(lingbin): should it be a global util func?
// randomize the preferential paths to balance number of tablets each disk has
std::srand(std::random_device()());
std::shuffle(stores.begin(), stores.begin() + mid, std::mt19937(std::random_device()()));
std::shuffle(stores.begin(), stores.begin() + last_candidate_idx, std::mt19937(std::random_device()()));
return stores;
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/storage/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ class StorageEngine {
Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update);

std::vector<string> get_store_paths();
// get root path for creating tablet. The returned vector of root path should be random,
// for avoiding that all the tablet would be deployed one disk.
// Get root path vector for creating tablet. The returned vector is sorted by the disk usage in asc order,
// then the front portion of the vector excluding paths which have high disk usage is shuffled to avoid
// the newly created tablet is distributed on only on specific path.
std::vector<DataDir*> get_stores_for_create_tablet(TStorageMedium::type storage_medium);
DataDir* get_store(const std::string& path);
DataDir* get_store(int64_t path_hash);
Expand Down
Loading