Skip to content

Commit

Permalink
[Enhancement] Add some util in script for TDE debug; Fix memleak in w…
Browse files Browse the repository at this point in the history
…renbind

Signed-off-by: Binglin Chang <[email protected]>
  • Loading branch information
decster committed Oct 14, 2024
1 parent b550d07 commit a3b25d8
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 13 deletions.
23 changes: 23 additions & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "storage/del_vector.h"
#include "storage/lake/tablet.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/primary_key_dump.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
#include "storage/tablet_meta_manager.h"
#include "storage/tablet_updates.h"
#include "util/stack_util.h"
#include "util/url_coding.h"
#include "wrenbind17/wrenbind17.hpp"

using namespace wrenbind17;
Expand Down Expand Up @@ -289,6 +293,23 @@ class StorageEngineRef {
return ptr;
}

static std::string get_lake_tablet_metadata_json(int64_t tablet_id, int64_t version) {
auto tablet_manager = ExecEnv::GetInstance()->lake_tablet_manager();
RETURN_IF(nullptr == tablet_manager, "");
auto meta_st = tablet_manager->get_tablet_metadata(tablet_id, version, false);
RETURN_IF(!meta_st.ok(), meta_st.status().to_string());
std::string json;
return proto_to_json(*meta_st.value());
}

static std::string decode_encryption_meta(const std::string& meta_base64) {
EncryptionMetaPB pb;
std::string meta_bytes;
RETURN_IF(!base64_decode(meta_base64, &meta_bytes), "bad base64 string");
RETURN_IF(!pb.ParseFromString(meta_bytes), "parse encryption meta failed");
return proto_to_json(pb);
}

static std::shared_ptr<TabletBasicInfo> get_tablet_info(int64_t tablet_id) {
std::vector<TabletBasicInfo> tablet_infos;
auto manager = StorageEngine::instance()->tablet_manager();
Expand Down Expand Up @@ -563,6 +584,8 @@ class StorageEngineRef {
REG_STATIC_METHOD(StorageEngineRef, get_tablet_info);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_infos);
REG_STATIC_METHOD(StorageEngineRef, get_tablet_meta_json);
REG_STATIC_METHOD(StorageEngineRef, get_lake_tablet_metadata_json);
REG_STATIC_METHOD(StorageEngineRef, decode_encryption_meta);
REG_STATIC_METHOD(StorageEngineRef, reset_delvec);
REG_STATIC_METHOD(StorageEngineRef, get_tablet);
REG_STATIC_METHOD(StorageEngineRef, drop_tablet);
Expand Down
18 changes: 10 additions & 8 deletions be/src/storage/lake/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,14 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, SegmentReadOptio
int index = 0;

std::vector<std::future<std::pair<StatusOr<SegmentPtr>, std::string>>> segment_futures;
auto check_status = [&](StatusOr<SegmentPtr>& segment_or, const std::string& seg_name) -> Status {
auto check_status = [&](StatusOr<SegmentPtr>& segment_or, const std::string& seg_name, int seg_id) -> Status {
if (segment_or.ok()) {
segments->emplace_back(std::move(segment_or.value()));
} else if (segment_or.status().is_not_found() && ignore_lost_segment) {
LOG(WARNING) << "Ignored lost segment " << seg_name;
} else {
return segment_or.status();
return segment_or.status().clone_and_prepend(fmt::format(
"load_segments failed tablet:{} rowset:{} segid:{}", _tablet_id, metadata().id(), seg_id));
}
return Status::OK();
};
Expand Down Expand Up @@ -470,25 +471,26 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, SegmentReadOptio
<< ", try to load segment serially, seg_id: " << seg_id;
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id, &footer_size_hint, lake_io_opts,
lake_io_opts.fill_metadata_cache, _tablet_schema);
if (auto status = check_status(segment_or, seg_name); !status.ok()) {
if (auto status = check_status(segment_or, seg_name, seg_id); !status.ok()) {
return status;
}
}
seg_id++;
segment_futures.push_back(task->get_future());
} else {
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id++, &footer_size_hint, lake_io_opts,
auto segment_or = _tablet_mgr->load_segment(segment_info, seg_id, &footer_size_hint, lake_io_opts,
lake_io_opts.fill_metadata_cache, _tablet_schema);
if (auto status = check_status(segment_or, seg_name); !status.ok()) {
if (auto status = check_status(segment_or, seg_name, seg_id); !status.ok()) {
return status;
}
seg_id++;
}
}

for (auto& fut : segment_futures) {
auto result_pair = fut.get();
for (int i = 0; i < segment_futures.size(); i++) {
auto result_pair = segment_futures[i].get();
auto segment_or = result_pair.first;
if (auto status = check_status(segment_or, result_pair.second); !status.ok()) {
if (auto status = check_status(segment_or, result_pair.second, i); !status.ok()) {
return status;
}
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@ Status Rowset::do_load() {
auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint,
rowset_meta()->partial_rowset_footer(seg_id));
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
auto st = res.status().clone_and_prepend(fmt::format(
"Load rowset failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(),
rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), seg_id, seg_path));
LOG(WARNING) << st.message();
_segments.clear();
return res.status();
return st;
}
_segments.push_back(std::move(res).value());
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/thirdparty/wrenbind17/wrenbind17/vm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ class VM {
data->config.heapGrowthPercent = heapGrowth;
data->config.userData = data.get();
#if WREN_VERSION_NUMBER >= 4000 // >= 0.4.0
data->config.reallocateFn = [](void* memory, size_t newSize, void* userData) -> void* {
return std::realloc(memory, newSize);
};
data->config.loadModuleFn = [](WrenVM* vm, const char* name) -> WrenLoadModuleResult {
auto res = WrenLoadModuleResult();
auto& self = *reinterpret_cast<VM::Data*>(wrenGetUserData(vm));
Expand Down
9 changes: 9 additions & 0 deletions be/test/fs/key_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ TEST_F(KeyCacheTest, AddKey) {
key->set_id(2);
cache.add_key(key);
ASSERT_EQ(2, cache.size());
std::string result;
ASSERT_TRUE(execute_script("System.print(ExecEnv.key_cache_info())", result).ok());
ASSERT_TRUE(execute_script("System.print(StorageEngine.decode_encryption_meta("
"\"Ch4IARiztZ64BiAAKAE6EHp8EnQlAIgiy8dgbPRP53kKPAgCEAEYs7WeuAYgACgBMiyZegO5j9P16bHelpUAU"
"xEj1c5P4xWQsJSy6sc2yIKC0g/rRPqsGNumdy6WQgo0EAIgACgBMixDCSo3rP5l8oiZLcgtts8x7xJ+M4+/"
"INZvGPhCOA1m9zf2vpCRbjbVoOl2EQ==\"))",
result)
.ok());
ASSERT_TRUE(result.find("keyHierarchy") != result.npos);
}

static void wrap_unwrap_test(int num_level) {
Expand Down
3 changes: 3 additions & 0 deletions be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
// NOTE: intend to put the following header to the end of the include section
// so that our `gutil/dynamic_annotations.h` takes precedence of the absl's.
// NOLINTNEXTLINE
#include "script/script.h"
#include "service/staros_worker.h"

namespace starrocks {
Expand Down Expand Up @@ -87,6 +88,8 @@ TEST_F(LakeTabletManagerTest, tablet_meta_write_and_read) {
rowset_meta_pb->set_num_rows(5);
EXPECT_OK(_tablet_manager->put_tablet_metadata(metadata));
EXPECT_OK(_tablet_manager->tablet_metadata_exists(12345, 2));
string result;
ASSERT_TRUE(execute_script("System.print(StorageEngine.get_lake_tablet_metadata_json(12345,2))", result).ok());
auto res = _tablet_manager->get_tablet_metadata(12345, 2);
EXPECT_TRUE(res.ok());
EXPECT_EQ(res.value()->id(), 12345);
Expand Down
5 changes: 5 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ void TabletUpdatesTest::test_writeread(bool enable_persistent_index) {
ASSERT_TRUE(_tablet->rowset_commit(2, rs0).ok());
ASSERT_EQ(2, _tablet->updates()->max_version());

string o;
ASSERT_TRUE(execute_script(fmt::format("StorageEngine.reset_delvec({}, {}, 2)", _tablet->tablet_id(), 0), o).ok());
ASSERT_TRUE(execute_script("System.print(ExecEnv.grep_log_as_string(0,0,\"I\",\"tablet_manager\",1))", o).ok());
LOG(INFO) << "grep log: " << o;

auto rs1 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
ASSERT_EQ(3, _tablet->updates()->max_version());
Expand Down

0 comments on commit a3b25d8

Please sign in to comment.