Skip to content

Commit

Permalink
Restructure to move back-end related logic out of main.rs and to co…
Browse files Browse the repository at this point in the history
…ndense `chathistory` logic into fewer locations.
  • Loading branch information
andymandias committed Nov 11, 2024
1 parent 44d70ec commit a3bfce5
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 246 deletions.
180 changes: 151 additions & 29 deletions data/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use futures::channel::mpsc;
use futures::{channel::mpsc, Future, FutureExt};
use irc::proto::{self, command, Command};
use itertools::{Either, Itertools};
use std::cmp::Ordering;
Expand Down Expand Up @@ -89,6 +89,14 @@ pub enum Broadcast {
},
}

#[derive(Debug)]
pub enum Message {
ChatHistoryRequest(Server, ChatHistorySubcommand),
ChatHistoryTargetsTimestampUpdated(Server, DateTime<Utc>, Result<(), Error>),
RequestNewerChatHistory(Server, String, DateTime<Utc>),
RequestChatHistoryTargets(Server, Option<DateTime<Utc>>, DateTime<Utc>),
}

#[derive(Debug)]
pub enum Event {
Single(message::Encoded, Nick),
Expand All @@ -99,10 +107,8 @@ pub enum Event {
UpdateReadMarker(String, ReadMarker),
JoinedChannel(String, DateTime<Utc>),
ChatHistoryAcknowledged(DateTime<Utc>),
ChatHistoryRequest(ChatHistorySubcommand),
ChatHistoryRequestReceived(ChatHistorySubcommand, usize),
ChatHistoryTarget(String, DateTime<Utc>),
ChatHistoryTargetsReceived(ChatHistorySubcommand, usize, DateTime<Utc>),
ChatHistoryTargetReceived(String, DateTime<Utc>),
ChatHistoryTargetsReceived(DateTime<Utc>),
}

struct ChatHistoryRequest {
Expand Down Expand Up @@ -327,7 +333,9 @@ impl Client {
} else {
match &finished.chathistory {
Some(ChatHistoryBatch::Target(batch_target)) => {
if let Some(ChatHistoryRequest { subcommand, .. }) =
let continuation_subcommand = if let Some(
ChatHistoryRequest { subcommand, .. },
) =
self.chathistory_requests.get(batch_target)
{
if let ChatHistorySubcommand::Before(_, _, limit) =
Expand All @@ -339,12 +347,20 @@ impl Client {
);
}

let continue_chathistory = match subcommand {
match subcommand {
ChatHistorySubcommand::Latest(
target,
message_reference,
limit,
) => {
log::debug!(
"[{}] received latest {} messages in {} since {}",
self.server,
finished.events.len(),
target,
message_reference,
);

if matches!(
message_reference,
MessageReference::None
Expand All @@ -363,13 +379,36 @@ impl Client {
None
}
}
ChatHistorySubcommand::Before(_, _, _) => None,
ChatHistorySubcommand::Between(
ChatHistorySubcommand::Before(
target,
message_reference,
_,
) => {
log::debug!(
"[{}] received {} messages in {} before {}",
self.server,
finished.events.len(),
target,
message_reference,
);

None
}
ChatHistorySubcommand::Between(
target,
start_message_reference,
end_message_reference,
limit,
) => {
log::debug!(
"[{}] received {} messages in {} between {} and {}",
self.server,
finished.events.len(),
target,
start_message_reference,
end_message_reference,
);

if finished.events.len() == *limit as usize {
continue_chathistory_between(
target,
Expand All @@ -381,35 +420,54 @@ impl Client {
None
}
}
ChatHistorySubcommand::Targets(_, _, _) => None,
};
ChatHistorySubcommand::Targets(_, _, _) => {
log::debug!(
"[{}] chathistory batch received for TARGETS request (draft/chathistory-targets batch expected)",
self.server
);

finished.events.push(
Event::ChatHistoryRequestReceived(
subcommand.clone(),
finished.events.len(),
),
);

if let Some(subcommand) = continue_chathistory {
finished
.events
.push(Event::ChatHistoryRequest(subcommand));
None
}
}
} else {
None
};

self.clear_chathistory_request(Some(batch_target));

if let Some(continuation_subcommand) =
continuation_subcommand
{
self.send_chathistory_request(continuation_subcommand);
}
}
Some(ChatHistoryBatch::Targets) => {
if let Some(ChatHistoryRequest { subcommand, .. }) =
&self.chathistory_targets_request
{
finished.events.push(
Event::ChatHistoryTargetsReceived(
subcommand.clone(),
if let ChatHistorySubcommand::Targets(
start_message_reference,
end_message_reference,
_,
) = subcommand
{
log::debug!(
"[{}] received {} targets between {} and {}",
self.server,
finished.events.len(),
server_time(&message),
),
start_message_reference,
end_message_reference,
);
}

finished.events.push(
Event::ChatHistoryTargetsReceived(server_time(
&message,
)),
);
}

self.clear_chathistory_request(None);
}
_ => (),
}
Expand Down Expand Up @@ -1521,13 +1579,13 @@ impl Client {
self.statusmsg(),
) {
if !prefixes.is_empty() && self.chanmap.contains_key(&channel) {
return Ok(vec![Event::ChatHistoryTarget(
return Ok(vec![Event::ChatHistoryTargetReceived(
target.clone(),
server_time(&message),
)]);
}
} else {
return Ok(vec![Event::ChatHistoryTarget(
return Ok(vec![Event::ChatHistoryTargetReceived(
target.clone(),
server_time(&message),
)]);
Expand Down Expand Up @@ -1749,6 +1807,52 @@ impl Client {
.unwrap_or_default()
}

pub fn load_chathistory_targets_timestamp(
&self,
server_time: DateTime<Utc>,
) -> impl Future<Output = Message> {
let server = self.server.clone();

let limit = self.chathistory_limit();

async move {
let timestamp = load_chathistory_targets_timestamp(server.clone())
.await
.ok()
.flatten();

let start_message_reference = timestamp.map_or(MessageReference::None, |timestamp| {
MessageReference::Timestamp(timestamp)
});

let end_message_reference = MessageReference::Timestamp(server_time);

Message::ChatHistoryRequest(
server,
ChatHistorySubcommand::Targets(
start_message_reference,
end_message_reference,
limit,
),
)
}
.boxed()
}

pub fn overwrite_chathistory_targets_timestamp(
&self,
timestamp: DateTime<Utc>,
) -> impl Future<Output = Message> {
let server = self.server.clone();

async move {
let result = overwrite_chathistory_targets_timestamp(server.clone(), timestamp).await;

Message::ChatHistoryTargetsTimestampUpdated(server, timestamp, result)
}
.boxed()
}

fn sync(&mut self) {
self.channels = self
.chanmap
Expand Down Expand Up @@ -2212,6 +2316,24 @@ impl Map {
})
}

pub fn load_chathistory_targets_timestamp(
&self,
server: &Server,
server_time: DateTime<Utc>,
) -> Option<impl Future<Output = Message>> {
self.client(server)
.map(|client| client.load_chathistory_targets_timestamp(server_time))
}

pub fn overwrite_chathistory_targets_timestamp(
&self,
server: &Server,
server_time: DateTime<Utc>,
) -> Option<impl Future<Output = Message>> {
self.client(server)
.map(|client| client.overwrite_chathistory_targets_timestamp(server_time))
}

pub fn get_server_handle(&self, server: &Server) -> Option<&server::Handle> {
self.client(server).map(|client| &client.handle)
}
Expand Down
Loading

0 comments on commit a3bfce5

Please sign in to comment.