diff --git a/nucliadb_vectors/src/data_point/mod.rs b/nucliadb_vectors/src/data_point/mod.rs index a29b6dab83..369116f7bc 100644 --- a/nucliadb_vectors/src/data_point/mod.rs +++ b/nucliadb_vectors/src/data_point/mod.rs @@ -236,7 +236,7 @@ impl<'a, Dlog: DeleteLog> Retriever<'a, Dlog> { nodes, delete_log, similarity, - no_nodes: key_value::get_no_elems(nodes), + no_nodes: key_value::elements_in_total(nodes), min_score, } } @@ -441,7 +441,7 @@ impl AsRef for DataPoint { impl DataPoint { pub fn stored_len(&self) -> Option { - if key_value::get_no_elems(&self.nodes) == 0 { + if key_value::elements_in_total(&self.nodes) == 0 { return None; } let node = key_value::get_value(Node, &self.nodes, 0); @@ -481,7 +481,7 @@ impl DataPoint { min_score, ); - let no_nodes = key_value::get_no_elems(&self.nodes); + let no_nodes = key_value::elements_in_total(&self.nodes); let filter = FormulaFilter::new( filter, @@ -540,13 +540,14 @@ impl DataPoint { .iter() .map(|dp| ((dp.0, Node), dp.1.nodes.as_ref())); { + let node_producers: Vec<_> = node_producers.collect(); let mut node_buffer = BufWriter::new(&mut nodes); - key_value::merge(&mut node_buffer, node_producers.collect())?; + key_value::merge(&mut node_buffer, &node_producers)?; node_buffer.flush()?; } let nodes = unsafe { Mmap::map(&nodes)? }; - let no_nodes = key_value::get_no_elems(&nodes); + let no_nodes = key_value::elements_in_total(&nodes); // Creating the FSTs with the new nodes let (label_index, key_index) = if channel == Channel::EXPERIMENTAL { @@ -655,7 +656,7 @@ impl DataPoint { root_dir: &path::Path, nodes: &[u8], ) -> VectorR<(Option, Option)> { - let no_nodes = key_value::get_no_elems(nodes); + let no_nodes = key_value::elements_in_total(nodes); // building the KeyIndex and LabelIndex FSTs let fst_dir = root_dir.join(file_names::FST); @@ -741,7 +742,7 @@ impl DataPoint { nodesf_buffer.flush()?; } let nodes = unsafe { Mmap::map(&nodesf)? }; - let no_nodes = key_value::get_no_elems(&nodes); + let no_nodes = key_value::elements_in_total(&nodes); // Creating the FSTs let (label_index, key_index) = if channel == Channel::EXPERIMENTAL { diff --git a/nucliadb_vectors/src/data_types/key_value.rs b/nucliadb_vectors/src/data_types/key_value.rs index ae080af0f6..29db61dc15 100644 --- a/nucliadb_vectors/src/data_types/key_value.rs +++ b/nucliadb_vectors/src/data_types/key_value.rs @@ -56,7 +56,8 @@ pub fn get_pointer(x: &[u8], i: usize) -> Pointer { } // O(1) -pub fn get_no_elems(x: &[u8]) -> HeaderE { +// Returns how many elements are in x, alive or deleted. +pub fn elements_in_total(x: &[u8]) -> HeaderE { usize_from_slice_le(&x[..HEADER_LEN]) } @@ -133,7 +134,7 @@ where // O(log n) where n is the number of slots in src. #[allow(unused)] pub fn search_by_key(interface: S, src: &[u8], key: &[u8]) -> Option { - let number_of_values = get_no_elems(src); + let number_of_values = elements_in_total(src); let mut start = 0; let mut end = number_of_values; let mut found = None; @@ -168,33 +169,14 @@ pub fn get_keys<'a, S: Slot + Copy + 'a>( interface: S, x: &'a [u8], ) -> impl Iterator { - (0..get_no_elems(x)) + (0..elements_in_total(x)) .map(move |i| get_value(interface, x, i)) .map(move |v| interface.get_key(v)) } -fn transfer_elem( - interface: S, - at: &mut R, - from: &[u8], - id: Pointer, - writen_elems: usize, - crnt_length: usize, -) -> io::Result -where - S: Slot, - R: Write + Seek, -{ - let idx_slot = (HEADER_LEN + (writen_elems * POINTER_LEN)) as u64; - let value = get_value::(interface, from, id); - at.seek(SeekFrom::Start(idx_slot))?; - at.write_all(&crnt_length.to_le_bytes())?; - at.seek(SeekFrom::Start(crnt_length as u64))?; - at.write_all(value)?; - Ok(crnt_length + value.len()) -} +// Returns the number of alive elements in the store and the space they consume. fn get_metrics(interface: S, source: &[u8]) -> (usize, usize) { - let len = get_no_elems(source); + let len = elements_in_total(source); let mut value_space = 0; let mut no_elems = 0; for id in 0..len { @@ -208,92 +190,194 @@ fn get_metrics(interface: S, source: &[u8]) -> (usize, usize) { (no_elems, value_space) } -// Merge algorithm for n key-value stores. +// Merge algorithm + +/// Given the ids of unfinished producers, this function will find the minimum +/// alive element. +fn minimum_alive_value<'a, S: Slot + Copy>( + unfinished: &[usize], + status: &[usize], + producers: &[(S, &'a [u8])], +) -> Option<&'a [u8]> { + let mut minimum = None; + for producer in unfinished.iter().copied() { + let next_elem = status[producer]; + let (interface, store) = producers[producer]; + let element_pointer = get_pointer(store, next_elem); + let (element_slice, _) = interface.read_exact(&store[element_pointer..]); + match minimum { + _ if !interface.keep_in_merge(element_slice) => (), + None => { + minimum = Some(element_slice); + } + Some(minimum_slice) => { + minimum = Some(std::cmp::min_by(minimum_slice, element_slice, |i, j| { + interface.cmp_slot(i, j) + })); + } + } + } + minimum +} + +/// Moves the cursor to the next alive element. +fn advance_producer(interface: S, cursor: &mut usize, length: usize, store: &[u8]) { + while *cursor < length { + // cursor points to an in-range element. + let element_pointer = get_pointer(store, *cursor); + let element_slice = &store[element_pointer..]; + if interface.keep_in_merge(element_slice) { + // The element is alive, we can stop + break; + } + + // The element is not alive, + // the cursor is moved to the next one. + *cursor += 1; + } +} + +/// The status of every producer pointing to the current minimum or to a deleted element +/// will be changed to the next element. +fn advance_merge( + minimum: Option<&[u8]>, + unfinished: &mut Vec, + status: &mut [usize], + producers: &[(S, &[u8])], + lengths: &[usize], +) { + let current_work = std::mem::take(unfinished); + for producer in current_work.iter().copied() { + let next_elem = status[producer]; + let (interface, store) = producers[producer]; + let element_pointer = get_pointer(store, next_elem); + let element_slice = &store[element_pointer..]; + + if minimum.map_or(false, |m| interface.cmp_slot(m, element_slice).is_eq()) { + status[producer] += 1; + } + + advance_producer(interface, &mut status[producer], lengths[producer], store); + + if status[producer] < lengths[producer] { + unfinished.push(producer) + } + } +} + +// Entry point for the merge algorithm. Returns the number of elements merged into the file. // WARNING: In case of keys duplicatied keys it favors the contents of the first slot. -// Returns the number of elements merged into the file. -pub fn merge(recepient: &mut R, producers: Vec<(S, &[u8])>) -> io::Result +pub fn merge(recipient: &mut R, producers: &[(S, &[u8])]) -> io::Result where S: Slot + Copy, R: Write + Seek, { - let lens = producers - .iter() - .copied() - .map(|(_, data)| data) - .map(get_no_elems) - .collect::>(); - - // The number of elements that will remain at the merged file - // needs to be computed so the space is reserved. - let (no_elems, value_space) = producers - .iter() - .copied() - .map(|(interface, p)| get_metrics::(interface, p)) - .fold((0, 0), |(ne, vs), (ne_p, vs_p)| (ne + ne_p, vs + vs_p)); - - // Reserve space - let total_space = HEADER_LEN + (POINTER_LEN * no_elems) + value_space; + // Number of elements that will be transferred to the recipient. + let mut number_of_elements = 0; + // Space required to perform the merge. + let mut value_space = 0; + // Index of the producers that have alive elements. + let mut non_empty_producers = Vec::with_capacity(producers.len()); + // Number of total elements (alive or deleted) each producer has + let mut lens = Vec::with_capacity(producers.len()); + // Per producer, which element should be visited next. + let mut status = Vec::with_capacity(producers.len()); + + // Initializing merge loop parameters + for (id, (interface, store)) in producers.iter().copied().enumerate() { + let total_elements = elements_in_total(store); + let (alive_elements, space) = get_metrics::(interface, store); + number_of_elements += alive_elements; + value_space += space; + + let mut cursor = total_elements; + if alive_elements > 0 { + cursor = 0; + advance_producer(interface, &mut cursor, total_elements, store); + } + if cursor < total_elements { + non_empty_producers.push(id); + } + + status.push(cursor); + lens.push(total_elements); + } + + // Taking deletions into account, how many elements will be on the recipient. + let number_of_elements = number_of_elements; + // Taking deletions into account, the number of bytes required to fit them. + let value_space = value_space; + // Per producer, the number of elements it has (deleted included). + let producer_lengths = lens; + // Unfinished producers, initialized to every non-empty producer. + let mut unfinished = non_empty_producers; + // At most, the merge will need total_space bytes. + let total_space = HEADER_LEN + (POINTER_LEN * number_of_elements) + value_space; + // Number of elements that have been written into the recipient. + let mut written_elements = 0; + // Length of the recipient's written part + let mut recipient_length = HEADER_LEN + (POINTER_LEN * number_of_elements); + + // Reserving enough space to fit the merge result. for _ in 0..total_space { - recepient.write_all(&[0])?; + recipient.write_all(&[0])?; } - // Merge loop - let mut writen_elems = 0; - let mut crnt_length = HEADER_LEN + (POINTER_LEN * no_elems); - let mut ids = vec![0usize; producers.len()]; - - while ids - .iter() - .copied() - .zip(lens.iter().copied()) - .any(|(id, len)| id < len) - { - let min_data = producers - .iter() - .copied() - .zip(ids.iter().copied()) - .zip(lens.iter().copied()) - .filter(|((_, x_id), x_len)| *x_id < *x_len) - .map(|((x, x_id), _)| (x, x_id, get_pointer(x.1, x_id))) - .filter(|((interface, x), _, x_ptr)| interface.keep_in_merge(&x[*x_ptr..])) - .min_by(|(x, _, x_ptr), (y, _, y_ptr)| x.0.cmp_slot(&x.1[*x_ptr..], &y.1[*y_ptr..])); - producers - .iter() - .copied() - .zip(ids.iter_mut()) - .zip(lens.iter().copied()) - .filter(|((_, x_id), x_len)| **x_id < *x_len) - .map(|((x, x_id), _)| (x, get_pointer(x.1, *x_id), x_id)) - .for_each(|((interface, x), x_ptr, x_id)| { - let is_equal = min_data.map(|(min, _, min_ptr)| { - interface.cmp_slot(&x[x_ptr..], &min.1[min_ptr..]).is_eq() - }); - if !interface.keep_in_merge(&x[x_ptr..]) { - *x_id += 1; - } else if is_equal.unwrap_or_default() { - *x_id += 1 - } - }); - if let Some(((interface, min), min_id, _)) = min_data { - crnt_length = - transfer_elem(interface, recepient, min, min_id, writen_elems, crnt_length)?; - writen_elems += 1; + // Merge loop, transfers the contents of each producer into the recipient in order + while !unfinished.is_empty() { + let current_minimum = minimum_alive_value(&unfinished, &status, producers); + if let Some(minimum) = current_minimum { + // There is a minimum that needs to be transferred to the recipient. + let idx_slot = (HEADER_LEN + (written_elements * POINTER_LEN)) as u64; + recipient.seek(SeekFrom::Start(idx_slot))?; + recipient.write_all(&recipient_length.to_le_bytes())?; + recipient.seek(SeekFrom::Start(recipient_length as u64))?; + recipient.write_all(minimum)?; + recipient_length += minimum.len(); + written_elements += 1; } + + // Every producer is advanced. + advance_merge( + current_minimum, + &mut unfinished, + &mut status, + producers, + &producer_lengths, + ); } + // Write the number of elements - recepient.seek(SeekFrom::Start(0))?; - recepient.write_all(&writen_elems.to_le_bytes())?; - recepient.seek(SeekFrom::Start(0)).unwrap(); - recepient.flush()?; - Ok(writen_elems) + recipient.seek(SeekFrom::Start(0))?; + recipient.write_all(&written_elements.to_le_bytes())?; + recipient.seek(SeekFrom::Start(0)).unwrap(); + recipient.flush()?; + Ok(written_elements) } #[cfg(test)] mod tests { use super::*; + use crate::data_types::DeleteLog; + + const ZERO: [u8; 4] = [0, 0, 0, 0]; + const ONE: [u8; 4] = [0, 0, 0, 1]; + const THREE: [u8; 4] = [0, 0, 0, 3]; + const TWO: [u8; 4] = [0, 0, 0, 2]; + const SIX: [u8; 4] = [0, 0, 0, 6]; + const TEN: [u8; 4] = [0, 0, 0, 10]; + + #[derive(Clone, Copy)] + struct GreaterThan([u8; 4]); + impl DeleteLog for GreaterThan { + fn is_deleted(&self, x: &[u8]) -> bool { + TElem.cmp_keys(x, &self.0).is_le() + } + } + // u32 numbers in big-endian (so cmp is faster) #[derive(Clone, Copy)] - pub struct TElem; + struct TElem; impl Slot for TElem { fn get_key<'a>(&self, x: &'a [u8]) -> &'a [u8] { &x[0..4] @@ -307,7 +391,7 @@ mod tests { } fn store_checks(expected: &[impl AsRef<[u8]>], buf: &[u8]) { - let no_values = get_no_elems(buf); + let no_values = elements_in_total(buf); assert_eq!(no_values, expected.len()); for (i, item) in expected.iter().enumerate() { let value_ptr = get_pointer(buf, i); @@ -375,10 +459,217 @@ mod tests { .into_iter() .map(|e| (TElem, e.as_slice())) .collect(); - merge(&mut file, elems).unwrap(); + merge(&mut file, &elems).unwrap(); let mut buf = vec![]; file.read_to_end(&mut buf).unwrap(); store_checks(&expected, &buf); retrieval_checks(&expected, &buf); } + + #[test] + fn merge_some_deleted_different_length() { + let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect(); + let v1: Vec<_> = [3u32, 4, 5, 11].iter().map(|x| x.to_be_bytes()).collect(); + let v2: Vec<_> = [6u32, 7, 8, 9, 10] + .iter() + .map(|x| x.to_be_bytes()) + .collect(); + let mut v0_store = vec![]; + let mut v1_store = vec![]; + let mut v2_store = vec![]; + create_key_value(&mut v0_store, v0).unwrap(); + create_key_value(&mut v1_store, v1).unwrap(); + create_key_value(&mut v2_store, v2).unwrap(); + + let interface = (GreaterThan(ONE), TElem); + let mut file = tempfile::tempfile().unwrap(); + + let elems = vec![ + // zero and 1 will be removed + (interface, v0_store.as_slice()), + // no element is removed + (interface, v1_store.as_slice()), + // no element is removed + (interface, v2_store.as_slice()), + ]; + merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap(); + let expected: Vec = vec![2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; + let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() }; + let number_of_elements = elements_in_total(&merge_store); + let values: Vec = (0..number_of_elements) + .map(|i| get_value(TElem, &merge_store, i)) + .map(|s| u32::from_be_bytes(s.try_into().unwrap())) + .collect(); + assert_eq!(values, expected); + } + + #[test] + fn merge_some_elements_deleted() { + let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect(); + let v1: Vec<_> = [3u32, 4, 5].iter().map(|x| x.to_be_bytes()).collect(); + let v2: Vec<_> = [6u32, 7, 8].iter().map(|x| x.to_be_bytes()).collect(); + let mut v0_store = vec![]; + let mut v1_store = vec![]; + let mut v2_store = vec![]; + create_key_value(&mut v0_store, v0).unwrap(); + create_key_value(&mut v1_store, v1).unwrap(); + create_key_value(&mut v2_store, v2).unwrap(); + + let interface = (GreaterThan(ONE), TElem); + let mut file = tempfile::tempfile().unwrap(); + + let elems = vec![ + // zero and 1 will be removed + (interface, v0_store.as_slice()), + // no element is removed + (interface, v1_store.as_slice()), + // no element is removed + (interface, v2_store.as_slice()), + ]; + merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap(); + let expected: Vec = vec![2, 3, 4, 5, 6, 7, 8]; + let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() }; + let number_of_elements = elements_in_total(&merge_store); + let values: Vec = (0..number_of_elements) + .map(|i| get_value(TElem, &merge_store, i)) + .map(|s| u32::from_be_bytes(s.try_into().unwrap())) + .collect(); + assert_eq!(values, expected); + } + + #[test] + fn merge_first_deleted() { + let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect(); + let v1: Vec<_> = [3u32, 4, 5].iter().map(|x| x.to_be_bytes()).collect(); + let v2: Vec<_> = [6u32, 7, 8].iter().map(|x| x.to_be_bytes()).collect(); + let mut v0_store = vec![]; + let mut v1_store = vec![]; + let mut v2_store = vec![]; + create_key_value(&mut v0_store, v0).unwrap(); + create_key_value(&mut v1_store, v1).unwrap(); + create_key_value(&mut v2_store, v2).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + + let elems = vec![ + // The first element is deleted + ((GreaterThan(ZERO), TElem), v0_store.as_slice()), + // The first element is deleted + ((GreaterThan(THREE), TElem), v1_store.as_slice()), + // The first element is deleted + ((GreaterThan(SIX), TElem), v2_store.as_slice()), + ]; + merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap(); + let expected: Vec = vec![1, 2, 4, 5, 7, 8]; + let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() }; + let number_of_elements = elements_in_total(&merge_store); + let values: Vec = (0..number_of_elements) + .map(|i| get_value(TElem, &merge_store, i)) + .map(|s| u32::from_be_bytes(s.try_into().unwrap())) + .collect(); + assert_eq!(values, expected); + } + + #[test] + fn merge_one_store_empty() { + let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect(); + let v1: Vec<_> = [3u32, 4, 5].iter().map(|x| x.to_be_bytes()).collect(); + let v2: Vec<_> = [6u32, 7, 8].iter().map(|x| x.to_be_bytes()).collect(); + let mut v0_store = vec![]; + let mut v1_store = vec![]; + let mut v2_store = vec![]; + create_key_value(&mut v0_store, v0).unwrap(); + create_key_value(&mut v1_store, v1).unwrap(); + create_key_value(&mut v2_store, v2).unwrap(); + + let interface = (GreaterThan(TWO), TElem); + let mut file = tempfile::tempfile().unwrap(); + + let elems = vec![ + // all the elements are deleted + (interface, v0_store.as_slice()), + // no element is removed + (interface, v1_store.as_slice()), + // no element is removed + (interface, v2_store.as_slice()), + ]; + merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap(); + let expected: Vec = vec![3, 4, 5, 6, 7, 8]; + let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() }; + let number_of_elements = elements_in_total(&merge_store); + let values: Vec = (0..number_of_elements) + .map(|i| get_value(TElem, &merge_store, i)) + .map(|s| u32::from_be_bytes(s.try_into().unwrap())) + .collect(); + assert_eq!(values, expected); + } + + #[test] + fn merge_with_different_interfaces() { + let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect(); + let v1: Vec<_> = [3u32, 4, 5].iter().map(|x| x.to_be_bytes()).collect(); + let v2: Vec<_> = [6u32, 7, 8].iter().map(|x| x.to_be_bytes()).collect(); + let mut v0_store = vec![]; + let mut v1_store = vec![]; + let mut v2_store = vec![]; + create_key_value(&mut v0_store, v0).unwrap(); + create_key_value(&mut v1_store, v1).unwrap(); + create_key_value(&mut v2_store, v2).unwrap(); + + let greater_than_2 = (GreaterThan(TWO), TElem); + let greater_than_10 = (GreaterThan(TEN), TElem); + let mut file = tempfile::tempfile().unwrap(); + + let elems = vec![ + // all the elements are removed + (greater_than_2, v0_store.as_slice()), + // no element are removed + (greater_than_2, v1_store.as_slice()), + // all the elements are removed + (greater_than_10, v2_store.as_slice()), + ]; + merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap(); + let expected: Vec = vec![3, 4, 5]; + let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() }; + let number_of_elements = elements_in_total(&merge_store); + let values: Vec = (0..number_of_elements) + .map(|i| get_value(TElem, &merge_store, i)) + .map(|s| u32::from_be_bytes(s.try_into().unwrap())) + .collect(); + assert_eq!(values, expected); + } + + #[test] + fn merge_all_deleted() { + let v0: Vec<_> = [0u32, 1, 2].iter().map(|x| x.to_be_bytes()).collect(); + let v1: Vec<_> = [3u32, 4, 5].iter().map(|x| x.to_be_bytes()).collect(); + let v2: Vec<_> = [6u32, 7, 8].iter().map(|x| x.to_be_bytes()).collect(); + let mut v0_store = vec![]; + let mut v1_store = vec![]; + let mut v2_store = vec![]; + create_key_value(&mut v0_store, v0).unwrap(); + create_key_value(&mut v1_store, v1).unwrap(); + create_key_value(&mut v2_store, v2).unwrap(); + + let interface = (GreaterThan(TEN), TElem); + let mut file = tempfile::tempfile().unwrap(); + + let elems = vec![ + // all the elements are removed + (interface, v0_store.as_slice()), + // all the elements are removed + (interface, v1_store.as_slice()), + // all the elements are removed + (interface, v2_store.as_slice()), + ]; + merge::<(GreaterThan, TElem), std::fs::File>(&mut file, elems.as_slice()).unwrap(); + let expected: Vec = vec![]; + let merge_store = unsafe { memmap2::Mmap::map(&file).unwrap() }; + let number_of_elements = elements_in_total(&merge_store); + let values: Vec = (0..number_of_elements) + .map(|i| get_value(TElem, &merge_store, i)) + .map(|s| u32::from_be_bytes(s.try_into().unwrap())) + .collect(); + assert_eq!(values, expected); + } }