Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
change config

fix compile

fix index build

fix

fix regression

fix

add log

fix

fix variant schema

fix

fix

fix

fix size
  • Loading branch information
csun5285 committed Oct 11, 2024
1 parent 5dbbdac commit 7876eaa
Show file tree
Hide file tree
Showing 27 changed files with 412 additions and 375 deletions.
15 changes: 8 additions & 7 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
} else {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}

if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
<< idx_files_info.error();
} else {
_rowset_meta->add_inverted_index_files_info(idx_files_info.value());
if (rowset_schema->has_inverted_index()) {
if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
<< idx_files_info.error();
} else {
_rowset_meta->add_inverted_index_files_info(idx_files_info.value());
}
}

RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
// tree depth for bkd index
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
DEFINE_mBool(inverted_index_compaction_enable, "true");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
// index by RAM directory
Expand Down
197 changes: 44 additions & 153 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ Status Compaction::merge_input_rowsets() {
Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
// 1. Merge segment files and write bkd inverted index
if (_is_vertical) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
Expand All @@ -198,21 +199,25 @@ Status Compaction::merge_input_rowsets() {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
}
}

_tablet->last_compaction_status = res;

if (!res.ok()) {
return res;
_tablet->last_compaction_status = res;
if (!res.ok()) {
return res;
}
// 2. Merge the remaining inverted index files of the string type
RETURN_IF_ERROR(do_inverted_index_compaction());
}

COUNTER_UPDATE(_merged_rows_counter, _stats.merged_rows);
COUNTER_UPDATE(_filtered_rows_counter, _stats.filtered_rows);

// 3. In the `build`, `_close_file_writers` is called to close the inverted index file writer and write the final compound index file.
RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset),
fmt::format("rowset writer build failed. output_version: {}",
_output_version.to_string()));

// RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, _location_map));

//RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// Now we support delete in cumu compaction, to make all data in rowsets whose version
Expand Down Expand Up @@ -454,8 +459,6 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

RETURN_IF_ERROR(do_inverted_index_compaction());

RETURN_IF_ERROR(modify_rowsets());

auto* cumu_policy = tablet()->cumulative_compaction_policy();
Expand Down Expand Up @@ -488,40 +491,6 @@ Status Compaction::do_inverted_index_compaction() {

OlapStopWatch inverted_watch;

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
cur_max_version = _tablet->max_version_unlocked();
}

DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
// Convert the delete bitmap of the input rowsets to output rowset.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, *_rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap);

if (!_allow_delete_in_cumu_compaction) {
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_stats.merged_rows != missed_rows.size() && _tablet->tablet_state() == TABLET_RUNNING) {
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
_stats.merged_rows, missed_rows.size(), _tablet->tablet_id(),
_tablet->table_id());
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
// log here just for debugging, do not return error
LOG(WARNING) << err_msg;
}
}

RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));

// translation vec
// <<dest_idx_num, dest_docId>>
// the first level vector: index indicates src segment.
Expand Down Expand Up @@ -645,58 +614,9 @@ Status Compaction::do_inverted_index_compaction() {

// dest index files
// format: rowsetId_segmentId
std::vector<std::unique_ptr<InvertedIndexFileWriter>> inverted_index_file_writers(
dest_segment_num);

// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache);
if (st.ok()) {
auto index_not_need_to_compact =
DORIS_TRY(inverted_index_file_reader->get_all_directories());
// V1: each index is a separate file
// V2: all indexes are in a single file
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
int64_t fsize = 0;
st = ctx.fs()->file_size(
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix), &fsize);
if (!st.ok()) {
LOG(ERROR) << "file size error in index compaction, error:" << st.msg();
return st;
}
compacted_idx_file_size[seg_id] = fsize;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact));
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
// no index file
compacted_idx_file_size[seg_id] = 0;
} else {
LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:"
<< st;
return st;
}
}
for (const auto& writer : inverted_index_file_writers) {
writer->set_file_writer_opts(ctx.get_file_writer_options());
}
auto& inverted_index_file_writers = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())
->inverted_index_file_writers();
DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num);

// use tmp file dir to store index files
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
Expand All @@ -722,29 +642,6 @@ Status Compaction::do_inverted_index_compaction() {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);

// if index properties are different, index compaction maybe needs to be skipped.
bool is_continue = false;
std::optional<std::map<std::string, std::string>> first_properties;
for (const auto& rowset : _input_rowsets) {
const auto* tablet_index = rowset->tablet_schema()->get_inverted_index(col);
const auto& properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
if (properties != first_properties.value()) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"if index properties are different, index compaction needs to be "
"skipped.");
is_continue = true;
break;
}
}
}
if (is_continue) {
continue;
}

std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num);
std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
try {
Expand All @@ -770,40 +667,13 @@ Status Compaction::do_inverted_index_compaction() {
}
}

std::vector<InvertedIndexFileInfo> all_inverted_index_file_info(dest_segment_num);
uint64_t inverted_index_file_size = 0;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_total_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
all_inverted_index_file_info[seg_id] = inverted_index_file_writer->get_index_file_info();
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
if (!status.ok()) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
inverted_index_file_size);

_output_rowset->rowset_meta()->update_inverted_index_files_info(all_inverted_index_file_info);
COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", inverted index file size=" << inverted_index_file_size
<< ". tablet=" << _tablet->tablet_id()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";

return Status::OK();
Expand All @@ -814,8 +684,35 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}

auto col_unique_id = index.col_unique_ids()[0];

// if index properties are different, index compaction maybe needs to be skipped.
bool is_continue = false;
std::optional<std::map<std::string, std::string>> first_properties;
for (const auto& rowset : _input_rowsets) {
const auto* tablet_index =
rowset->tablet_schema()->get_inverted_index(col_unique_id, "");
// no inverted index
if (tablet_index == nullptr) {
ctx.skip_inverted_index.insert(col_unique_id);
is_continue = true;
break;
}
const auto& properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
if (properties != first_properties.value()) {
ctx.skip_inverted_index.insert(col_unique_id);
is_continue = true;
break;
}
}
}
if (is_continue) {
continue;
}

auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
auto* rowset = static_cast<BetaRowset*>(src_rs.get());
if (rowset->is_skip_index_compaction(col_unique_id)) {
Expand Down Expand Up @@ -909,9 +806,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx)
if (config::inverted_index_compaction_enable &&
(((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) ||
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_skip_inverted_index(ctx);
}
ctx.version = _output_version;
Expand Down Expand Up @@ -1177,8 +1072,6 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

RETURN_IF_ERROR(do_inverted_index_compaction());

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// 4. modify rowsets in memory
Expand All @@ -1205,9 +1098,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
if (config::inverted_index_compaction_enable &&
(((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) ||
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_skip_inverted_index(ctx);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Compaction {
protected:
Status merge_input_rowsets();

// merge inverted index files
Status do_inverted_index_compaction();

void construct_skip_inverted_index(RowsetWriterContext& ctx);
Expand Down
Loading

0 comments on commit 7876eaa

Please sign in to comment.