From 7428401f9486d6c644f70bc17b3ee06bb30ce6b9 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Fri, 17 Nov 2023 12:15:03 +0800 Subject: [PATCH] perf: improve apply update performance (y-crdt/y-octo#34) this PR partially solves y-crdt/y-octo##33 and greatly improves the performance of apply update test cases show a performance improvement of about 40-50% --- Cargo.lock | 1 + libs/jwst-codec-utils/Cargo.toml | 4 + .../benches/apply_benchmarks.rs | 35 ++++++++ libs/jwst-codec/Cargo.toml | 5 ++ libs/jwst-codec/benches/apply_benchmarks.rs | 34 ++++++++ libs/jwst-codec/src/doc/codec/any.rs | 2 +- libs/jwst-codec/src/doc/codec/content.rs | 3 +- libs/jwst-codec/src/doc/codec/delete_set.rs | 19 +++-- libs/jwst-codec/src/doc/codec/update.rs | 21 +++-- libs/jwst-codec/src/doc/common/mod.rs | 2 + libs/jwst-codec/src/doc/common/range.rs | 79 +++++++++++-------- libs/jwst-codec/src/doc/common/somr.rs | 1 + libs/jwst-codec/src/doc/common/state.rs | 18 ++--- libs/jwst-codec/src/doc/document.rs | 6 +- libs/jwst-codec/src/doc/hasher.rs | 40 ++++++++++ libs/jwst-codec/src/doc/history.rs | 9 +-- libs/jwst-codec/src/doc/mod.rs | 3 + libs/jwst-codec/src/doc/store.rs | 49 ++++++++---- libs/jwst-codec/src/doc/types/mod.rs | 5 +- libs/jwst-codec/src/lib.rs | 8 +- libs/jwst-core/src/lib.rs | 2 +- .../jwst-core/src/workspaces/metadata/meta.rs | 4 +- libs/jwst-core/src/workspaces/mod.rs | 2 +- 23 files changed, 244 insertions(+), 108 deletions(-) create mode 100644 libs/jwst-codec-utils/benches/apply_benchmarks.rs create mode 100644 libs/jwst-codec/benches/apply_benchmarks.rs create mode 100644 libs/jwst-codec/src/doc/hasher.rs diff --git a/Cargo.lock b/Cargo.lock index 125ac1fd..36072788 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2285,6 +2285,7 @@ dependencies = [ name = "jwst-codec" version = "0.1.0" dependencies = [ + "ahash 0.8.6", "arbitrary", "assert-json-diff", "bitvec", diff --git a/libs/jwst-codec-utils/Cargo.toml b/libs/jwst-codec-utils/Cargo.toml index a8419116..9fc08f3a 100644 --- a/libs/jwst-codec-utils/Cargo.toml +++ b/libs/jwst-codec-utils/Cargo.toml @@ -59,5 +59,9 @@ name = "text_ops_benchmarks" harness = false name = "update_benchmarks" +[[bench]] +harness = false +name = "apply_benchmarks" + [lib] bench = true diff --git a/libs/jwst-codec-utils/benches/apply_benchmarks.rs b/libs/jwst-codec-utils/benches/apply_benchmarks.rs new file mode 100644 index 00000000..de2f3d3a --- /dev/null +++ b/libs/jwst-codec-utils/benches/apply_benchmarks.rs @@ -0,0 +1,35 @@ +mod utils; + +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use path_ext::PathExt; +use utils::Files; + +fn apply(c: &mut Criterion) { + let files = Files::load(); + + let mut group = c.benchmark_group("apply"); + group.measurement_time(Duration::from_secs(15)); + + for file in &files.files { + group.throughput(Throughput::Bytes(file.content.len() as u64)); + group.bench_with_input( + BenchmarkId::new("apply with yrs", file.path.name_str()), + &file.content, + |b, content| { + b.iter(|| { + use yrs::{updates::decoder::Decode, Doc, Transact, Update}; + let update = Update::decode_v1(content).unwrap(); + let doc = Doc::new(); + doc.transact_mut().apply_update(update); + }); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, apply); +criterion_main!(benches); diff --git a/libs/jwst-codec/Cargo.toml b/libs/jwst-codec/Cargo.toml index ea19bd14..03eececd 100644 --- a/libs/jwst-codec/Cargo.toml +++ b/libs/jwst-codec/Cargo.toml @@ -19,6 +19,7 @@ version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +ahash = "0.8" bitvec = "1.0" byteorder = "1.5" nom = "7.1" @@ -77,5 +78,9 @@ name = "text_ops_benchmarks" harness = false name = "update_benchmarks" +[[bench]] +harness = false +name = "apply_benchmarks" + [lib] bench = true diff --git a/libs/jwst-codec/benches/apply_benchmarks.rs b/libs/jwst-codec/benches/apply_benchmarks.rs new file mode 100644 index 00000000..d95083bb --- /dev/null +++ b/libs/jwst-codec/benches/apply_benchmarks.rs @@ -0,0 +1,34 @@ +mod utils; + +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use path_ext::PathExt; +use utils::Files; + +fn apply(c: &mut Criterion) { + let files = Files::load(); + + let mut group = c.benchmark_group("apply"); + group.measurement_time(Duration::from_secs(15)); + + for file in &files.files { + group.throughput(Throughput::Bytes(file.content.len() as u64)); + group.bench_with_input( + BenchmarkId::new("apply with jwst", file.path.name_str()), + &file.content, + |b, content| { + b.iter(|| { + use y_octo::*; + let mut doc = Doc::new(); + doc.apply_update_from_binary(content.clone()).unwrap() + }); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, apply); +criterion_main!(benches); diff --git a/libs/jwst-codec/src/doc/codec/any.rs b/libs/jwst-codec/src/doc/codec/any.rs index ee2e09c3..0cde55f2 100644 --- a/libs/jwst-codec/src/doc/codec/any.rs +++ b/libs/jwst-codec/src/doc/codec/any.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, ops::RangeInclusive}; +use std::{fmt, ops::RangeInclusive}; use ordered_float::OrderedFloat; diff --git a/libs/jwst-codec/src/doc/codec/content.rs b/libs/jwst-codec/src/doc/codec/content.rs index 06f0f982..db6bd9db 100644 --- a/libs/jwst-codec/src/doc/codec/content.rs +++ b/libs/jwst-codec/src/doc/codec/content.rs @@ -219,7 +219,8 @@ impl Content { match self { Self::Deleted(len) => *len, Self::Json(strings) => strings.len() as u64, - Self::String(string) => string.encode_utf16().count() as u64, + // TODO: need a custom wrapper with length cached, this cost too much + Self::String(string) => string.chars().map(|c| c.len_utf16()).sum::() as u64, Self::Any(any) => any.len() as u64, Self::Binary(_) | Self::Embed(_) | Self::Format { .. } | Self::Type(_) | Self::Doc { .. } => 1, } diff --git a/libs/jwst-codec/src/doc/codec/delete_set.rs b/libs/jwst-codec/src/doc/codec/delete_set.rs index 1cbc49d9..33d4a45e 100644 --- a/libs/jwst-codec/src/doc/codec/delete_set.rs +++ b/libs/jwst-codec/src/doc/codec/delete_set.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, VecDeque}, ops::{Deref, DerefMut, Range}, }; @@ -28,10 +28,10 @@ impl CrdtRead for OrderRange { if num_of_deletes == 1 { Ok(OrderRange::Range(Range::::read(decoder)?)) } else { - let mut deletes = Vec::with_capacity(num_of_deletes); + let mut deletes = VecDeque::with_capacity(num_of_deletes); for _ in 0..num_of_deletes { - deletes.push(Range::::read(decoder)?); + deletes.push_back(Range::::read(decoder)?); } Ok(OrderRange::Fragment(deletes)) @@ -59,10 +59,10 @@ impl CrdtWrite for OrderRange { } #[derive(Debug, Default, Clone, PartialEq)] -pub struct DeleteSet(pub HashMap); +pub struct DeleteSet(pub ClientMap); impl Deref for DeleteSet { - type Target = HashMap; + type Target = ClientMap; fn deref(&self) -> &Self::Target { &self.0 @@ -71,7 +71,7 @@ impl Deref for DeleteSet { impl From<[(Client, Vec>); N]> for DeleteSet { fn from(value: [(Client, Vec>); N]) -> Self { - let mut map = HashMap::with_capacity(N); + let mut map = ClientMap::with_capacity(N); for (client, ranges) in value { map.insert(client, ranges.into()); } @@ -106,7 +106,6 @@ impl DeleteSet { } } - #[allow(dead_code)] pub fn batch_push(&mut self, client: Client, ranges: Vec>) { match self.0.entry(client) { Entry::Occupied(e) => { @@ -136,7 +135,7 @@ impl CrdtRead for DeleteSet { fn read(decoder: &mut R) -> JwstCodecResult { let num_of_clients = decoder.read_var_u64()? as usize; // See: [HASHMAP_SAFE_CAPACITY] - let mut map = HashMap::with_capacity(num_of_clients.min(HASHMAP_SAFE_CAPACITY)); + let mut map = ClientMap::with_capacity(num_of_clients.min(HASHMAP_SAFE_CAPACITY)); for _ in 0..num_of_clients { let client = decoder.read_var_u64()?; @@ -188,7 +187,7 @@ mod tests { { let mut delete_set = delete_set; delete_set.add(1, 5, 10); - assert_eq!(delete_set.get(&1), Some(&OrderRange::Fragment(vec![0..15, 20..30]))); + assert_eq!(delete_set.get(&1), Some(&OrderRange::from(vec![0..15, 20..30]))); } } @@ -210,7 +209,7 @@ mod tests { { let mut delete_set = delete_set; delete_set.batch_push(1, vec![40..50, 10..20]); - assert_eq!(delete_set.get(&1), Some(&OrderRange::Fragment(vec![0..30, 40..50]))); + assert_eq!(delete_set.get(&1), Some(&OrderRange::from(vec![0..30, 40..50]))); } } diff --git a/libs/jwst-codec/src/doc/codec/update.rs b/libs/jwst-codec/src/doc/codec/update.rs index 7f7c6dbe..1bf78d66 100644 --- a/libs/jwst-codec/src/doc/codec/update.rs +++ b/libs/jwst-codec/src/doc/codec/update.rs @@ -1,20 +1,17 @@ -use std::{ - collections::{HashMap, VecDeque}, - ops::Range, -}; +use std::{collections::VecDeque, ops::Range}; use super::*; use crate::doc::StateVector; #[derive(Debug, Default, Clone)] pub struct Update { - pub(crate) structs: HashMap>, + pub(crate) structs: ClientMap>, pub(crate) delete_set: DeleteSet, /// all unapplicable items that we can't integrate into doc /// any item with inconsistent id clock or missing dependency will be put /// here - pub(crate) pending_structs: HashMap>, + pub(crate) pending_structs: ClientMap>, /// missing state vector after applying updates pub(crate) missing_state: StateVector, /// all unapplicable delete set @@ -26,7 +23,7 @@ impl CrdtRead for Update { let num_of_clients = decoder.read_var_u64()? as usize; // See: [HASHMAP_SAFE_CAPACITY] - let mut map = HashMap::with_capacity(num_of_clients.min(HASHMAP_SAFE_CAPACITY)); + let mut map = ClientMap::with_capacity(num_of_clients.min(HASHMAP_SAFE_CAPACITY)); for _ in 0..num_of_clients { let num_of_structs = decoder.read_var_u64()? as usize; let client = decoder.read_var_u64()?; @@ -531,7 +528,7 @@ mod tests { fn test_update_iterator() { loom_model!({ let mut update = Update { - structs: HashMap::from([ + structs: ClientMap::from_iter([ ( 0, VecDeque::from([ @@ -571,7 +568,7 @@ mod tests { loom_model!({ let mut update = Update { // an item with higher sequence id than local state - structs: HashMap::from([(0, VecDeque::from([struct_item((0, 4), 1)]))]), + structs: ClientMap::from_iter([(0, VecDeque::from([struct_item((0, 4), 1)]))]), ..Update::default() }; @@ -623,7 +620,7 @@ mod tests { fn should_add_skip_when_clock_not_continuous() { loom_model!({ let update = Update { - structs: HashMap::from([( + structs: ClientMap::from_iter([( 0, VecDeque::from([ struct_item((0, 0), 1), @@ -655,7 +652,7 @@ mod tests { fn merged_update_should_not_be_released_in_next_turn() { loom_model!({ let update = Update { - structs: HashMap::from([( + structs: ClientMap::from_iter([( 0, VecDeque::from([ struct_item((0, 0), 1), @@ -670,7 +667,7 @@ mod tests { let merged = Update::merge([update]); let update2 = Update { - structs: HashMap::from([( + structs: ClientMap::from_iter([( 0, VecDeque::from([struct_item((0, 30), 1), Node::new_gc((0, 32).into(), 1)]), )]), diff --git a/libs/jwst-codec/src/doc/common/mod.rs b/libs/jwst-codec/src/doc/common/mod.rs index 17952105..27f9a561 100644 --- a/libs/jwst-codec/src/doc/common/mod.rs +++ b/libs/jwst-codec/src/doc/common/mod.rs @@ -5,3 +5,5 @@ mod state; pub use range::*; pub use somr::*; pub use state::*; + +use super::*; diff --git a/libs/jwst-codec/src/doc/common/range.rs b/libs/jwst-codec/src/doc/common/range.rs index 0c295599..1059ed15 100644 --- a/libs/jwst-codec/src/doc/common/range.rs +++ b/libs/jwst-codec/src/doc/common/range.rs @@ -1,9 +1,9 @@ -use std::{mem, ops::Range}; +use std::{collections::VecDeque, mem, ops::Range}; #[derive(Debug, PartialEq, Eq, Clone)] pub enum OrderRange { Range(Range), - Fragment(Vec>), + Fragment(VecDeque>), } impl Default for OrderRange { @@ -20,10 +20,17 @@ impl From> for OrderRange { impl From>> for OrderRange { fn from(value: Vec>) -> Self { + Self::Fragment(value.into_iter().collect()) + } +} + +impl From>> for OrderRange { + fn from(value: VecDeque>) -> Self { Self::Fragment(value) } } +#[inline] fn is_continuous_range(lhs: &Range, rhs: &Range) -> bool { lhs.end >= rhs.start && lhs.start <= rhs.end } @@ -116,9 +123,9 @@ impl OrderRange { pub fn extends(&mut self, list: T) where - T: Into>>, + T: Into>>, { - let list: Vec<_> = list.into(); + let list: VecDeque<_> = list.into(); if list.is_empty() { return; } @@ -156,9 +163,9 @@ impl OrderRange { r.start = r.start.min(range.start); } else { *self = OrderRange::Fragment(if r.start < range.start { - vec![r.clone(), range] + VecDeque::from([r.clone(), range]) } else { - vec![range, r.clone()] + VecDeque::from([range, r.clone()]) }); } } @@ -201,16 +208,16 @@ impl OrderRange { // merge intersected range OrderRange::Range(a.start.min(b.start)..a.end.max(b.end)) } else { - OrderRange::Fragment(vec![a, b]) + OrderRange::Fragment(VecDeque::from([a, b])) } } (OrderRange::Fragment(mut a), OrderRange::Range(b)) => { - a.push(b); + a.push_back(b); OrderRange::Fragment(a) } (OrderRange::Range(a), OrderRange::Fragment(b)) => { let mut v = b; - v.push(a); + v.push_back(a); OrderRange::Fragment(v) } (OrderRange::Fragment(mut a), OrderRange::Fragment(mut b)) => { @@ -232,31 +239,33 @@ impl OrderRange { return; } - let mut cur_idx = 0; - let mut next_idx = 1; - while next_idx < ranges.len() { - let cur = &ranges[cur_idx]; - let next = &ranges[next_idx]; - if is_continuous_range(cur, next) { - ranges[cur_idx] = cur.start.min(next.start)..cur.end.max(next.end); - ranges[next_idx] = 0..0; + let mut changed = false; + let mut merged = Vec::with_capacity(ranges.len()); + let mut cur = ranges[0].clone(); + + for next in ranges.iter().skip(1) { + if is_continuous_range(&cur, next) { + cur.start = cur.start.min(next.start); + cur.end = cur.end.max(next.end); + changed = true; } else { - cur_idx = next_idx; + merged.push(cur); + cur = next.clone(); } - - next_idx += 1; } + merged.push(cur); - ranges.retain(|r| !r.is_empty()); - if ranges.len() == 1 { - *self = OrderRange::Range(ranges[0].clone()); + if merged.len() == 1 { + *self = OrderRange::Range(merged[0].clone()); + } else if changed { + mem::swap(ranges, &mut merged.into_iter().collect()); } } } fn sort(&mut self) { if let OrderRange::Fragment(ranges) = self { - ranges.sort_by(|lhs, rhs| lhs.start.cmp(&rhs.start)); + ranges.make_contiguous().sort_by(|lhs, rhs| lhs.start.cmp(&rhs.start)); } } @@ -266,7 +275,7 @@ impl OrderRange { } else { match self { OrderRange::Range(range) => Some(mem::replace(range, 0..0)), - OrderRange::Fragment(list) => Some(list.remove(0)), + OrderRange::Fragment(list) => list.pop_front(), } } } @@ -324,11 +333,11 @@ mod tests { // turn to fragment range.push(20..30); - assert_eq!(range, OrderRange::Fragment(vec![(0..15), (20..30)])); + assert_eq!(range, OrderRange::from(vec![(0..15), (20..30)])); // auto merge range.push(15..16); - assert_eq!(range, OrderRange::Fragment(vec![(0..16), (20..30)])); + assert_eq!(range, OrderRange::from(vec![(0..16), (20..30)])); // squash range.push(16..20); @@ -348,19 +357,19 @@ mod tests { #[test] fn test_ranges_squash() { - let mut range = OrderRange::Fragment(vec![(0..10), (20..30)]); + let mut range = OrderRange::from(vec![(0..10), (20..30)]); // do nothing range.squash(); - assert_eq!(range, OrderRange::Fragment(vec![(0..10), (20..30)])); + assert_eq!(range, OrderRange::from(vec![(0..10), (20..30)])); // merged into list - range = OrderRange::Fragment(vec![(0..10), (10..20), (30..40)]); + range = OrderRange::from(vec![(0..10), (10..20), (30..40)]); range.squash(); - assert_eq!(range, OrderRange::Fragment(vec![(0..20), (30..40)])); + assert_eq!(range, OrderRange::from(vec![(0..20), (30..40)])); // turn to range - range = OrderRange::Fragment(vec![(0..10), (10..20), (20..30)]); + range = OrderRange::from(vec![(0..10), (10..20), (20..30)]); range.squash(); assert_eq!(range, OrderRange::Range(0..30)); } @@ -369,7 +378,7 @@ mod tests { fn test_range_sort() { let mut range: OrderRange = vec![(20..30), (0..10), (10..50)].into(); range.sort(); - assert_eq!(range, OrderRange::Fragment(vec![(0..10), (10..50), (20..30)])); + assert_eq!(range, OrderRange::from(vec![(0..10), (10..50), (20..30)])); } #[test] @@ -430,11 +439,11 @@ mod tests { fn test_range_merge() { let mut range: OrderRange = (0..10).into(); range.merge((20..30).into()); - assert_eq!(range, OrderRange::Fragment(vec![(0..10), (20..30)])); + assert_eq!(range, OrderRange::from(vec![(0..10), (20..30)])); let mut range: OrderRange = (0..10).into(); range.merge(vec![(10..15), (20..30)].into()); - assert_eq!(range, OrderRange::Fragment(vec![(0..15), (20..30)])); + assert_eq!(range, OrderRange::from(vec![(0..15), (20..30)])); let mut range: OrderRange = vec![(0..10), (20..30)].into(); range.merge((10..20).into()); diff --git a/libs/jwst-codec/src/doc/common/somr.rs b/libs/jwst-codec/src/doc/common/somr.rs index bff2d204..59ea278b 100644 --- a/libs/jwst-codec/src/doc/common/somr.rs +++ b/libs/jwst-codec/src/doc/common/somr.rs @@ -10,6 +10,7 @@ use std::{ use crate::sync::Ordering; const DANGLING_PTR: usize = usize::MAX; +#[inline] fn is_dangling(ptr: NonNull) -> bool { ptr.as_ptr() as usize == DANGLING_PTR } diff --git a/libs/jwst-codec/src/doc/common/state.rs b/libs/jwst-codec/src/doc/common/state.rs index 3157fee9..d19bfcf2 100644 --- a/libs/jwst-codec/src/doc/common/state.rs +++ b/libs/jwst-codec/src/doc/common/state.rs @@ -1,12 +1,12 @@ -use std::{ - collections::HashMap, - ops::{Deref, DerefMut}, -}; +use std::ops::{Deref, DerefMut}; -use crate::{Client, Clock, CrdtRead, CrdtReader, CrdtWrite, CrdtWriter, Id, JwstCodecResult, HASHMAP_SAFE_CAPACITY}; +use super::{ + Client, ClientMap, Clock, CrdtRead, CrdtReader, CrdtWrite, CrdtWriter, HashMapExt, Id, JwstCodecResult, + HASHMAP_SAFE_CAPACITY, +}; #[derive(Default, Debug, PartialEq, Clone)] -pub struct StateVector(HashMap); +pub struct StateVector(ClientMap); impl StateVector { pub fn set_max(&mut self, client: Client, clock: Clock) { @@ -49,7 +49,7 @@ impl StateVector { } impl Deref for StateVector { - type Target = HashMap; + type Target = ClientMap; fn deref(&self) -> &Self::Target { &self.0 @@ -64,7 +64,7 @@ impl DerefMut for StateVector { impl From<[(Client, Clock); N]> for StateVector { fn from(value: [(Client, Clock); N]) -> Self { - let mut map = HashMap::with_capacity(N); + let mut map = ClientMap::with_capacity(N); for (client, clock) in value { map.insert(client, clock); @@ -79,7 +79,7 @@ impl CrdtRead for StateVector { let len = decoder.read_var_u64()? as usize; // See: [HASHMAP_SAFE_CAPACITY] - let mut map = HashMap::with_capacity(len.min(HASHMAP_SAFE_CAPACITY)); + let mut map = ClientMap::with_capacity(len.min(HASHMAP_SAFE_CAPACITY)); for _ in 0..len { let client = decoder.read_var_u64()?; let clock = decoder.read_var_u64()?; diff --git a/libs/jwst-codec/src/doc/document.rs b/libs/jwst-codec/src/doc/document.rs index d69dc30d..5050c63c 100644 --- a/libs/jwst-codec/src/doc/document.rs +++ b/libs/jwst-codec/src/doc/document.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use super::{history::StoreHistory, publisher::DocPublisher, store::StoreRef, *}; use crate::sync::{Arc, RwLock}; @@ -35,7 +33,7 @@ pub struct DocOptions { impl Default for DocOptions { fn default() -> Self { - if cfg!(test) { + if cfg!(any(test, feature = "bench")) { Self { client_id: 1, guid: "test".into(), @@ -96,7 +94,7 @@ impl DocOptions { impl From for Any { fn from(value: DocOptions) -> Self { - Any::Object(HashMap::from([ + Any::Object(HashMap::from_iter([ ("gc".into(), value.gc.into()), ("guid".into(), value.guid.into()), ])) diff --git a/libs/jwst-codec/src/doc/hasher.rs b/libs/jwst-codec/src/doc/hasher.rs new file mode 100644 index 00000000..22483542 --- /dev/null +++ b/libs/jwst-codec/src/doc/hasher.rs @@ -0,0 +1,40 @@ +use std::{ + collections::HashMap, + hash::{BuildHasher, Hasher}, +}; + +use super::Client; + +pub struct ClientHasher(Client); + +impl Hasher for ClientHasher { + fn finish(&self) -> u64 { + self.0 + } + + fn write(&mut self, _: &[u8]) {} + + fn write_u64(&mut self, i: u64) { + self.0 = i + } +} + +impl Default for ClientHasher { + fn default() -> Self { + Self(0) + } +} + +#[derive(Default, Clone)] +pub struct ClientHasherBuilder; + +impl BuildHasher for ClientHasherBuilder { + type Hasher = ClientHasher; + + fn build_hasher(&self) -> Self::Hasher { + ClientHasher::default() + } +} + +// use ClientID as key +pub type ClientMap = HashMap; diff --git a/libs/jwst-codec/src/doc/history.rs b/libs/jwst-codec/src/doc/history.rs index 3f6faa27..923b80c2 100644 --- a/libs/jwst-codec/src/doc/history.rs +++ b/libs/jwst-codec/src/doc/history.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, -}; +use std::{collections::VecDeque, sync::Arc}; use super::{store::StoreRef, *}; use crate::sync::RwLock; @@ -72,8 +69,8 @@ impl StoreHistory { pub fn parse_delete_sets( &self, - old_sets: &HashMap, - new_sets: &HashMap, + old_sets: &ClientMap, + new_sets: &ClientMap, ) -> Vec { let store = self.store.read().unwrap(); let deleted_items = new_sets diff --git a/libs/jwst-codec/src/doc/mod.rs b/libs/jwst-codec/src/doc/mod.rs index 6662b506..751f7f68 100644 --- a/libs/jwst-codec/src/doc/mod.rs +++ b/libs/jwst-codec/src/doc/mod.rs @@ -2,16 +2,19 @@ mod awareness; mod codec; mod common; mod document; +mod hasher; mod history; mod publisher; mod store; mod types; mod utils; +pub use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; pub use awareness::{Awareness, AwarenessEvent}; pub use codec::*; pub use common::*; pub use document::{Doc, DocOptions}; +pub use hasher::ClientMap; pub use history::{History, HistoryOptions, StoreHistory}; pub(crate) use store::DocStore; pub use types::*; diff --git a/libs/jwst-codec/src/doc/store.rs b/libs/jwst-codec/src/doc/store.rs index 60171842..ec1f5556 100644 --- a/libs/jwst-codec/src/doc/store.rs +++ b/libs/jwst-codec/src/doc/store.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + collections::{hash_map::Entry, VecDeque}, mem, ops::{Deref, Range}, }; @@ -16,7 +16,7 @@ unsafe impl Sync for DocStore {} #[derive(Default, Debug)] pub(crate) struct DocStore { client: Client, - pub items: HashMap>, + pub items: ClientMap>, pub delete_set: DeleteSet, // following fields are only used in memory @@ -101,7 +101,7 @@ impl DocStore { Self::items_as_state_vector(&self.items) } - fn items_as_state_vector(items: &HashMap>) -> StateVector { + fn items_as_state_vector(items: &ClientMap>) -> StateVector { let mut state = StateVector::default(); for (client, structs) in items.iter() { if let Some(last_struct) = structs.back() { @@ -600,17 +600,25 @@ impl DocStore { } pub fn delete_item(&mut self, item: &Item, parent: Option<&mut YType>) { - Self::delete_item_inner(&mut self.delete_set, item, parent); + let mut pending_delete_sets = HashMap::new(); + Self::delete_item_inner(&mut pending_delete_sets, item, parent); + for (client, ranges) in pending_delete_sets { + self.delete_set.batch_push(client, ranges); + } } - fn delete_item_inner(delete_set: &mut DeleteSet, item: &Item, parent: Option<&mut YType>) { + fn delete_item_inner(delete_set: &mut HashMap>>, item: &Item, parent: Option<&mut YType>) { // 1. mark item as deleted, if item is gced, return if !item.delete() { return; } // 2. add it to delete set - delete_set.add(item.id.client, item.id.clock, item.len()); + let range = item.id.clock..item.id.clock + item.len(); + delete_set + .entry(item.id.client) + .and_modify(|v| v.push(range.clone())) + .or_insert(vec![range]); // 3. adjust parent length if item.parent_sub.is_none() && item.countable() { @@ -680,6 +688,7 @@ impl DocStore { } }; + let mut pending_delete_sets = HashMap::new(); while idx < items.len() { let node = items[idx].clone(); let id = node.id(); @@ -694,7 +703,7 @@ impl DocStore { DocStore::split_node_at(items, idx, end - id.clock)?; } - Self::delete_item_inner(&mut self.delete_set, item, None); + Self::delete_item_inner(&mut pending_delete_sets, item, None); } } } else { @@ -703,6 +712,9 @@ impl DocStore { idx += 1; } + for (client, ranges) in pending_delete_sets { + self.delete_set.batch_push(client, ranges); + } } } @@ -746,13 +758,10 @@ impl DocStore { Ok(update) } - fn diff_structs( - map: &HashMap>, - sv: &StateVector, - ) -> JwstCodecResult>> { + fn diff_structs(map: &ClientMap>, sv: &StateVector) -> JwstCodecResult>> { let local_state_vector = Self::items_as_state_vector(map); let diff = Self::diff_state_vectors(&local_state_vector, sv); - let mut update_structs: HashMap> = HashMap::new(); + let mut update_structs = ClientMap::new(); for (client, clock) in diff { // We have made sure that the client is in the local state vector in @@ -787,14 +796,20 @@ impl DocStore { Ok(update_structs) } - fn generate_delete_set(refs: &HashMap>) -> DeleteSet { + fn generate_delete_set(refs: &ClientMap>) -> DeleteSet { let mut delete_set = DeleteSet::default(); for (client, nodes) in refs { - for node in nodes { - if node.deleted() { - delete_set.add(*client, node.id().clock, node.len()); - } + let ranges = nodes + .iter() + .filter(|n| n.deleted()) + .map(|n| { + let clock = n.id().clock; + clock..clock + n.len() + }) + .collect::>(); + if ranges.len() > 0 { + delete_set.batch_push(*client, ranges); } } diff --git a/libs/jwst-codec/src/doc/types/mod.rs b/libs/jwst-codec/src/doc/types/mod.rs index f3a94fab..6a4a9f38 100644 --- a/libs/jwst-codec/src/doc/types/mod.rs +++ b/libs/jwst-codec/src/doc/types/mod.rs @@ -5,10 +5,7 @@ mod text; mod value; mod xml; -use std::{ - collections::{hash_map::Entry, HashMap}, - sync::Weak, -}; +use std::{collections::hash_map::Entry, sync::Weak}; pub use array::*; use list::*; diff --git a/libs/jwst-codec/src/lib.rs b/libs/jwst-codec/src/lib.rs index fad394b3..227705a3 100644 --- a/libs/jwst-codec/src/lib.rs +++ b/libs/jwst-codec/src/lib.rs @@ -7,11 +7,11 @@ mod sync; pub use codec::*; pub use doc::{ decode_maybe_update_with_guid, decode_update_with_guid, encode_awareness_as_message, encode_update_as_message, - encode_update_with_guid, merge_updates_v1, Any, Array, Awareness, AwarenessEvent, Client, Clock, CrdtRead, - CrdtReader, CrdtWrite, CrdtWriter, Doc, DocOptions, History, HistoryOptions, Id, Map, RawDecoder, RawEncoder, - StateVector, StoreHistory, Text, Update, Value, + encode_update_with_guid, merge_updates_v1, Any, Array, Awareness, AwarenessEvent, Client, ClientMap, Clock, + CrdtRead, CrdtReader, CrdtWrite, CrdtWriter, Doc, DocOptions, HashMap as AHashMap, HashMapExt, History, + HistoryOptions, Id, Map, RawDecoder, RawEncoder, StateVector, StoreHistory, Text, Update, Value, }; -pub(crate) use doc::{Content, Item, HASHMAP_SAFE_CAPACITY}; +pub(crate) use doc::{Content, Item}; pub use jwst_logger::{debug, error, info, trace, warn}; use nom::IResult; pub use protocol::{ diff --git a/libs/jwst-core/src/lib.rs b/libs/jwst-core/src/lib.rs index 0f046f4e..403c7561 100644 --- a/libs/jwst-core/src/lib.rs +++ b/libs/jwst-core/src/lib.rs @@ -7,7 +7,7 @@ mod workspaces; pub mod constants; pub use block::Block; -pub use jwst_codec::{Any, History, HistoryOptions}; +pub use jwst_codec::{AHashMap, Any, HashMapExt, History, HistoryOptions}; pub use space::Space; pub use tracing::{debug, error, info, log::LevelFilter, trace, warn}; pub use types::{BlobMetadata, BlobStorage, BucketBlobStorage, DocStorage, JwstError, JwstResult}; diff --git a/libs/jwst-core/src/workspaces/metadata/meta.rs b/libs/jwst-core/src/workspaces/metadata/meta.rs index 4d77350c..3c0521e6 100644 --- a/libs/jwst-core/src/workspaces/metadata/meta.rs +++ b/libs/jwst-core/src/workspaces/metadata/meta.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use jwst_codec::{Any, Array, Map}; use serde::{Deserialize, Serialize}; @@ -22,7 +20,7 @@ impl From for WorkspaceMetadata { impl From for Any { fn from(val: WorkspaceMetadata) -> Self { - let mut map = HashMap::new(); + let mut map = AHashMap::new(); if let Some(name) = val.name { map.insert(constants::metadata::NAME.to_owned(), name.into()); } diff --git a/libs/jwst-core/src/workspaces/mod.rs b/libs/jwst-core/src/workspaces/mod.rs index 8636c256..20cc9469 100644 --- a/libs/jwst-core/src/workspaces/mod.rs +++ b/libs/jwst-core/src/workspaces/mod.rs @@ -7,4 +7,4 @@ mod workspace; pub use metadata::{Pages, WorkspaceMetadata}; pub use workspace::Workspace; -use super::{constants, info, trace, warn, JwstResult, Space}; +use super::{constants, info, trace, warn, AHashMap, HashMapExt, JwstResult, Space};