Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

assemble runtime crdt state #408

Merged
merged 18 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions libs/jwst-codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ license = "AGPL-3.0-only"
[dependencies]
bitvec = "1.0.1"
byteorder = "1.4.3"
nanoid = "0.4.0"
nom = "7.1.3"
rand = "0.8.5"
serde_json = "1.0.94"
thiserror = "1.0.40"

# ======= workspace dependencies =======
jwst-logger = { path = "../jwst-logger" }

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use nom::{
use super::*;
use std::collections::HashMap;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum Any {
Undefined,
Null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use nom::{
};
use serde_json::Value as JsonValue;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum YType {
Array,
Map,
Expand All @@ -17,7 +17,7 @@ pub enum YType {
XmlHook(String),
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum Content {
Deleted(u64),
JSON(Vec<Option<String>>),
Expand All @@ -30,6 +30,27 @@ pub enum Content {
Doc { guid: String, opts: Vec<Any> },
}

impl Content {
pub fn clock_len(&self) -> u64 {
match self {
Content::Deleted(len) => *len,
Content::JSON(strings) => strings.len() as u64,
Content::String(string) => string.len() as u64,
Content::Any(any) => any.len() as u64,
Content::Binary(_)
| Content::Embed(_)
| Content::Format { .. }
| Content::Type(_)
| Content::Doc { .. } => 1,
}
}

pub fn split(&self, diff: u64) -> JwstCodecResult<(Content, Content)> {
// TODO: implement split for other types
Err(JwstCodecError::ContentSplitNotSupport(diff))
}
}

pub fn read_content(input: &[u8], tag_type: u8) -> IResult<&[u8], Content> {
match tag_type {
1 => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use super::*;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct Id {
pub client: u64,
pub clock: u64,
}

impl Id {
pub fn new(client: u64, clock: u64) -> Self {
Self { client, clock }
}
}

pub fn read_item_id(input: &[u8]) -> IResult<&[u8], Id> {
let (tail, client) = read_var_u64(input)?;
let (tail, clock) = read_var_u64(tail)?;
Ok((tail, Id { client, clock }))
Ok((tail, Id::new(client, clock)))
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use super::*;

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum Parent {
String(String),
Id(Id),
}

#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct Item {
pub info: u8,
pub left_id: Option<Id>,
pub right_id: Option<Id>,
pub parent: Option<Parent>,
Expand All @@ -32,7 +31,6 @@ pub fn read_item(input: &[u8], info: u8, first_5_bit: u8) -> IResult<&[u8], Item
// NOTE: read order must keep the same as the order in yjs
// TODO: this data structure design will break the cpu OOE, need to be optimized
let item = Item {
info,
left_id: if has_left_id {
let (tail, id) = read_item_id(input)?;
input = tail;
Expand Down
20 changes: 20 additions & 0 deletions libs/jwst-codec/src/doc/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
mod any;
mod content;
mod id;
mod item;
mod refs;
mod update;

pub use any::Any;
pub use content::Content;
pub use id::Id;
pub use item::Item;
pub use refs::StructInfo;
pub use update::{read_update, Update};

use super::*;
use any::read_any;
use content::read_content;
use id::read_item_id;
use item::read_item;
use refs::read_client_struct_refs;
177 changes: 177 additions & 0 deletions libs/jwst-codec/src/doc/codec/refs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use super::*;
use nom::{multi::count, number::complete::be_u8};
use std::collections::HashMap;

enum RawStructInfo {
GC(u64),
Skip(u64),
Item(Item),
}

struct RawRefs {
client: u64,
refs: Vec<StructInfo>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum StructInfo {
GC { id: Id, len: u64 },
Skip { id: Id, len: u64 },
Item { id: Id, item: Item },
}

impl StructInfo {
pub fn id(&self) -> &Id {
match self {
StructInfo::GC { id, .. } => id,
StructInfo::Skip { id, .. } => id,
StructInfo::Item { id, .. } => id,
}
}

pub fn client_id(&self) -> u64 {
self.id().client
}

pub fn clock(&self) -> u64 {
self.id().clock
}

pub fn len(&self) -> u64 {
match self {
StructInfo::GC { len, .. } => *len,
StructInfo::Skip { len, .. } => *len,
StructInfo::Item { item, .. } => item.content.clock_len(),
}
}

pub fn is_gc(&self) -> bool {
matches!(self, StructInfo::GC { .. })
}

pub fn is_skip(&self) -> bool {
matches!(self, StructInfo::Skip { .. })
}

pub fn is_item(&self) -> bool {
matches!(self, StructInfo::Item { .. })
}

pub fn split_item(&mut self, diff: u64) -> JwstCodecResult<(Self, Self)> {
if let Self::Item { id, item } = self {
let right_id = Id::new(id.client, id.clock + diff);
let (left_content, right_content) = item.content.split(diff)?;

let left_item = StructInfo::Item {
id: id.clone(),
item: Item {
right_id: Some(right_id.clone()),
content: left_content,
..item.clone()
},
};

let right_item = StructInfo::Item {
id: right_id,
item: Item {
left_id: Some(Id::new(id.client, id.clock + diff - 1)),
right_id: item.right_id.clone(),
parent: item.parent.clone(),
parent_sub: item.parent_sub.clone(),
content: right_content,
},
};

Ok((left_item, right_item))
} else {
Err(JwstCodecError::ItemSplitNotSupport)
}
}
}

fn read_struct(input: &[u8]) -> IResult<&[u8], RawStructInfo> {
let (input, info) = be_u8(input)?;
let first_5_bit = info & 0b11111;

match first_5_bit {
0 => {
let (input, len) = read_var_u64(input)?;
Ok((input, RawStructInfo::GC(len)))
}
10 => {
let (input, len) = read_var_u64(input)?;
Ok((input, RawStructInfo::Skip(len)))
}
_ => {
let (input, item) = read_item(input, info, first_5_bit)?;
Ok((input, RawStructInfo::Item(item)))
}
}
}

fn read_refs(input: &[u8]) -> IResult<&[u8], RawRefs> {
let (input, num_of_structs) = read_var_u64(input)?;
let (input, client) = read_var_u64(input)?;
let (input, clock) = read_var_u64(input)?;
let (input, structs) = count(read_struct, num_of_structs as usize)(input)?;
let (refs, _) = structs
.into_iter()
.fold((vec![], clock), |(mut vec, clock), s| {
let id = Id::new(client, clock);
match s {
RawStructInfo::GC(len) => {
vec.push(StructInfo::GC { id, len });
(vec, clock + len)
}
RawStructInfo::Skip(len) => {
vec.push(StructInfo::Skip { id, len });
(vec, clock + len)
}
RawStructInfo::Item(item) => {
let len = item.content.clock_len();
vec.push(StructInfo::Item { id, item });
(vec, clock + len)
}
}
});

Ok((input, RawRefs { client, refs }))
}

pub fn read_client_struct_refs(input: &[u8]) -> IResult<&[u8], HashMap<u64, Vec<StructInfo>>> {
let (input, num_of_updates) = read_var_u64(input)?;
let (tail, updates) = count(read_refs, num_of_updates as usize)(input)?;

Ok((
tail,
updates.into_iter().map(|u| (u.client, u.refs)).collect(),
))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_struct_info() {
{
let struct_info = StructInfo::GC {
id: Id::new(1, 0),
len: 10,
};
assert_eq!(struct_info.len(), 10);
assert_eq!(struct_info.client_id(), 1);
assert_eq!(struct_info.clock(), 0);
}

{
let struct_info = StructInfo::Skip {
id: Id::new(2, 0),
len: 20,
};
assert_eq!(struct_info.len(), 20);
assert_eq!(struct_info.client_id(), 2);
assert_eq!(struct_info.clock(), 0);
}
}
}
54 changes: 54 additions & 0 deletions libs/jwst-codec/src/doc/codec/update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::*;
use nom::multi::count;
use std::collections::HashMap;

#[derive(Debug)]
pub struct Delete {
pub clock: u64,
pub clock_len: u64,
}

#[derive(Debug)]
pub struct DeleteSets {
pub client: u64,
pub deletes: Vec<Delete>,
}

#[derive(Debug)]
pub struct Update {
pub delete_sets: Vec<DeleteSets>,
pub structs: HashMap<u64, Vec<StructInfo>>,
}

fn read_delete(input: &[u8]) -> IResult<&[u8], Delete> {
let (tail, clock) = read_var_u64(input)?;
let (tail, clock_len) = read_var_u64(tail)?;
Ok((tail, Delete { clock, clock_len }))
}

fn parse_delete_set(input: &[u8]) -> IResult<&[u8], DeleteSets> {
let (input, client) = read_var_u64(input)?;
let (input, num_of_deletes) = read_var_u64(input)?;
let (tail, deletes) = count(read_delete, num_of_deletes as usize)(input)?;

Ok((tail, DeleteSets { client, deletes }))
}

fn read_delete_set(input: &[u8]) -> IResult<&[u8], Vec<DeleteSets>> {
let (input, num_of_clients) = read_var_u64(input)?;
let (tail, deletes) = count(parse_delete_set, num_of_clients as usize)(input)?;

Ok((tail, deletes))
}

pub fn read_update(input: &[u8]) -> IResult<&[u8], Update> {
let (tail, structs) = read_client_struct_refs(input)?;
let (tail, delete_sets) = read_delete_set(tail)?;
Ok((
tail,
Update {
structs,
delete_sets,
},
))
}
Loading