From 2236077a0baedccd5447b00bdf55b79df86d52cb Mon Sep 17 00:00:00 2001 From: csun5285 Date: Wed, 9 Oct 2024 19:48:05 +0800 Subject: [PATCH] refactor change config fix complie fix index build fix fix regression fix add log fix fix variant schema fix fix fix fix size --- be/src/cloud/cloud_rowset_writer.cpp | 15 +- be/src/common/config.cpp | 2 +- be/src/olap/compaction.cpp | 197 ++++-------------- be/src/olap/compaction.h | 1 + be/src/olap/rowset/beta_rowset_writer.cpp | 117 +++++++++-- be/src/olap/rowset/beta_rowset_writer.h | 86 ++++---- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 2 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 3 +- be/src/olap/rowset/rowset_meta.cpp | 10 +- be/src/olap/rowset/rowset_meta.h | 5 +- be/src/olap/rowset/rowset_writer.h | 19 ++ be/src/olap/rowset/segcompaction.cpp | 4 + be/src/olap/rowset/segcompaction.h | 5 +- be/src/olap/rowset/segment_creator.cpp | 68 +++--- be/src/olap/rowset/segment_creator.h | 17 +- .../segment_v2/inverted_index_file_writer.cpp | 14 +- .../segment_v2/inverted_index_file_writer.h | 39 +++- .../olap/rowset/segment_v2/segment_writer.cpp | 27 +-- .../olap/rowset/segment_v2/segment_writer.h | 24 ++- .../segment_v2/vertical_segment_writer.cpp | 29 +-- .../segment_v2/vertical_segment_writer.h | 25 ++- .../rowset/vertical_beta_rowset_writer.cpp | 36 ++-- be/src/olap/tablet_schema.cpp | 3 + be/src/olap/tablet_schema.h | 5 + be/src/olap/task/index_builder.cpp | 24 ++- be/src/runtime/load_stream_writer.cpp | 7 +- .../olap/delete_bitmap_calculator_test.cpp | 3 +- 27 files changed, 412 insertions(+), 375 deletions(-) diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index 642077b7e983ec..ebc411697ee4b1 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -115,13 +115,14 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { } else { _rowset_meta->add_segments_file_size(seg_file_size.value()); } - - if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id); - !idx_files_info.has_value()) [[unlikely]] { - LOG(ERROR) << "expected inverted index files info, but none presents: " - << idx_files_info.error(); - } else { - _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); + if (rowset_schema->has_inverted_index()) { + if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); + !idx_files_info.has_value()) [[unlikely]] { + LOG(ERROR) << "expected inverted index files info, but none presents: " + << idx_files_info.error(); + } else { + _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); + } } RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5527ab07885f12..70ac92fe3a0aea 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1039,7 +1039,7 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096"); // tree depth for bkd index DEFINE_Int32(max_depth_in_bkd_tree, "32"); // index compaction -DEFINE_mBool(inverted_index_compaction_enable, "false"); +DEFINE_mBool(inverted_index_compaction_enable, "true"); // Only for debug, do not use in production DEFINE_mBool(debug_inverted_index_compaction, "false"); // index by RAM directory diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8b37e9ba174f2c..aadca01c7bd5b4 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -186,6 +186,7 @@ Status Compaction::merge_input_rowsets() { Status res; { SCOPED_TIMER(_merge_rowsets_latency_timer); + // 1. Merge segment files and write bkd inverted index if (_is_vertical) { res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), @@ -198,21 +199,25 @@ Status Compaction::merge_input_rowsets() { res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), &_stats); } - } - - _tablet->last_compaction_status = res; - if (!res.ok()) { - return res; + _tablet->last_compaction_status = res; + if (!res.ok()) { + return res; + } + // 2. Merge the remaining inverted index files of the string type + RETURN_IF_ERROR(do_inverted_index_compaction()); } COUNTER_UPDATE(_merged_rows_counter, _stats.merged_rows); COUNTER_UPDATE(_filtered_rows_counter, _stats.filtered_rows); + // 3. In the `build`, `_close_file_writers` is called to close the inverted index file writer and write the final compound index file. RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset), fmt::format("rowset writer build failed. output_version: {}", _output_version.to_string())); + // RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, _location_map)); + //RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); // Now we support delete in cumu compaction, to make all data in rowsets whose version @@ -454,8 +459,6 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) { RETURN_IF_ERROR(merge_input_rowsets()); - RETURN_IF_ERROR(do_inverted_index_compaction()); - RETURN_IF_ERROR(modify_rowsets()); auto* cumu_policy = tablet()->cumulative_compaction_policy(); @@ -488,40 +491,6 @@ Status Compaction::do_inverted_index_compaction() { OlapStopWatch inverted_watch; - int64_t cur_max_version = 0; - { - std::shared_lock rlock(_tablet->get_header_lock()); - cur_max_version = _tablet->max_version_unlocked(); - } - - DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set missed_rows; - std::map>> location_map; - // Convert the delete bitmap of the input rowsets to output rowset. - _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, *_rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map, - _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - - if (!_allow_delete_in_cumu_compaction) { - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - _stats.merged_rows != missed_rows.size() && _tablet->tablet_state() == TABLET_RUNNING) { - std::string err_msg = fmt::format( - "cumulative compaction: the merged rows({}) is not equal to missed " - "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", - _stats.merged_rows, missed_rows.size(), _tablet->tablet_id(), - _tablet->table_id()); - if (config::enable_mow_compaction_correctness_check_core) { - CHECK(false) << err_msg; - } else { - DCHECK(false) << err_msg; - } - // log here just for debugging, do not return error - LOG(WARNING) << err_msg; - } - } - - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); - // translation vec // <> // the first level vector: index indicates src segment. @@ -645,58 +614,9 @@ Status Compaction::do_inverted_index_compaction() { // dest index files // format: rowsetId_segmentId - std::vector> inverted_index_file_writers( - dest_segment_num); - - // Some columns have already been indexed - // key: seg_id, value: inverted index file size - std::unordered_map compacted_idx_file_size; - for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) { - std::string index_path_prefix { - InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))}; - auto inverted_index_file_reader = std::make_unique( - ctx.fs(), index_path_prefix, - _cur_tablet_schema->get_inverted_index_storage_format()); - bool open_idx_file_cache = false; - auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size, - open_idx_file_cache); - if (st.ok()) { - auto index_not_need_to_compact = - DORIS_TRY(inverted_index_file_reader->get_all_directories()); - // V1: each index is a separate file - // V2: all indexes are in a single file - if (_cur_tablet_schema->get_inverted_index_storage_format() != - doris::InvertedIndexStorageFormatPB::V1) { - int64_t fsize = 0; - st = ctx.fs()->file_size( - InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix), &fsize); - if (!st.ok()) { - LOG(ERROR) << "file size error in index compaction, error:" << st.msg(); - return st; - } - compacted_idx_file_size[seg_id] = fsize; - } - auto inverted_index_file_writer = std::make_unique( - ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id, - _cur_tablet_schema->get_inverted_index_storage_format()); - RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact)); - inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer); - } else if (st.is()) { - auto inverted_index_file_writer = std::make_unique( - ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id, - _cur_tablet_schema->get_inverted_index_storage_format()); - inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer); - // no index file - compacted_idx_file_size[seg_id] = 0; - } else { - LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:" - << st; - return st; - } - } - for (const auto& writer : inverted_index_file_writers) { - writer->set_file_writer_opts(ctx.get_file_writer_options()); - } + auto& inverted_index_file_writers = dynamic_cast(_output_rs_writer.get()) + ->inverted_index_file_writers(); + DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num); // use tmp file dir to store index files auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); @@ -722,29 +642,6 @@ Status Compaction::do_inverted_index_compaction() { auto col = _cur_tablet_schema->column_by_uid(column_uniq_id); const auto* index_meta = _cur_tablet_schema->get_inverted_index(col); - // if index properties are different, index compaction maybe needs to be skipped. - bool is_continue = false; - std::optional> first_properties; - for (const auto& rowset : _input_rowsets) { - const auto* tablet_index = rowset->tablet_schema()->get_inverted_index(col); - const auto& properties = tablet_index->properties(); - if (!first_properties.has_value()) { - first_properties = properties; - } else { - if (properties != first_properties.value()) { - error_handler(index_meta->index_id(), column_uniq_id); - status = Status::Error( - "if index properties are different, index compaction needs to be " - "skipped."); - is_continue = true; - break; - } - } - } - if (is_continue) { - continue; - } - std::vector dest_index_dirs(dest_segment_num); std::vector src_index_dirs(src_segment_num); try { @@ -770,40 +667,13 @@ Status Compaction::do_inverted_index_compaction() { } } - std::vector all_inverted_index_file_info(dest_segment_num); - uint64_t inverted_index_file_size = 0; - for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) { - auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get(); - if (Status st = inverted_index_file_writer->close(); !st.ok()) { - status = Status::Error(st.msg()); - } else { - inverted_index_file_size += inverted_index_file_writer->get_index_file_total_size(); - inverted_index_file_size -= compacted_idx_file_size[seg_id]; - } - all_inverted_index_file_info[seg_id] = inverted_index_file_writer->get_index_file_info(); - } // check index compaction status. If status is not ok, we should return error and end this compaction round. if (!status.ok()) { return status; } - // index compaction should update total disk size and index disk size - _output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() + - inverted_index_file_size); - _output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() + - inverted_index_file_size); - _output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() + - inverted_index_file_size); - - _output_rowset->rowset_meta()->update_inverted_index_files_info(all_inverted_index_file_info); - COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size()); - LOG(INFO) << "succeed to do index compaction" - << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num - << ", output row number=" << _output_rowset->num_rows() - << ", input_rowset_size=" << _input_rowsets_size - << ", output_rowset_size=" << _output_rowset->data_disk_size() - << ", inverted index file size=" << inverted_index_file_size + << ". tablet=" << _tablet->tablet_id() << ". elapsed time=" << inverted_watch.get_elapse_second() << "s."; return Status::OK(); @@ -814,8 +684,35 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) { if (index.index_type() != IndexType::INVERTED) { continue; } - auto col_unique_id = index.col_unique_ids()[0]; + + // if index properties are different, index compaction maybe needs to be skipped. + bool is_continue = false; + std::optional> first_properties; + for (const auto& rowset : _input_rowsets) { + const auto* tablet_index = + rowset->tablet_schema()->get_inverted_index(col_unique_id, ""); + // no inverted index + if (tablet_index == nullptr) { + ctx.skip_inverted_index.insert(col_unique_id); + is_continue = true; + break; + } + const auto& properties = tablet_index->properties(); + if (!first_properties.has_value()) { + first_properties = properties; + } else { + if (properties != first_properties.value()) { + ctx.skip_inverted_index.insert(col_unique_id); + is_continue = true; + break; + } + } + } + if (is_continue) { + continue; + } + auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) { auto* rowset = static_cast(src_rs.get()); if (rowset->is_skip_index_compaction(col_unique_id)) { @@ -909,9 +806,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) if (config::inverted_index_compaction_enable && (((_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) || - _tablet->keys_type() == KeysType::DUP_KEYS)) && - _cur_tablet_schema->get_inverted_index_storage_format() == - InvertedIndexStorageFormatPB::V1) { + _tablet->keys_type() == KeysType::DUP_KEYS))) { construct_skip_inverted_index(ctx); } ctx.version = _output_version; @@ -1177,8 +1072,6 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { RETURN_IF_ERROR(merge_input_rowsets()); - RETURN_IF_ERROR(do_inverted_index_compaction()); - RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); // 4. modify rowsets in memory @@ -1205,9 +1098,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& if (config::inverted_index_compaction_enable && (((_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) || - _tablet->keys_type() == KeysType::DUP_KEYS)) && - _cur_tablet_schema->get_inverted_index_storage_format() == - InvertedIndexStorageFormatPB::V1) { + _tablet->keys_type() == KeysType::DUP_KEYS))) { construct_skip_inverted_index(ctx); } diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 38d50595ca8f6e..52e87266c46725 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -67,6 +67,7 @@ class Compaction { protected: Status merge_input_rowsets(); + // merge inverted index files Status do_inverted_index_compaction(); void construct_skip_inverted_index(RowsetWriterContext& ctx); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 5d1b80f8cd7b23..c8d5cb10980fdd 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -189,13 +189,75 @@ Result> SegmentFileCollection::segments_file_size(int seg_id return ResultError(st); } +InvertedIndexFileCollection::~InvertedIndexFileCollection() = default; + +Status InvertedIndexFileCollection::add(int seg_id, InvertedIndexFileWriterPtr&& index_writer) { + std::lock_guard lock(_lock); + + _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); + return Status::OK(); +} + +Status InvertedIndexFileCollection::close() { + std::lock_guard lock(_lock); + for (auto&& [id, writer] : _inverted_index_file_writers) { + RETURN_IF_ERROR(writer->close()); + _total_size += writer->get_index_file_total_size(); + } + + return Status::OK(); +} + +Result> +InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { + std::lock_guard lock(_lock); + + Status st; + std::vector idx_file_info(_inverted_index_file_writers.size()); + bool succ = std::all_of( + _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), + [&](auto&& it) { + auto&& [seg_id, writer] = it; + + int idx = seg_id - seg_id_offset; + if (idx >= idx_file_info.size()) [[unlikely]] { + auto err_msg = + fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", + seg_id, idx_file_info.size(), seg_id_offset); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info(); + // no inverted index info means something wrong + // if (!(idx_file_info[idx]->has_index_size()) && + // !(idx_file_info[idx]->index_info_size() > 0)) { + // auto err_msg = fmt::format( + // "invalid index file info, seg_id={} num_file_writers={} " + // "seg_id_offset={}, index writer={}", + // seg_id, idx_file_info.size(), seg_id_offset, + // _inverted_index_file_writers[seg_id]->debug_string()); + // DCHECK(false) << err_msg; + // st = Status::InternalError(err_msg); + // return false; + // } + return true; + }); + + if (succ) { + return idx_file_info; + } + + return ResultError(st); +} + BaseBetaRowsetWriter::BaseBetaRowsetWriter() : _num_segment(0), _segment_start_id(0), _num_rows_written(0), _total_data_size(0), _total_index_size(0), - _segment_creator(_context, _seg_files, _idx_files_info) {} + _segment_creator(_context, _seg_files, _idx_files) {} BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) : _engine(engine), _segcompaction_worker(std::make_shared(this)) {} @@ -728,7 +790,6 @@ Status BetaRowsetWriter::_close_file_writers() { Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { RETURN_IF_ERROR(_close_file_writers()); - const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num), "too many segments when build new rowset"); @@ -748,12 +809,14 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { : _context.tablet_schema; _rowset_meta->set_tablet_schema(rowset_schema); - if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id); - !idx_files_info.has_value()) [[unlikely]] { - LOG(ERROR) << "expected inverted index files info, but none presents: " - << idx_files_info.error(); - } else { - _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); + if (rowset_schema->has_inverted_index()) { + if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); + !idx_files_info.has_value()) [[unlikely]] { + LOG(ERROR) << "expected inverted index files info, but none presents: " + << idx_files_info.error(); + } else { + _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); + } } RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path, @@ -830,7 +893,8 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch rowset_meta->set_num_segments(segment_num); rowset_meta->set_num_rows(num_rows_written + _num_rows_written); - rowset_meta->set_total_disk_size(total_data_size + _total_data_size); + rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size + + _total_index_size); rowset_meta->set_data_disk_size(total_data_size + _total_data_size); rowset_meta->set_index_disk_size(total_index_size + _total_index_size); rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds); @@ -891,6 +955,14 @@ Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWri fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); } +Status BaseBetaRowsetWriter::create_inverted_index_file_writer( + uint32_t segment_id, InvertedIndexFileWriterPtr* index_file_writer) { + RETURN_IF_ERROR(RowsetWriter::create_inverted_index_file_writer(segment_id, index_file_writer)); + // used for inverted index format v1 + (*index_file_writer)->set_file_writer_opts(_context.get_file_writer_options()); + return Status::OK(); +} + Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( std::unique_ptr* writer, int64_t begin, int64_t end) { DCHECK(begin >= 0 && end >= 0); @@ -898,6 +970,21 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( _context.rowset_id, begin, end); io::FileWriterPtr file_writer; RETURN_IF_ERROR(_create_file_writer(path, file_writer)); + std::string prefix = std::string {InvertedIndexDescriptor::get_index_file_path_prefix(path)}; + std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); + + InvertedIndexFileWriterPtr index_file_writer; + if (_context.tablet_schema->has_inverted_index()) { + io::FileWriterPtr idx_file_writer; + if (_context.tablet_schema->get_inverted_index_storage_format() != + InvertedIndexStorageFormatPB::V1) { + RETURN_IF_ERROR(_create_file_writer(index_path, idx_file_writer)); + } + index_file_writer = std::make_unique( + _context.fs(), path, _context.rowset_id.to_string(), _num_segcompacted, + _context.tablet_schema->get_inverted_index_storage_format(), + std::move(idx_file_writer)); + } segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; @@ -907,15 +994,15 @@ Status BetaRowsetWriter::_create_segment_writer_for_segcompaction( writer_options.max_rows_per_segment = _context.max_rows_per_segment; writer_options.mow_ctx = _context.mow_context; - *writer = std::make_unique(file_writer.get(), _num_segcompacted, - _context.tablet_schema, _context.tablet, - _context.data_dir, writer_options); + *writer = std::make_unique( + file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, + _context.data_dir, writer_options, index_file_writer.get()); if (auto& seg_writer = _segcompaction_worker->get_file_writer(); seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); } _segcompaction_worker->get_file_writer().reset(file_writer.release()); - + _segcompaction_worker->get_idx_file_writer().reset(index_file_writer.release()); return Status::OK(); } @@ -1008,8 +1095,8 @@ Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + (*writer)->get_inverted_index_total_size(); - segstat.index_size = index_size + (*writer)->get_inverted_index_total_size(); + segstat.data_size = segment_size; + segstat.index_size = index_size; segstat.key_bounds = key_bounds; { std::lock_guard lock(_segid_statistics_map_mutex); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 82e4c9409b4853..4539959fab506b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -42,6 +42,7 @@ #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_creator.h" +#include "segment_v2/inverted_index_file_writer.h" #include "segment_v2/segment.h" #include "util/spinlock.h" @@ -84,58 +85,33 @@ class SegmentFileCollection { bool _closed {false}; }; -// Collect the size of the inverted index files -class InvertedIndexFilesInfo { +class InvertedIndexFileCollection { public: + ~InvertedIndexFileCollection(); + + // `seg_id` -> inverted index file writer + Status add(int seg_id, InvertedIndexFileWriterPtr&& writer); + + // Close all file writers + // If the inverted index file writer is not closed, an error will be thrown during destruction + Status close(); + // Get inverted index file info in segment id order. - // Return the info of inverted index files from seg_id_offset to the last one. - Result> get_inverted_files_info(int seg_id_offset) { - std::lock_guard lock(_lock); - - Status st; - std::vector inverted_files_info(_inverted_index_files_info.size()); - bool succ = std::all_of( - _inverted_index_files_info.begin(), _inverted_index_files_info.end(), - [&](auto&& it) { - auto&& [seg_id, info] = it; - - int idx = seg_id - seg_id_offset; - if (idx >= inverted_files_info.size()) [[unlikely]] { - auto err_msg = fmt::format( - "invalid seg_id={} num_inverted_files_info={} seg_id_offset={}", - seg_id, inverted_files_info.size(), seg_id_offset); - DCHECK(false) << err_msg; - st = Status::InternalError(err_msg); - return false; - } - - auto& finfo = inverted_files_info[idx]; - if (finfo.has_index_size() || finfo.index_info_size() > 0) [[unlikely]] { - // File size should not been set - auto err_msg = fmt::format("duplicate seg_id={}", seg_id); - DCHECK(false) << err_msg; - st = Status::InternalError(err_msg); - return false; - } - finfo = info; - return true; - }); - - if (succ) { - return inverted_files_info; - } - - return ResultError(st); - } + // `seg_id_offset` is the offset of the segment id relative to the subscript of `_inverted_index_file_writers`, + // for more details, see `Tablet::create_transient_rowset_writer`. + Result> inverted_index_file_info(int seg_id_offset); - void add_file_info(int seg_id, InvertedIndexFileInfo file_info) { - std::lock_guard lock(_lock); - _inverted_index_files_info.emplace(seg_id, file_info); + // return all inverted index file writers + std::unordered_map& get_file_writers() { + return _inverted_index_file_writers; } + int64_t get_total_index_size() const { return _total_size; } + private: - std::unordered_map _inverted_index_files_info; mutable SpinLock _lock; + std::unordered_map _inverted_index_file_writers; + int64_t _total_size = 0; }; class BaseBetaRowsetWriter : public RowsetWriter { @@ -156,6 +132,9 @@ class BaseBetaRowsetWriter : public RowsetWriter { Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer, FileType file_type = FileType::SEGMENT_FILE) override; + Status create_inverted_index_file_writer(uint32_t segment_id, + InvertedIndexFileWriterPtr* writer) override; + Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat, TabletSchemaSPtr flush_schema) override; @@ -215,7 +194,9 @@ class BaseBetaRowsetWriter : public RowsetWriter { return _seg_files.get_file_writers(); } - InvertedIndexFilesInfo& get_inverted_index_files_info() { return _idx_files_info; } + std::unordered_map& inverted_index_file_writers() { + return this->_idx_files.get_file_writers(); + } private: void update_rowset_schema(TabletSchemaSPtr flush_schema); @@ -235,6 +216,15 @@ class BaseBetaRowsetWriter : public RowsetWriter { std::lock_guard l(_segid_statistics_map_mutex); return std::accumulate(_segment_num_rows.begin(), _segment_num_rows.end(), uint64_t(0)); } + // Only during vertical compaction is this method called + // Some index files are written during normal compaction and some files are written during index compaction. + // After all index writes are completed, call this method to write the final compound index file. + Status _close_inverted_index_file_writers() { + RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.close(), + "failed to close index file when build new rowset"); + this->_total_index_size += _idx_files.get_total_index_size(); + return Status::OK(); + } std::atomic _num_segment; // number of consecutive flushed segments roaring::Roaring _segment_set; // bitmap set to record flushed segment id @@ -242,6 +232,7 @@ class BaseBetaRowsetWriter : public RowsetWriter { int32_t _segment_start_id; // basic write start from 0, partial update may be different SegmentFileCollection _seg_files; + InvertedIndexFileCollection _idx_files; // record rows number of every segment already written, using for rowid // conversion when compaction in unique key with MoW model @@ -269,9 +260,6 @@ class BaseBetaRowsetWriter : public RowsetWriter { int64_t _delete_bitmap_ns = 0; int64_t _segment_writer_ns = 0; - - // map - InvertedIndexFilesInfo _idx_files_info; }; class SegcompactionWorker; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 0d0ad435b9efd1..cb5dd5a5ee272d 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -58,7 +58,7 @@ namespace doris { using namespace ErrorCode; BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector>& streams) - : _segment_creator(_context, _seg_files, _idx_files_info), _streams(streams) {} + : _segment_creator(_context, _seg_files, _idx_files), _streams(streams) {} BetaRowsetWriterV2::~BetaRowsetWriterV2() = default; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index a9e41e603cef63..78ec4a7dce703c 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -154,11 +154,10 @@ class BetaRowsetWriterV2 : public RowsetWriter { std::vector _segments_encoded_key_bounds; SegmentFileCollection _seg_files; + InvertedIndexFileCollection _idx_files; SegmentCreator _segment_creator; - InvertedIndexFilesInfo _idx_files_info; - fmt::memory_buffer vlog_buffer; std::vector> _streams; diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index 1843fb8a41ee08..1571105fa73471 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -273,20 +273,14 @@ InvertedIndexFileInfo RowsetMeta::inverted_index_file_info(int seg_id) { } void RowsetMeta::add_inverted_index_files_info( - const std::vector& idx_file_info) { + const std::vector& idx_file_info) { _rowset_meta_pb.set_enable_inverted_index_file_info(true); for (auto finfo : idx_file_info) { auto* new_file_info = _rowset_meta_pb.add_inverted_index_file_info(); - *new_file_info = finfo; + *new_file_info = *finfo; } } -void RowsetMeta::update_inverted_index_files_info( - const std::vector& idx_file_info) { - _rowset_meta_pb.clear_inverted_index_file_info(); - add_inverted_index_files_info(idx_file_info); -} - bool operator==(const RowsetMeta& a, const RowsetMeta& b) { if (a._rowset_id != b._rowset_id) return false; if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) return false; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 164d42cbb16230..46121aeae2be6d 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -364,9 +364,8 @@ class RowsetMeta : public MetadataAdder { return _rowset_meta_pb.inverted_index_file_info(); } - void add_inverted_index_files_info(const std::vector& idx_file_info); - - void update_inverted_index_files_info(const std::vector& idx_file_info); + void add_inverted_index_files_info( + const std::vector& idx_file_info); int64_t get_metadata_size() const override; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 6861b8ab7e2ce6..96657e95fe6112 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -31,6 +31,7 @@ #include "olap/column_mapping.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/tablet_fwd.h" #include "olap/tablet_schema.h" #include "vec/core/block.h" @@ -95,6 +96,24 @@ class RowsetWriter { return Status::NotSupported("RowsetWriter does not support create_file_writer"); } + virtual Status create_inverted_index_file_writer( + uint32_t segment_id, InvertedIndexFileWriterPtr* index_file_writer) { + std::string segment_prefix {InvertedIndexDescriptor::get_index_file_path_prefix( + _context.segment_path(segment_id))}; + // Create file writer for the inverted index format v2. + io::FileWriterPtr idx_file_v2_ptr; + if (_context.tablet_schema->get_inverted_index_storage_format() != + InvertedIndexStorageFormatPB::V1) { + RETURN_IF_ERROR( + create_file_writer(segment_id, idx_file_v2_ptr, FileType::INVERTED_INDEX_FILE)); + } + *index_file_writer = std::make_unique( + _context.fs(), segment_prefix, _context.rowset_id.to_string(), segment_id, + _context.tablet_schema->get_inverted_index_storage_format(), + std::move(idx_file_v2_ptr)); + return Status::OK(); + } + // explicit flush all buffered rows into segment file. // note that `add_row` could also trigger flush when certain conditions are met virtual Status flush() = 0; diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index e5d043d8a22486..f051511a80449d 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -318,6 +318,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt RETURN_IF_ERROR(_file_writer->close()); } + if (_idx_file_writer != nullptr) { + RETURN_IF_ERROR(_idx_file_writer->close()); + } + RETURN_IF_ERROR(_delete_original_segments(begin, end)); if (_rowid_conversion != nullptr) { convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 54c5c3758c20c8..22dba48b8988c3 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/merger.h" +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "segment_v2/segment.h" @@ -41,7 +42,6 @@ struct OlapReaderStatistics; using SegCompactionCandidates = std::vector; using SegCompactionCandidatesSharedPtr = std::shared_ptr; - class BetaRowsetWriter; class SegcompactionWorker { @@ -70,6 +70,8 @@ class SegcompactionWorker { io::FileWriterPtr& get_file_writer() { return _file_writer; } + InvertedIndexFileWriterPtr& get_idx_file_writer() { return _idx_file_writer; } + // set the cancel flag, tasks already started will not be cancelled. bool cancel(); @@ -96,6 +98,7 @@ class SegcompactionWorker { // Currently cloud storage engine doesn't need segcompaction BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; + InvertedIndexFileWriterPtr _idx_file_writer; // for unique key mow table std::unique_ptr _rowid_conversion = nullptr; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 1afd3215db42f6..19904490d34e81 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -53,8 +53,8 @@ namespace doris { using namespace ErrorCode; SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files, - InvertedIndexFilesInfo& idx_files_info) - : _context(context), _seg_files(seg_files), _idx_files_info(idx_files_info) {} + InvertedIndexFileCollection& idx_files) + : _context(context), _seg_files(seg_files), _idx_files(idx_files) {} SegmentFlusher::~SegmentFlusher() = default; @@ -140,13 +140,10 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptrcreate(segment_id, segment_file_writer)); - io::FileWriterPtr inverted_file_writer; - if (_context.tablet_schema->has_inverted_index() && - _context.tablet_schema->get_inverted_index_storage_format() >= - InvertedIndexStorageFormatPB::V2 && - _context.memtable_on_sink_support_index_v2) { - RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer, - FileType::INVERTED_INDEX_FILE)); + InvertedIndexFileWriterPtr inverted_index_file_writer; + if (_context.tablet_schema->has_inverted_index()) { + RETURN_IF_ERROR( + _context.file_writer_creator->create(segment_id, &inverted_index_file_writer)); } segment_v2::SegmentWriterOptions writer_options; @@ -161,8 +158,11 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr( segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, - _context.data_dir, writer_options, std::move(inverted_file_writer)); + _context.data_dir, writer_options, inverted_index_file_writer.get()); RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer))); + if (_context.tablet_schema->has_inverted_index()) { + RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(inverted_index_file_writer))); + } auto s = writer->init(); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); @@ -178,13 +178,10 @@ Status SegmentFlusher::_create_segment_writer( io::FileWriterPtr segment_file_writer; RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer)); - io::FileWriterPtr inverted_file_writer; - if (_context.tablet_schema->has_inverted_index() && - _context.tablet_schema->get_inverted_index_storage_format() >= - InvertedIndexStorageFormatPB::V2 && - _context.memtable_on_sink_support_index_v2) { - RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer, - FileType::INVERTED_INDEX_FILE)); + InvertedIndexFileWriterPtr inverted_index_file_writer; + if (_context.tablet_schema->has_inverted_index()) { + RETURN_IF_ERROR( + _context.file_writer_creator->create(segment_id, &inverted_index_file_writer)); } segment_v2::VerticalSegmentWriterOptions writer_options; @@ -198,8 +195,11 @@ Status SegmentFlusher::_create_segment_writer( writer = std::make_unique( segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet, - _context.data_dir, writer_options, std::move(inverted_file_writer)); + _context.data_dir, writer_options, inverted_index_file_writer.get()); RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer))); + if (_context.tablet_schema->has_inverted_index()) { + RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(inverted_index_file_writer))); + } auto s = writer->init(); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); @@ -231,6 +231,13 @@ Status SegmentFlusher::_flush_segment_writer( if (!s.ok()) { return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); } + + int64_t inverted_index_file_size = 0; + s = writer->close_inverted_index(&inverted_index_file_size); + if (!s.ok()) { + return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); + } + VLOG_DEBUG << "tablet_id:" << _context.tablet_id << " flushing filename: " << writer->data_dir_path() << " rowset_id:" << _context.rowset_id; @@ -245,17 +252,16 @@ Status SegmentFlusher::_flush_segment_writer( uint32_t segment_id = writer->segment_id(); SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + writer->get_inverted_index_total_size(); - segstat.index_size = index_size + writer->get_inverted_index_total_size(); + segstat.data_size = segment_size; + segstat.index_size = inverted_index_file_size; segstat.key_bounds = key_bounds; - _idx_files_info.add_file_info(segment_id, writer->get_inverted_index_file_info()); writer.reset(); RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema)); if (flush_size) { - *flush_size = segment_size + index_size; + *flush_size = segment_size + inverted_index_file_size; } return Status::OK(); } @@ -277,6 +283,13 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptrclose_inverted_index(&inverted_index_file_size); + if (!s.ok()) { + return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); + } + VLOG_DEBUG << "tablet_id:" << _context.tablet_id << " flushing rowset_dir: " << _context.tablet_path << " rowset_id:" << _context.rowset_id; @@ -291,17 +304,16 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptrget_segment_id(); SegmentStatistics segstat; segstat.row_num = row_num; - segstat.data_size = segment_size + writer->get_inverted_index_total_size(); - segstat.index_size = index_size + writer->get_inverted_index_total_size(); + segstat.data_size = segment_size; + segstat.index_size = inverted_index_file_size; segstat.key_bounds = key_bounds; - _idx_files_info.add_file_info(segment_id, writer->get_inverted_index_file_info()); writer.reset(); RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema)); if (flush_size) { - *flush_size = segment_size + index_size; + *flush_size = segment_size + inverted_index_file_size; } return Status::OK(); } @@ -330,8 +342,8 @@ int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) { } SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files, - InvertedIndexFilesInfo& idx_files_info) - : _segment_flusher(context, seg_files, idx_files_info) {} + InvertedIndexFileCollection& idx_files) + : _segment_flusher(context, seg_files, idx_files) {} Status SegmentCreator::add_block(const vectorized::Block* block) { if (block->rows() == 0) { diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index c862fce87a43bd..f8afd5798927d4 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -29,6 +29,7 @@ #include "io/fs/file_reader_writer_fwd.h" #include "olap/olap_common.h" #include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/tablet_fwd.h" #include "util/spinlock.h" #include "vec/core/block.h" @@ -46,7 +47,7 @@ class VerticalSegmentWriter; struct SegmentStatistics; class BetaRowsetWriter; class SegmentFileCollection; -class InvertedIndexFilesInfo; +class InvertedIndexFileCollection; class FileWriterCreator { public: @@ -54,9 +55,12 @@ class FileWriterCreator { virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer, FileType file_type = FileType::SEGMENT_FILE) = 0; + + virtual Status create(uint32_t segment_id, InvertedIndexFileWriterPtr* file_writer) = 0; }; template + requires std::is_base_of_v class FileWriterCreatorT : public FileWriterCreator { public: explicit FileWriterCreatorT(T* t) : _t(t) {} @@ -66,6 +70,10 @@ class FileWriterCreatorT : public FileWriterCreator { return _t->create_file_writer(segment_id, file_writer, file_type); } + Status create(uint32_t segment_id, InvertedIndexFileWriterPtr* file_writer) override { + return _t->create_inverted_index_file_writer(segment_id, file_writer); + } + private: T* _t = nullptr; }; @@ -79,6 +87,7 @@ class SegmentCollector { }; template + requires std::is_base_of_v class SegmentCollectorT : public SegmentCollector { public: explicit SegmentCollectorT(T* t) : _t(t) {} @@ -95,7 +104,7 @@ class SegmentCollectorT : public SegmentCollector { class SegmentFlusher { public: SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files, - InvertedIndexFilesInfo& idx_files_info); + InvertedIndexFileCollection& idx_files); ~SegmentFlusher(); @@ -164,7 +173,7 @@ class SegmentFlusher { private: RowsetWriterContext& _context; SegmentFileCollection& _seg_files; - InvertedIndexFilesInfo& _idx_files_info; + InvertedIndexFileCollection& _idx_files; // written rows by add_block/add_row std::atomic _num_rows_written = 0; @@ -177,7 +186,7 @@ class SegmentFlusher { class SegmentCreator { public: SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files, - InvertedIndexFilesInfo& idx_files_info); + InvertedIndexFileCollection& idx_files); ~SegmentCreator() = default; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index d11b9fa54d0421..835bf51c10d667 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -110,6 +110,8 @@ int64_t InvertedIndexFileWriter::headerLength() { } Status InvertedIndexFileWriter::close() { + DCHECK(!_closed) << debug_string(); + _closed = true; if (_indices_dirs.empty()) { return Status::OK(); } @@ -358,14 +360,10 @@ int64_t InvertedIndexFileWriter::write_v2() { out_dir->set_file_writer_opts(_opts); std::unique_ptr compound_file_output; - // idx v2 writer != nullptr means memtable on sink node now - if (_idx_v2_writer != nullptr) { - compound_file_output = std::unique_ptr( - out_dir->createOutputV2(_idx_v2_writer.get())); - } else { - compound_file_output = std::unique_ptr( - out_dir->createOutput(index_path.filename().c_str())); - } + + DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr"; + compound_file_output = std::unique_ptr( + out_dir->createOutputV2(_idx_v2_writer.get())); // Write the version number compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index 2aceb671d809a7..ccd6953cdd7abd 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -38,6 +38,9 @@ class DorisFSDirectory; using InvertedIndexDirectoryMap = std::map, std::unique_ptr>; +class InvertedIndexFileWriter; +using InvertedIndexFileWriterPtr = std::unique_ptr; + class FileInfo { public: std::string filename; @@ -65,8 +68,14 @@ class InvertedIndexFileWriter { int64_t write_v1(); Status close(); int64_t headerLength(); - InvertedIndexFileInfo get_index_file_info() const { return _file_info; } - int64_t get_index_file_total_size() const { return _total_file_size; } + const InvertedIndexFileInfo* get_index_file_info() const { + DCHECK(_closed) << debug_string(); + return &_file_info; + } + int64_t get_index_file_total_size() const { + DCHECK(_closed) << debug_string(); + return _total_file_size; + } const io::FileSystemSPtr& get_fs() const { return _fs; } void sort_files(std::vector& file_infos); void copyFile(const char* fileName, lucene::store::Directory* dir, @@ -75,6 +84,20 @@ class InvertedIndexFileWriter { void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = opts; } + std::string debug_string() const { + std::stringstream indices_dirs; + for (const auto& [index, dir] : _indices_dirs) { + indices_dirs << "index id is: " << index.first << " , index suffix is: " << index.second + << " , index dir is: " << dir->toString(); + } + return fmt::format( + "inverted index file writer debug string: index storage format is: {}, index path " + "prefix is: {}, rowset id is: {}, seg id is: {}, closed is: {}, total file size " + "is: {}, index dirs is: {}", + _storage_format, _index_path_prefix, _rowset_id, _seg_id, _closed, _total_file_size, + indices_dirs.str()); + } + private: InvertedIndexDirectoryMap _indices_dirs; const io::FileSystemSPtr _fs; @@ -82,14 +105,18 @@ class InvertedIndexFileWriter { std::string _rowset_id; int64_t _seg_id; InvertedIndexStorageFormatPB _storage_format; - // v1: all file size - // v2: file size - int64_t _total_file_size = 0; + // write to disk or stream - io::FileWriterPtr _idx_v2_writer; + io::FileWriterPtr _idx_v2_writer = nullptr; io::FileWriterOptions _opts; + // v1: all file size + // v2: file size + int64_t _total_file_size = 0; InvertedIndexFileInfo _file_info; + + // only once + bool _closed = false; }; } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index c21021eb9d26cd..426c9332146e7b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -85,13 +85,14 @@ inline std::string segment_mem_tracker_name(uint32_t segment_id) { SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, const SegmentWriterOptions& opts, - io::FileWriterPtr inverted_file_writer) + InvertedIndexFileWriter* inverted_file_writer) : _segment_id(segment_id), _tablet_schema(std::move(tablet_schema)), _tablet(std::move(tablet)), _data_dir(data_dir), _opts(opts), _file_writer(file_writer), + _inverted_index_file_writer(inverted_file_writer), _mem_tracker(std::make_unique(segment_mem_tracker_name(segment_id))), _mow_context(std::move(opts.mow_ctx)) { CHECK_NOTNULL(file_writer); @@ -132,17 +133,6 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, } } } - if (_tablet_schema->has_inverted_index()) { - _inverted_index_file_writer = std::make_unique( - _opts.rowset_ctx->fs(), - std::string {InvertedIndexDescriptor::get_index_file_path_prefix( - file_writer->path().c_str())}, - _opts.rowset_ctx->rowset_id.to_string(), segment_id, - _tablet_schema->get_inverted_index_storage_format(), - std::move(inverted_file_writer)); - _inverted_index_file_writer->set_file_writer_opts( - _opts.rowset_ctx->get_file_writer_options()); - } } SegmentWriter::~SegmentWriter() { @@ -223,7 +213,7 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co opts.need_bloom_filter = false; opts.need_bitmap_index = false; } - opts.inverted_index_file_writer = _inverted_index_file_writer.get(); + opts.inverted_index_file_writer = _inverted_index_file_writer; for (const auto* index : opts.indexes) { if (!skip_inverted_index && index->index_type() == IndexType::INVERTED) { opts.inverted_index = index; @@ -1347,10 +1337,6 @@ Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) { if (*segment_file_size == 0) { return Status::Corruption("Bad segment, file size = 0"); } - if (_inverted_index_file_writer != nullptr) { - RETURN_IF_ERROR(_inverted_index_file_writer->close()); - _inverted_index_file_info = _inverted_index_file_writer->get_index_file_info(); - } return Status::OK(); } @@ -1591,13 +1577,6 @@ Status SegmentWriter::_generate_short_key_index( return Status::OK(); } -int64_t SegmentWriter::get_inverted_index_total_size() { - if (_inverted_index_file_writer != nullptr) { - return _inverted_index_file_writer->get_index_file_total_size(); - } - return 0; -} - inline bool SegmentWriter::_is_mow() { return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write; } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 3e0ae13fa272b3..a623b527c9150b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -34,6 +34,7 @@ #include "gutil/strings/substitute.h" #include "olap/olap_define.h" #include "olap/rowset/segment_v2/column_writer.h" +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" #include "util/faststring.h" @@ -61,7 +62,6 @@ class FileWriter; } // namespace io namespace segment_v2 { -class InvertedIndexFileWriter; extern const char* k_segment_magic; extern const uint32_t k_segment_magic_length; @@ -84,7 +84,7 @@ class SegmentWriter { explicit SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, const SegmentWriterOptions& opts, - io::FileWriterPtr inverted_file_writer = nullptr); + InvertedIndexFileWriter* inverted_file_writer); ~SegmentWriter(); Status init(); @@ -133,9 +133,6 @@ class SegmentWriter { uint64_t estimate_segment_size(); - InvertedIndexFileInfo get_inverted_index_file_info() const { return _inverted_index_file_info; } - int64_t get_inverted_index_total_size(); - uint32_t num_rows_written() const { return _num_rows_written; } // for partial update @@ -167,6 +164,17 @@ class SegmentWriter { void set_mow_context(std::shared_ptr mow_context); + Status close_inverted_index(int64_t* inverted_index_file_size) { + // no inverted index + if (_inverted_index_file_writer == nullptr) { + *inverted_index_file_size = 0; + return Status::OK(); + } + RETURN_IF_ERROR(_inverted_index_file_writer->close()); + *inverted_index_file_size = _inverted_index_file_writer->get_index_file_total_size(); + return Status::OK(); + } + private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); Status _create_column_writer(uint32_t cid, const TabletColumn& column, @@ -222,13 +230,15 @@ class SegmentWriter { // Not owned. owned by RowsetWriter or SegmentFlusher io::FileWriter* _file_writer = nullptr; - std::unique_ptr _inverted_index_file_writer; + // Not owned. owned by RowsetWriter or SegmentFlusher + InvertedIndexFileWriter* _inverted_index_file_writer = nullptr; + SegmentFooterPB _footer; // for mow tables with cluster key, the sort key is the cluster keys not unique keys // for other tables, the sort key is the keys size_t _num_sort_key_columns; size_t _num_short_key_columns; - InvertedIndexFileInfo _inverted_index_file_info; + std::unique_ptr _short_key_index_builder; std::unique_ptr _primary_key_index_builder; std::vector> _column_writers; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 8f7f614d7b332b..d0383d4b976bd9 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -90,13 +90,14 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, const VerticalSegmentWriterOptions& opts, - io::FileWriterPtr inverted_file_writer) + InvertedIndexFileWriter* inverted_file_writer) : _segment_id(segment_id), _tablet_schema(std::move(tablet_schema)), _tablet(std::move(tablet)), _data_dir(data_dir), _opts(opts), _file_writer(file_writer), + _inverted_index_file_writer(inverted_file_writer), _mem_tracker(std::make_unique( vertical_segment_writer_mem_tracker_name(segment_id))), _mow_context(std::move(opts.mow_ctx)) { @@ -138,17 +139,6 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 } } } - if (_tablet_schema->has_inverted_index()) { - _inverted_index_file_writer = std::make_unique( - _opts.rowset_ctx->fs(), - std::string {InvertedIndexDescriptor::get_index_file_path_prefix( - _opts.rowset_ctx->segment_path(segment_id))}, - _opts.rowset_ctx->rowset_id.to_string(), segment_id, - _tablet_schema->get_inverted_index_storage_format(), - std::move(inverted_file_writer)); - _inverted_index_file_writer->set_file_writer_opts( - _opts.rowset_ctx->get_file_writer_options()); - } } VerticalSegmentWriter::~VerticalSegmentWriter() { @@ -225,7 +215,7 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo break; } } - opts.inverted_index_file_writer = _inverted_index_file_writer.get(); + opts.inverted_index_file_writer = _inverted_index_file_writer; #define CHECK_FIELD_TYPE(TYPE, type_name) \ if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ @@ -1385,9 +1375,6 @@ Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) { *index_size = _file_writer->bytes_appended() - index_start; } - if (_inverted_index_file_writer != nullptr) { - _inverted_index_file_info = _inverted_index_file_writer->get_index_file_info(); - } // reset all column writers and data_conveter clear(); @@ -1462,9 +1449,6 @@ Status VerticalSegmentWriter::_write_inverted_index() { for (auto& column_writer : _column_writers) { RETURN_IF_ERROR(column_writer->write_inverted_index()); } - if (_inverted_index_file_writer != nullptr) { - RETURN_IF_ERROR(_inverted_index_file_writer->close()); - } return Status::OK(); } @@ -1551,13 +1535,6 @@ void VerticalSegmentWriter::_set_max_key(const Slice& key) { _max_key.append(key.get_data(), key.get_size()); } -int64_t VerticalSegmentWriter::get_inverted_index_total_size() { - if (_inverted_index_file_writer != nullptr) { - return _inverted_index_file_writer->get_index_file_total_size(); - } - return 0; -} - inline bool VerticalSegmentWriter::_is_mow() { return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write; } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 881a6cee5b41e1..951e9c2e2838c3 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -34,6 +34,7 @@ #include "gutil/strings/substitute.h" #include "olap/olap_define.h" #include "olap/rowset/segment_v2/column_writer.h" +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" #include "util/faststring.h" @@ -82,7 +83,7 @@ class VerticalSegmentWriter { explicit VerticalSegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet, DataDir* data_dir, const VerticalSegmentWriterOptions& opts, - io::FileWriterPtr inverted_file_writer = nullptr); + InvertedIndexFileWriter* inverted_file_writer); ~VerticalSegmentWriter(); VerticalSegmentWriter(const VerticalSegmentWriter&) = delete; @@ -99,9 +100,7 @@ class VerticalSegmentWriter { [[nodiscard]] std::string data_dir_path() const { return _data_dir == nullptr ? "" : _data_dir->path(); } - [[nodiscard]] InvertedIndexFileInfo get_inverted_index_file_info() const { - return _inverted_index_file_info; - } + [[nodiscard]] uint32_t num_rows_written() const { return _num_rows_written; } // for partial update @@ -122,10 +121,19 @@ class VerticalSegmentWriter { TabletSchemaSPtr flush_schema() const { return _flush_schema; }; - int64_t get_inverted_index_total_size(); - void clear(); + Status close_inverted_index(int64_t* inverted_index_file_size) { + // no inverted index + if (_inverted_index_file_writer == nullptr) { + *inverted_index_file_size = 0; + return Status::OK(); + } + RETURN_IF_ERROR(_inverted_index_file_writer->close()); + *inverted_index_file_size = _inverted_index_file_writer->get_index_file_total_size(); + return Status::OK(); + } + private: void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column); Status _create_column_writer(uint32_t cid, const TabletColumn& column, @@ -213,14 +221,15 @@ class VerticalSegmentWriter { // Not owned. owned by RowsetWriter io::FileWriter* _file_writer = nullptr; - std::unique_ptr _inverted_index_file_writer; + // Not owned. owned by RowsetWriter or SegmentFlusher + InvertedIndexFileWriter* _inverted_index_file_writer = nullptr; SegmentFooterPB _footer; // for mow tables with cluster key, the sort key is the cluster keys not unique keys // for other tables, the sort key is the keys size_t _num_sort_key_columns; size_t _num_short_key_columns; - InvertedIndexFileInfo _inverted_index_file_info; + std::unique_ptr _short_key_index_builder; std::unique_ptr _primary_key_index_builder; std::vector> _column_writers; diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index ced0fb880c41fb..fb8f6622685856 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -164,26 +164,28 @@ Status VerticalBetaRowsetWriter::_create_segment_writer( int seg_id = this->_num_segment.fetch_add(1, std::memory_order_relaxed); - io::FileWriterPtr file_writer; - io::FileWriterOptions opts = this->_context.get_file_writer_options(); - - auto path = context.segment_path(seg_id); - auto& fs = context.fs_ref(); - Status st = fs.create_file(path, &file_writer, &opts); - if (!st.ok()) { - LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; - return st; + io::FileWriterPtr segment_file_writer; + RETURN_IF_ERROR(BaseBetaRowsetWriter::create_file_writer(seg_id, segment_file_writer)); + DCHECK(segment_file_writer != nullptr); + + InvertedIndexFileWriterPtr inverted_index_file_writer; + if (context.tablet_schema->has_inverted_index()) { + RETURN_IF_ERROR(RowsetWriter::create_inverted_index_file_writer( + seg_id, &inverted_index_file_writer)); } - DCHECK(file_writer != nullptr); segment_v2::SegmentWriterOptions writer_options; writer_options.enable_unique_key_merge_on_write = context.enable_unique_key_merge_on_write; writer_options.rowset_ctx = &context; writer_options.max_rows_per_segment = context.max_rows_per_segment; - *writer = std::make_unique(file_writer.get(), seg_id, - context.tablet_schema, context.tablet, - context.data_dir, writer_options); - RETURN_IF_ERROR(this->_seg_files.add(seg_id, std::move(file_writer))); + *writer = std::make_unique( + segment_file_writer.get(), seg_id, context.tablet_schema, context.tablet, + context.data_dir, writer_options, inverted_index_file_writer.get()); + + RETURN_IF_ERROR(this->_seg_files.add(seg_id, std::move(segment_file_writer))); + if (context.tablet_schema->has_inverted_index()) { + RETURN_IF_ERROR(this->_idx_files.add(seg_id, std::move(inverted_index_file_writer))); + } auto s = (*writer)->init(column_ids, is_key); if (!s.ok()) { @@ -205,10 +207,7 @@ Status VerticalBetaRowsetWriter::final_flush() { LOG(WARNING) << "Fail to finalize segment footer, " << st; return st; } - this->_total_data_size += segment_size + segment_writer->get_inverted_index_total_size(); - this->_total_index_size += segment_writer->get_inverted_index_total_size(); - this->_idx_files_info.add_file_info(segment_writer->get_segment_id(), - segment_writer->get_inverted_index_file_info()); + this->_total_data_size += segment_size; segment_writer.reset(); } return Status::OK(); @@ -217,6 +216,7 @@ Status VerticalBetaRowsetWriter::final_flush() { template requires std::is_base_of_v Status VerticalBetaRowsetWriter::_close_file_writers() { + RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_inverted_index_file_writers()); return this->_seg_files.close(); } diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index c88a23a0c360cf..46bd111c6c1cc2 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1363,6 +1363,9 @@ void TabletSchema::update_tablet_columns(const TabletSchema& tablet_schema, } bool TabletSchema::has_inverted_index(const TabletColumn& col) const { + // if (col.is_variant_type()) { + // return false; + // } // TODO use more efficient impl int32_t col_unique_id = col.is_extracted_column() ? col.parent_unique_id() : col.unique_id(); const std::string& suffix_path = diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index ebe2c63c7f30d2..8c89bad17d3401 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -377,9 +377,14 @@ class TabletSchema : public MetadataAdder { long row_store_page_size() const { return _row_store_page_size; } const std::vector& indexes() const { return _indexes; } + // when finding the index file, we should ignore variant column bool has_inverted_index() const { for (const auto& index : _indexes) { if (index.index_type() == IndexType::INVERTED) { + // if (const auto& column = column_by_uid(index.col_unique_ids()[0]); + // column.is_variant_type() && check_variant) { + // continue; + // } return true; } } diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 38a52d1d2118aa..5d37bc1f1fddee 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -292,10 +292,20 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta _tablet->tablet_path(), output_rowset_meta->rowset_id().to_string(), seg_ptr->id()))}; + std::string index_path = + InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); + io::FileWriterPtr file_writer; + Status st = fs->create_file(index_path, &file_writer); + if (!st.ok()) { + LOG(WARNING) << "failed to create writable file. path=" << index_path + << ", err: " << st; + return st; + } auto inverted_index_file_writer = std::make_unique( fs, std::move(index_path_prefix), output_rowset_meta->rowset_id().to_string(), seg_ptr->id(), - output_rowset_schema->get_inverted_index_storage_format()); + output_rowset_schema->get_inverted_index_storage_format(), + std::move(file_writer)); RETURN_IF_ERROR(inverted_index_file_writer->initialize(dirs)); // create inverted index writer for (auto& index_meta : _dropped_inverted_indexes) { @@ -346,10 +356,20 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta << seg_ptr->id() << " cannot be found"; continue; } + std::string index_path = + InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix); + io::FileWriterPtr file_writer; + Status st = fs->create_file(index_path, &file_writer); + if (!st.ok()) { + LOG(WARNING) << "failed to create writable file. path=" << index_path + << ", err: " << st; + return st; + } auto dirs = DORIS_TRY(idx_file_reader_iter->second->get_all_directories()); inverted_index_file_writer = std::make_unique( fs, index_path_prefix, output_rowset_meta->rowset_id().to_string(), - seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format()); + seg_ptr->id(), output_rowset_schema->get_inverted_index_storage_format(), + std::move(file_writer)); RETURN_IF_ERROR(inverted_index_file_writer->initialize(dirs)); } else { inverted_index_file_writer = std::make_unique( diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 37243fab14bdb3..733bb3cd5594ef 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -201,11 +201,12 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& st } DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { segment_file_size++; }); - if (segment_file_size + inverted_file_size != stat.data_size) { + if (segment_file_size != stat.data_size || inverted_file_size != stat.index_size) { return Status::Corruption( "add_segment failed, segment stat {} does not match, file size={}, inverted file " - "size={}, stat.data_size={}, tablet id={}", - segid, segment_file_size, inverted_file_size, stat.data_size, _req.tablet_id); + "size={}, stat.data_size={}, stat.index_size={}, tablet id={}", + segid, segment_file_size, inverted_file_size, stat.data_size, stat.index_size, + _req.tablet_id); } return _rowset_writer->add_segment(segid, stat, flush_schema); diff --git a/be/test/olap/delete_bitmap_calculator_test.cpp b/be/test/olap/delete_bitmap_calculator_test.cpp index 7e527078613cd2..e3f9244234802b 100644 --- a/be/test/olap/delete_bitmap_calculator_test.cpp +++ b/be/test/olap/delete_bitmap_calculator_test.cpp @@ -103,7 +103,8 @@ class DeleteBitmapCalculatorTest : public testing::Test { io::FileWriterPtr file_writer; Status st = fs->create_file(path, &file_writer); EXPECT_TRUE(st.ok()); - SegmentWriter writer(file_writer.get(), segment_id, build_schema, nullptr, nullptr, opts); + SegmentWriter writer(file_writer.get(), segment_id, build_schema, nullptr, nullptr, opts, + nullptr); st = writer.init(); EXPECT_TRUE(st.ok());