Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixing 01 #1885

Merged
merged 8 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn run_source<S: ByteSource>(
operation_api: OperationAPI,
state: SessionStateAPI,
source: S,
source_id: u8,
source_id: u16,
parser: &ParserType,
rx_sde: Option<SdeReceiver>,
rx_tail: Option<Receiver<Result<(), tail::Error>>>,
Expand Down Expand Up @@ -73,7 +73,7 @@ pub async fn run_source<S: ByteSource>(
async fn run_producer<T: LogMessage, P: Parser<T>, S: ByteSource>(
operation_api: OperationAPI,
state: SessionStateAPI,
source_id: u8,
source_id: u16,
mut producer: MessageProducer<T, P, S>,
mut rx_tail: Option<Receiver<Result<(), tail::Error>>>,
) -> OperationResult<()> {
Expand Down
7 changes: 7 additions & 0 deletions application/apps/indexer/session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,13 @@ impl Session {
.map_err(ComputationError::NativeError)
}

pub async fn get_indexed_ranges(&self) -> Result<Vec<RangeInclusive<u64>>, ComputationError> {
self.state
.get_indexed_ranges()
.await
.map_err(ComputationError::NativeError)
}

/// Used for debug goals
pub fn sleep(&self, operation_id: Uuid, ms: u64) -> Result<(), ComputationError> {
self.tx_operations
Expand Down
23 changes: 15 additions & 8 deletions application/apps/indexer/session/src/state/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ use super::values::graph::CandlePoint;
pub enum Api {
SetSessionFile((Option<PathBuf>, oneshot::Sender<Result<(), NativeError>>)),
GetSessionFile(oneshot::Sender<Result<PathBuf, NativeError>>),
WriteSessionFile((u8, String, oneshot::Sender<Result<(), NativeError>>)),
WriteSessionFile((u16, String, oneshot::Sender<Result<(), NativeError>>)),
FlushSessionFile(oneshot::Sender<Result<(), NativeError>>),
GetSessionFileOrigin(oneshot::Sender<Result<Option<SessionFileOrigin>, NativeError>>),
UpdateSession((u8, oneshot::Sender<Result<bool, NativeError>>)),
AddSource((String, oneshot::Sender<u8>)),
GetSource((String, oneshot::Sender<Option<u8>>)),
UpdateSession((u16, oneshot::Sender<Result<bool, NativeError>>)),
AddSource((String, oneshot::Sender<u16>)),
GetSource((String, oneshot::Sender<Option<u16>>)),
GetSourcesDefinitions(oneshot::Sender<Vec<SourceDefinition>>),
#[allow(clippy::large_enum_variant)]
AddExecutedObserve((ObserveOptions, oneshot::Sender<()>)),
Expand Down Expand Up @@ -139,6 +139,7 @@ pub enum Api {
),
),
DropSearchValues(oneshot::Sender<bool>),
GetIndexedRanges(oneshot::Sender<Vec<RangeInclusive<u64>>>),
CloseSession(oneshot::Sender<()>),
SetDebugMode((bool, oneshot::Sender<()>)),
NotifyCancelingOperation(Uuid),
Expand Down Expand Up @@ -194,6 +195,7 @@ impl Display for Api {
Self::SetSearchValues(_, _) => "SetSearchValues",
Self::GetSearchValues(_) => "GetSearchValues",
Self::DropSearchValues(_) => "DropSearchValues",
Self::GetIndexedRanges(_) => "GetIndexedRanges",
Self::CloseSession(_) => "CloseSession",
Self::SetDebugMode(_) => "SetDebugMode",
Self::NotifyCancelingOperation(_) => "NotifyCancelingOperation",
Expand Down Expand Up @@ -368,7 +370,7 @@ impl SessionStateAPI {
self.exec_operation(Api::GetSessionFile(tx), rx).await?
}

pub async fn write_session_file(&self, source_id: u8, msg: String) -> Result<(), NativeError> {
pub async fn write_session_file(&self, source_id: u16, msg: String) -> Result<(), NativeError> {
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::WriteSessionFile((source_id, msg, tx)), rx)
.await?
Expand All @@ -385,19 +387,19 @@ impl SessionStateAPI {
.await?
}

pub async fn update_session(&self, source_id: u8) -> Result<bool, NativeError> {
pub async fn update_session(&self, source_id: u16) -> Result<bool, NativeError> {
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::UpdateSession((source_id, tx)), rx)
.await?
}

pub async fn add_source(&self, uuid: &str) -> Result<u8, NativeError> {
pub async fn add_source(&self, uuid: &str) -> Result<u16, NativeError> {
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::AddSource((uuid.to_owned(), tx)), rx)
.await
}

pub async fn get_source(&self, uuid: &str) -> Result<Option<u8>, NativeError> {
pub async fn get_source(&self, uuid: &str) -> Result<Option<u16>, NativeError> {
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::GetSource((uuid.to_owned(), tx)), rx)
.await
Expand Down Expand Up @@ -545,6 +547,11 @@ impl SessionStateAPI {
self.exec_operation(Api::DropSearchValues(tx), rx).await
}

pub async fn get_indexed_ranges(&self) -> Result<Vec<RangeInclusive<u64>>, NativeError> {
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::GetIndexedRanges(tx), rx).await
}

pub async fn close_session(&self) -> Result<(), NativeError> {
self.closing_token.cancel();
self.tracker.cancel_all().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ impl Controller {
self.map.frame(range)
}

pub(crate) fn get_all_as_ranges(&self) -> Vec<RangeInclusive<u64>> {
self.map.get_all_as_ranges()
}

pub(crate) fn len(&self) -> usize {
self.map.len()
}
Expand Down
50 changes: 50 additions & 0 deletions application/apps/indexer/session/src/state/indexes/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,28 @@ impl Keys {
sorted: false,
}
}

pub fn as_ranges(&self) -> Vec<RangeInclusive<u64>> {
let mut ranges = vec![];
let mut from: u64 = 0;
let mut to: u64 = 0;
for (i, value) in self.keys.iter().enumerate() {
if i == 0 {
from = *value;
} else if to + 1 != *value {
ranges.push(RangeInclusive::new(from, to));
from = *value;
}
to = *value;
}
if (!ranges.is_empty() && ranges[ranges.len() - 1].start() != &from)
|| (ranges.is_empty() && !self.keys.is_empty())
{
ranges.push(RangeInclusive::new(from, to));
}
ranges
}
DmitryAstafyev marked this conversation as resolved.
Show resolved Hide resolved

pub fn add(&mut self, position: u64) {
self.keys.push(position);
self.sorted = false;
Expand Down Expand Up @@ -198,4 +220,32 @@ mod test {
keys.remove(&7);
assert_eq!(keys.first(), None);
}

#[test]
fn test_keys_004() {
use std::ops::RangeInclusive;
let mut keys = super::Keys::new();
assert_eq!(keys.first(), None);
keys.add(100);
keys.add(101);
keys.add(102);
keys.add(103);
keys.add(104);
keys.add(105);
keys.add(13);
keys.add(12);
keys.add(11);
keys.add(10);
keys.add(5);
keys.add(4);
keys.add(3);
keys.add(2);
keys.add(1);
keys.sort();
let ranges = keys.as_ranges();
assert_eq!(ranges.len(), 3);
assert_eq!(ranges[0], RangeInclusive::new(1, 5));
assert_eq!(ranges[1], RangeInclusive::new(10, 13));
assert_eq!(ranges[2], RangeInclusive::new(100, 105));
}
}
4 changes: 4 additions & 0 deletions application/apps/indexer/session/src/state/indexes/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl Map {
Some(ranges)
}

pub fn get_all_as_ranges(&self) -> Vec<RangeInclusive<u64>> {
self.keys.as_ranges()
}

fn index_add(&mut self, position: u64, nature: Nature) {
if self.indexes.insert(position, nature).is_none() {
self.keys.add(position);
Expand Down
11 changes: 9 additions & 2 deletions application/apps/indexer/session/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl SessionState {

async fn handle_write_session_file(
&mut self,
source_id: u8,
source_id: u16,
state_cancellation_token: CancellationToken,
tx_callback_events: UnboundedSender<CallbackEvent>,
msg: String,
Expand Down Expand Up @@ -185,7 +185,7 @@ impl SessionState {

async fn handle_update_session(
&mut self,
source_id: u8,
source_id: u16,
state_cancellation_token: CancellationToken,
tx_callback_events: UnboundedSender<CallbackEvent>,
) -> Result<bool, NativeError> {
Expand Down Expand Up @@ -674,6 +674,13 @@ pub async fn run(
NativeError::channel("Failed to respond to Api::DropSearchValues")
})?;
}
Api::GetIndexedRanges(tx_response) => {
tx_response
.send(state.indexes.get_all_as_ranges())
.map_err(|_| {
NativeError::channel("Failed to respond to Api::GetIndexedRanges")
})?;
}
Api::CloseSession(tx_response) => {
state_cancellation_token.cancel();
state.status = Status::Closed;
Expand Down
6 changes: 3 additions & 3 deletions application/apps/indexer/session/src/state/session_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub const FLUSH_DATA_IN_MS: u128 = 500;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GrabbedElement {
#[serde(rename = "id")]
pub source_id: u8,
pub source_id: u16,
#[serde(rename = "c")]
pub content: String,
#[serde(rename = "p")]
Expand Down Expand Up @@ -148,7 +148,7 @@ impl SessionFile {

pub fn write(
&mut self,
source_id: u8,
source_id: u16,
state_cancellation_token: CancellationToken,
msg: String,
) -> Result<SessionFileState, NativeError> {
Expand Down Expand Up @@ -201,7 +201,7 @@ impl SessionFile {

pub fn update(
&mut self,
source_id: u8,
source_id: u16,
state_cancellation_token: CancellationToken,
) -> Result<SessionFileState, NativeError> {
let grabber = &mut (self.grabber.as_mut().ok_or(NativeError {
Expand Down
32 changes: 16 additions & 16 deletions application/apps/indexer/session/src/state/source_ids.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, ops::RangeInclusive};
pub struct MappedRanges<'a> {
ranges: Vec<&'a (RangeInclusive<u64>, u8)>,
ranges: Vec<&'a (RangeInclusive<u64>, u16)>,
}

impl<'a> MappedRanges<'a> {
pub fn new(ranges: Vec<&'a (RangeInclusive<u64>, u8)>) -> Self {
pub fn new(ranges: Vec<&'a (RangeInclusive<u64>, u16)>) -> Self {
Self { ranges }
}

pub fn source(&self, line: u64) -> Option<u8> {
pub fn source(&self, line: u64) -> Option<u16> {
self.ranges.iter().find_map(|(range, source_id)| {
if range.contains(&line) {
Some(*source_id)
Expand All @@ -22,15 +22,15 @@ impl<'a> MappedRanges<'a> {

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SourceDefinition {
pub id: u8,
pub id: u16,
pub alias: String,
}

#[derive(Debug)]
pub struct SourceIDs {
pub sources: HashMap<u8, String>,
pub map: Vec<(RangeInclusive<u64>, u8)>,
pub recent: Option<u8>,
pub sources: HashMap<u16, String>,
pub map: Vec<(RangeInclusive<u64>, u16)>,
pub recent: Option<u16>,
}

impl SourceIDs {
Expand All @@ -42,27 +42,27 @@ impl SourceIDs {
}
}

pub fn add_source(&mut self, alias: String) -> u8 {
let key = self.sources.len() as u8;
pub fn add_source(&mut self, alias: String) -> u16 {
let key = self.sources.len() as u16;
self.sources.insert(key, alias);
key
}

pub fn get_source(&mut self, alias: String) -> Option<u8> {
pub fn get_source(&mut self, alias: String) -> Option<u16> {
self.sources
.iter()
.find_map(|(key, val)| if val == &alias { Some(*key) } else { None })
}

pub fn is_source_same(&self, source_id: u8) -> bool {
pub fn is_source_same(&self, source_id: u16) -> bool {
if let Some(id) = self.recent {
id == source_id
} else {
true
}
}

pub fn source_update(&mut self, source_id: u8) {
pub fn source_update(&mut self, source_id: u16) {
let changed = if let Some(id) = self.recent {
id != source_id
} else {
Expand All @@ -73,11 +73,11 @@ impl SourceIDs {
}
}

pub fn get_recent_source_id(&self) -> u8 {
pub fn get_recent_source_id(&self) -> u16 {
if let Some(id) = self.recent {
id
} else {
self.sources.len() as u8
self.sources.len() as u16
}
}

Expand All @@ -91,7 +91,7 @@ impl SourceIDs {
.collect::<Vec<SourceDefinition>>()
}

pub fn add_range(&mut self, range: RangeInclusive<u64>, source_id: u8) {
pub fn add_range(&mut self, range: RangeInclusive<u64>, source_id: u16) {
self.map.push((range, source_id));
}

Expand All @@ -105,7 +105,7 @@ impl SourceIDs {
|| requested.contains(range.start())
|| requested.contains(range.end())
})
.collect::<Vec<&(RangeInclusive<u64>, u8)>>(),
.collect::<Vec<&(RangeInclusive<u64>, u16)>>(),
)
}
}
Expand Down
22 changes: 22 additions & 0 deletions application/apps/rustcore/rs-bindings/src/js/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,28 @@ impl RustSession {
}
}

#[node_bindgen]
async fn get_indexed_ranges(&self) -> Result<String, ComputationErrorWrapper> {
if let Some(ref session) = self.session {
let ranges = session
.state
.get_indexed_ranges()
.await
.map_err(|e: NativeError| {
<ComputationError as Into<ComputationErrorWrapper>>::into(
ComputationError::NativeError(e),
)
})?;
Ok(serde_json::to_string(&ranges).map_err(|e| {
ComputationErrorWrapper(ComputationError::IoOperation(e.to_string()))
})?)
} else {
Err(ComputationErrorWrapper(
ComputationError::SessionUnavailable,
))
}
}

#[node_bindgen]
async fn set_debug(&self, debug: bool) -> Result<(), ComputationErrorWrapper> {
if let Some(ref session) = self.session {
Expand Down
Loading
Loading