fix(metric engine): label mismatch in metric engine #3927

Merged · 6 commits · May 14, 2024
Changes from 1 commit
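
For orientation, the sketch below models the mapping problem this diff addresses: in sparse (metric engine) mode a series is looked up by its sparse key, while the key dictionary previously reported only full-primary-key entries when it finished, so a lookup map rebuilt from that output could miss the sparse key. This is a minimal standalone illustration; the key contents and names are invented for the example and are not the engine's real encodings.

use std::collections::BTreeMap;

type PkIndex = u16;

fn main() {
    // Two encodings of the same series (contents invented for the example).
    let sparse_key = b"sparse:host=a".to_vec();
    let full_pk = b"full:__name__=cpu,host=a".to_vec();

    // Before: the dict builder reported only full_pk -> PkIndex on finish,
    // so a lookup map rebuilt from its output misses the sparse key.
    let mut rebuilt: BTreeMap<Vec<u8>, PkIndex> = BTreeMap::new();
    rebuilt.insert(full_pk.clone(), 0);
    assert!(rebuilt.get(&sparse_key).is_none());

    // After: finish() also reports sparse_key -> PkIndex, so both
    // encodings resolve to the same index.
    rebuilt.insert(sparse_key.clone(), 0);
    assert_eq!(rebuilt.get(&sparse_key), rebuilt.get(&full_pk));
}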
73 changes: 47 additions & 26 deletions src/mito2/src/memtable/partition_tree/dict.rs
@@ -26,7 +26,7 @@ use crate::metrics::MEMTABLE_DICT_BYTES;
/// Maximum keys in a [DictBlock].
const MAX_KEYS_PER_BLOCK: u16 = 256;

type PkIndexMap = BTreeMap<Vec<u8>, PkIndex>;
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;

/// Builder to build a key dictionary.
pub struct KeyDictBuilder {
@@ -66,10 +66,15 @@ impl KeyDictBuilder {
///
/// # Panics
/// Panics if the builder is full.
pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
pub fn insert_key(
&mut self,
full_primary_key: &[u8],
sparse_key: Option<&[u8]>,
metrics: &mut WriteMetrics,
) -> PkIndex {
assert!(!self.is_full());

if let Some(pk_index) = self.pk_to_index.get(key).copied() {
if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
// Already in the builder.
return pk_index;
}
@@ -81,16 +86,22 @@
}

// Safety: we have checked the buffer length.
let pk_index = self.key_buffer.push_key(key);
self.pk_to_index.insert(key.to_vec(), pk_index);
let pk_index = self.key_buffer.push_key(full_primary_key);
let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
(Some(sparse_key.to_vec()), sparse_key.len())
} else {
(None, 0)
};
self.pk_to_index
.insert(full_primary_key.to_vec(), (pk_index, sparse_key));
self.num_keys += 1;

// Since we store the key twice so the bytes usage doubled.
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();
metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
self.key_bytes_in_index += full_primary_key.len();

// Adds key size of index to the metrics.
MEMTABLE_DICT_BYTES.add(key.len() as i64);
MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);

pk_index
}
@@ -108,37 +119,47 @@ impl KeyDictBuilder {
}

/// Finishes the builder.
pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
if self.key_buffer.is_empty() {
return None;
}
let mut pk_to_index_map = BTreeMap::new();

// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Computes key position and then alter pk index.
let mut key_positions = vec![0; self.pk_to_index.len()];
for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {

for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
.into_iter()
.enumerate()
{
// The position of the i-th key is the old pk index.
key_positions[i] = *pk_index;
// Overwrites the pk index.
*pk_index = i as PkIndex;
key_positions[i] = pk_index;
pk_to_index_map.insert(full_pk, i as PkIndex);
if let Some(sparse_key) = sparse_key {
pk_to_index_map.insert(sparse_key, i as PkIndex);
}
}

self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;
*pk_to_index = std::mem::take(&mut self.pk_to_index);

Some(KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
})
Some((
KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
},
pk_to_index_map,
))
}

/// Reads the builder.
pub fn read(&self) -> DictBuilderReader {
let sorted_pk_indices = self.pk_to_index.values().copied().collect();
let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
let block = self.key_buffer.finish_cloned();
let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
blocks.extend_from_slice(&self.dict_blocks);
@@ -394,7 +415,7 @@ mod tests {
let mut metrics = WriteMetrics::default();
for key in &keys {
assert!(!builder.is_full());
let pk_index = builder.insert_key(key, &mut metrics);
let pk_index = builder.insert_key(key, None, &mut metrics);
last_pk_index = Some(pk_index);
}
assert_eq!(num_keys - 1, last_pk_index.unwrap());
@@ -426,14 +447,14 @@
for i in 0..num_keys {
// Each key is 5 bytes.
let key = format!("{i:05}");
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
let key_bytes = num_keys as usize * 5;
assert_eq!(key_bytes * 2, metrics.key_bytes);
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = builder.finish().unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
@@ -446,12 +467,12 @@
for i in 0..MAX_KEYS_PER_BLOCK * 2 {
let key = format!("{i:010}");
assert!(!builder.is_full());
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
assert!(builder.is_full());
builder.finish(&mut BTreeMap::new());
builder.finish();

assert!(!builder.is_full());
assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
}
}
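
A simplified, self-contained re-implementation of the new dict.rs bookkeeping, for reference. This is a sketch only: the real KeyDictBuilder also manages key buffers, dict blocks, capacity checks and metrics, all of which are omitted here, and the names below are simplified stand-ins.

use std::collections::BTreeMap;

type PkIndex = u16;

#[derive(Default)]
struct MiniDictBuilder {
    // Full primary key -> (pk index, optional sparse key), mirroring the new PkIndexMap.
    pk_to_index: BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>,
    num_keys: PkIndex,
}

impl MiniDictBuilder {
    fn insert_key(&mut self, full_primary_key: &[u8], sparse_key: Option<&[u8]>) -> PkIndex {
        // Deduplicate on the full primary key, as the real builder does.
        if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
            return pk_index;
        }
        let pk_index = self.num_keys;
        self.num_keys += 1;
        self.pk_to_index.insert(
            full_primary_key.to_vec(),
            (pk_index, sparse_key.map(|k| k.to_vec())),
        );
        pk_index
    }

    // Assigns final indices in key order and returns a lookup map with one
    // entry per full primary key plus one per sparse key.
    fn finish(&mut self) -> BTreeMap<Vec<u8>, PkIndex> {
        let mut pk_to_index_map = BTreeMap::new();
        for (i, (full_pk, (_old_index, sparse_key))) in
            std::mem::take(&mut self.pk_to_index).into_iter().enumerate()
        {
            pk_to_index_map.insert(full_pk, i as PkIndex);
            if let Some(sparse_key) = sparse_key {
                pk_to_index_map.insert(sparse_key, i as PkIndex);
            }
        }
        self.num_keys = 0;
        pk_to_index_map
    }
}

fn main() {
    let mut builder = MiniDictBuilder::default();
    builder.insert_key(b"full:cpu,host=a", Some(b"sparse:host=a".as_slice()));
    builder.insert_key(b"full:mem,host=a", None);
    let map = builder.finish();
    // Both encodings of the first series resolve to the same index.
    assert_eq!(map[b"full:cpu,host=a".as_slice()], map[b"sparse:host=a".as_slice()]);
}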
11 changes: 7 additions & 4 deletions src/mito2/src/memtable/partition_tree/partition.rs
@@ -89,15 +89,18 @@ impl Partition {
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
} else {
// `primary_key` is already the full primary key.
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
.write_with_key(primary_key, None, &key_value, metrics);
inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
};

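The caller side in Partition::write, modeled as a standalone sketch: the dictionary always receives the full primary key, the sparse key is passed along only in metric-engine mode, and the lookup map is keyed by whichever key that mode uses. The closure stands in for ShardBuilder::write_with_key, PkId is reduced to a plain index, and the key contents are invented.

use std::collections::BTreeMap;

type PkId = u16; // stand-in for the crate's PkId struct

fn write_one(
    pk_to_pk_id: &mut BTreeMap<Vec<u8>, PkId>,
    mut write_with_key: impl FnMut(&[u8], Option<&[u8]>) -> PkId,
    full_pk: Vec<u8>,
    sparse_key: Option<Vec<u8>>,
) {
    match sparse_key {
        Some(sparse) => {
            // Sparse (metric engine) mode: the dict gets both keys,
            // the lookup map is keyed by the sparse key.
            let pk_id = write_with_key(&full_pk, Some(sparse.as_slice()));
            pk_to_pk_id.insert(sparse, pk_id);
        }
        None => {
            // Dense mode: the full primary key is the lookup key.
            let pk_id = write_with_key(&full_pk, None);
            pk_to_pk_id.insert(full_pk, pk_id);
        }
    }
}

fn main() {
    let mut pk_to_pk_id = BTreeMap::new();
    // Stand-in writer; the real one inserts into the dict builder and data buffer.
    write_one(
        &mut pk_to_pk_id,
        |_full_pk, _sparse_key| 0,
        b"full:__name__=cpu,host=a".to_vec(),
        Some(b"sparse:host=a".to_vec()),
    );
    assert!(pk_to_pk_id.contains_key(b"sparse:host=a".as_slice()));
}
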
5 changes: 2 additions & 3 deletions src/mito2/src/memtable/partition_tree/shard.rs
@@ -423,7 +423,6 @@ impl Node for ShardNode {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;

use super::*;
@@ -488,10 +487,10 @@ mod tests {
encode_keys(&metadata, kvs, &mut keys);
}
for key in &keys {
dict_builder.insert_key(key, &mut metrics);
dict_builder.insert_key(key, None, &mut metrics);
}

let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(
24 changes: 13 additions & 11 deletions src/mito2/src/memtable/partition_tree/shard_builder.rs
@@ -14,7 +14,7 @@

//! Builder of a shard.

use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -71,12 +71,15 @@ impl ShardBuilder {
/// Write a key value with its encoded primary key.
pub fn write_with_key(
&mut self,
primary_key: &[u8],
full_primary_key: &[u8],
sparse_key: Option<&[u8]>,
key_value: &KeyValue,
metrics: &mut WriteMetrics,
) -> PkId {
// Safety: we check whether the builder need to freeze before.
let pk_index = self.dict_builder.insert_key(primary_key, metrics);
let pk_index = self
.dict_builder
.insert_key(full_primary_key, sparse_key, metrics);
self.data_buffer.write_row(pk_index, key_value);
PkId {
shard_id: self.current_shard_id,
@@ -106,10 +109,8 @@ impl ShardBuilder {
return Ok(None);
}

let mut pk_to_index = BTreeMap::new();
let key_dict = self.dict_builder.finish(&mut pk_to_index);
let data_part = match &key_dict {
Some(dict) => {
let (data_part, key_dict) = match self.dict_builder.finish() {
Some((dict, pk_to_index)) => {
// Adds mapping to the map.
pk_to_pk_id.reserve(pk_to_index.len());
for (k, pk_index) in pk_to_index {
@@ -123,11 +124,12 @@
}

let pk_weights = dict.pk_weights_to_sort_data();
self.data_buffer.freeze(Some(&pk_weights), true)?
let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
(part, Some(dict))
}
None => {
let pk_weights = [0];
self.data_buffer.freeze(Some(&pk_weights), true)?
(self.data_buffer.freeze(Some(&pk_weights), true)?, None)
}
};

@@ -367,7 +369,7 @@ mod tests {
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
shard_builder.write_with_key(&key, &kv, &mut metrics);
shard_builder.write_with_key(&key, None, &kv, &mut metrics);
}
}
let shard = shard_builder
@@ -389,7 +391,7 @@
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
shard_builder.write_with_key(&key, &kv, &mut metrics);
shard_builder.write_with_key(&key, None, &kv, &mut metrics);
}
}

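On freeze, ShardBuilder::finish populates the caller's pk_to_pk_id from the map the dict builder now returns. Below is a standalone sketch of that step; PkId and ShardId are stand-ins for the crate's real types and the key contents are invented.

use std::collections::BTreeMap;

type PkIndex = u16;
type ShardId = u32;

#[derive(Clone, Copy, Debug, PartialEq)]
struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}

fn repopulate(
    pk_to_pk_id: &mut BTreeMap<Vec<u8>, PkId>,
    pk_to_index: BTreeMap<Vec<u8>, PkIndex>,
    shard_id: ShardId,
) {
    // With the fix, pk_to_index also carries an entry for the sparse key,
    // so lookups by either encoding resolve to the same PkId.
    for (key, pk_index) in pk_to_index {
        pk_to_pk_id.insert(key, PkId { shard_id, pk_index });
    }
}

fn main() {
    let mut pk_to_pk_id = BTreeMap::new();
    let mut from_dict = BTreeMap::new();
    from_dict.insert(b"full:__name__=cpu,host=a".to_vec(), 0u16);
    from_dict.insert(b"sparse:host=a".to_vec(), 0u16);
    repopulate(&mut pk_to_pk_id, from_dict, 7);
    assert_eq!(
        pk_to_pk_id[b"sparse:host=a".as_slice()],
        pk_to_pk_id[b"full:__name__=cpu,host=a".as_slice()]
    );
}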