Skip to content

Commit

Permalink
more testing + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
hermeGarcia committed Nov 27, 2023
1 parent 737024f commit e0e6221
Showing 1 changed file with 87 additions and 22 deletions.
109 changes: 87 additions & 22 deletions nucliadb_vectors/src/data_types/key_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,35 +203,35 @@ fn compute_unfinished(buffer: &mut Vec<usize>, status: &[usize], lens: &[usize])
}

/// Given the ids of unfinished producers, this function will find the minimum
/// alive element. The status of every producer pointing to deleted element will
/// be change to the next element.
/// alive element.
fn minimum_alive_value<'a, S: Slot + Copy>(
unfinished: &[usize],
status: &mut [usize],
status: &[usize],
producers: &[(S, &'a [u8])],
) -> Option<&'a [u8]> {
let mut minimum = None;
for producer in unfinished.iter().copied() {
let next_elem = status[producer];
let interface = producers[producer].0;
let store = producers[producer].1;
let (interface, store) = producers[producer];
let element_pointer = get_pointer(store, next_elem);
let (element_slice, _) = interface.read_exact(&store[element_pointer..]);
if !interface.keep_in_merge(element_slice) {
status[producer] += 1;
} else if let Some(minimum_slice) = minimum {
minimum = Some(std::cmp::min_by(minimum_slice, element_slice, |i, j| {
interface.cmp_slot(i, j)
}));
} else {
minimum = Some(element_slice);
match minimum {
_ if !interface.keep_in_merge(element_slice) => (),
None => {
minimum = Some(element_slice);
}
Some(minimum_slice) => {
minimum = Some(std::cmp::min_by(minimum_slice, element_slice, |i, j| {
interface.cmp_slot(i, j)
}));
}
}
}
minimum
}

/// The status of every producer pointing to the current minimum
/// will be change to the next element.
/// The status of every producer pointing to the current minimum or to a deleted element
/// will be changed to the next element.
fn advance_merge<S: Slot + Copy>(
minimum: &[u8],
unfinished: &[usize],
Expand All @@ -240,12 +240,16 @@ fn advance_merge<S: Slot + Copy>(
) {
for producer in unfinished.iter().copied() {
let next_elem = status[producer];
let interface = producers[producer].0;
let store = producers[producer].1;
let (interface, store) = producers[producer];
let element_pointer = get_pointer(store, next_elem);
let element_slice = &store[element_pointer..];
let cursor_should_be_moved =
// The current element is no longer alive.
!interface.keep_in_merge(element_slice)
// The current element is the equal to the minimum.
|| interface.cmp_slot(minimum, element_slice).is_eq();

if interface.cmp_slot(minimum, element_slice).is_eq() {
if cursor_should_be_moved {
status[producer] += 1;
}
}
Expand All @@ -266,6 +270,8 @@ where
let mut non_empty_producers = Vec::with_capacity(producers.len());
// Number of total elements (alive or deleted) each producer has
let mut lens = Vec::with_capacity(producers.len());
// Per producer, which element should be visited next.
let mut status = Vec::with_capacity(producers.len());

// Initializing merge loop parameters
for (id, (interface, data)) in producers.iter().copied().enumerate() {
Expand All @@ -276,7 +282,12 @@ where

lens.push(total_elements);
if alive_elements > 0 {
// There are elements to look for in this producer.
status.push(0);
non_empty_producers.push(id);
} else {
// The cursor is moved to the end.
status.push(total_elements);
}
}

Expand All @@ -294,8 +305,6 @@ where
let mut written_elements = 0;
// Length of the recipient's written part
let mut recipient_length = HEADER_LEN + (POINTER_LEN * number_of_elements);
// Per producer, which element should be visited next.
let mut status = vec![0usize; producers.len()];

// Reserving enough space to fit the merge result.
for _ in 0..total_space {
Expand All @@ -304,7 +313,7 @@ where

// Merge loop, transfers the contents of each producer into the recipient in order
while !unfinished.is_empty() {
if let Some(minimum) = minimum_alive_value(&unfinished, &mut status, producers) {
if let Some(minimum) = minimum_alive_value(&unfinished, &status, producers) {
// There is a minimum that needs to be transferred to the recipient.
let idx_slot = (HEADER_LEN + (written_elements * POINTER_LEN)) as u64;
recipient.seek(SeekFrom::Start(idx_slot))?;
Expand All @@ -330,9 +339,22 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::data_types::DeleteLog;

const TWO: [u8; 4] = [0, 0, 0, 2];
const TEN: [u8; 4] = [0, 0, 0, 10];

#[derive(Clone, Copy)]
struct GreaterThan([u8; 4]);
impl DeleteLog for GreaterThan {
fn is_deleted(&self, x: &[u8]) -> bool {
TElem.cmp_keys(x, &self.0).is_le()
}
}

// u32 numbers in big-endian (so cmp is faster)
#[derive(Clone, Copy)]
pub struct TElem;
struct TElem;
impl Slot for TElem {
fn get_key<'a>(&self, x: &'a [u8]) -> &'a [u8] {
&x[0..4]
Expand Down Expand Up @@ -420,4 +442,47 @@ mod tests {
store_checks(&expected, &buf);
retrieval_checks(&expected, &buf);
}

#[test]
fn merge_test_with_deletes() {
let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect();
let v1: Vec<_> = [3u32, 4, 5].iter().map(|x| x.to_be_bytes()).collect();
let v2: Vec<_> = [6u32, 7, 8].iter().map(|x| x.to_be_bytes()).collect();
let mut v0_store = vec![];
let mut v1_store = vec![];
let mut v2_store = vec![];
create_key_value(&mut v0_store, v0).unwrap();
create_key_value(&mut v1_store, v1).unwrap();
create_key_value(&mut v2_store, v2).unwrap();

let greater_than_2 = (GreaterThan(TWO), TElem);
let greater_than_10 = (GreaterThan(TEN), TElem);
let mut file = tempfile::tempfile().unwrap();

let elems = vec![
// cero and one will be removed
(greater_than_2, v0_store.as_slice()),
// no element is removed
(greater_than_2, v1_store.as_slice()),
// no element is removed
(greater_than_2, v2_store.as_slice()),
];
merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap();
let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() };
let number_of_elements = elements_in_total(&merge_store);
assert_eq!(number_of_elements, 6);

let elems = vec![
// cero and one will be removed
(greater_than_2, v0_store.as_slice()),
// no element is removed
(greater_than_2, v1_store.as_slice()),
// no element is removed
(greater_than_10, v2_store.as_slice()),
];
merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap();
let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() };
let number_of_elements = elements_in_total(&merge_store);
assert_eq!(number_of_elements, 3);
}
}

0 comments on commit e0e6221

Please sign in to comment.