Skip to content

Commit

Permalink
base: Add media retention policy to EventCacheStore
Browse files Browse the repository at this point in the history
Signed-off-by: Kévin Commaille <[email protected]>
  • Loading branch information
zecakeh committed Aug 30, 2024
1 parent 06fc220 commit 1ff7a2a
Show file tree
Hide file tree
Showing 9 changed files with 1,188 additions and 102 deletions.
10 changes: 5 additions & 5 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use crate::RoomMemberships;
use crate::{
deserialized_responses::{RawAnySyncOrStrippedTimelineEvent, SyncTimelineEvent},
error::{Error, Result},
event_cache_store::DynEventCacheStore,
event_cache_store::EventCacheStoreWrapper,
rooms::{
normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons},
Room, RoomInfo, RoomState,
Expand All @@ -93,7 +93,7 @@ pub struct BaseClient {
/// Database
pub(crate) store: Store,
/// The store used by the event cache.
event_cache_store: Arc<DynEventCacheStore>,
event_cache_store: EventCacheStoreWrapper,
/// The store used for encryption.
///
/// This field is only meant to be used for `OlmMachine` initialization.
Expand Down Expand Up @@ -147,7 +147,7 @@ impl BaseClient {

BaseClient {
store: Store::new(config.state_store),
event_cache_store: config.event_cache_store,
event_cache_store: EventCacheStoreWrapper::new(config.event_cache_store),
#[cfg(feature = "e2e-encryption")]
crypto_store: config.crypto_store,
#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -222,8 +222,8 @@ impl BaseClient {
}

/// Get a reference to the event cache store.
pub fn event_cache_store(&self) -> &DynEventCacheStore {
&*self.event_cache_store
pub fn event_cache_store(&self) -> &EventCacheStoreWrapper {
&self.event_cache_store
}

/// Is the client logged in.
Expand Down
497 changes: 468 additions & 29 deletions crates/matrix-sdk-base/src/event_cache_store/integration_tests.rs

Large diffs are not rendered by default.

201 changes: 176 additions & 25 deletions crates/matrix-sdk-base/src/event_cache_store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::{num::NonZeroUsize, sync::RwLock as StdRwLock};

use async_trait::async_trait;
use matrix_sdk_common::ring_buffer::RingBuffer;
use ruma::{MxcUri, OwnedMxcUri};
use ruma::{time::SystemTime, MxcUri, OwnedMxcUri};

use super::{EventCacheStore, EventCacheStoreError, Result};
use super::{EventCacheStore, EventCacheStoreError, MediaRetentionPolicy, Result};
use crate::media::{MediaRequest, UniqueKey as _};

/// In-memory, non-persistent implementation of the `EventCacheStore`.
Expand All @@ -27,15 +27,41 @@ use crate::media::{MediaRequest, UniqueKey as _};
#[derive(Debug)]
pub struct MemoryStore {
    // All mutable state lives behind a single lock so the `&self` trait
    // methods can update it.
    inner: StdRwLock<MemoryStoreInner>,
}

/// The mutable state of a [`MemoryStore`].
#[derive(Debug)]
struct MemoryStoreInner {
    /// The media retention policy to use on cleanups.
    media_retention_policy: Option<MediaRetentionPolicy>,
    /// Media content.
    media: RingBuffer<MediaContent>,
}

/// Fixed capacity of the media ring buffer.
// `match` + `panic!` are usable in `const` items on stable Rust, so the
// non-zero value can be built without `unsafe`; the panic arm is
// unreachable since 20 != 0.
const NUMBER_OF_MEDIAS: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(n) => n,
    None => panic!("20 is not zero"),
};

/// A single cached media item, as stored in the store's ring buffer.
#[derive(Debug, Clone)]
struct MediaContent {
/// The Matrix URI of the media.
uri: OwnedMxcUri,
/// The unique key of the media request.
// Used as the lookup key; produced by `UniqueKey::unique_key()`.
key: String,
/// The content of the media.
data: Vec<u8>,
/// The last access time of the media.
// Refreshed on every read so retention cleanups can expire stale entries.
last_access: SystemTime,
}

impl Default for MemoryStore {
    /// Creates an empty store: no retention policy set, and an empty media
    /// ring buffer with a fixed capacity of `NUMBER_OF_MEDIAS` entries.
    fn default() -> Self {
        let inner = MemoryStoreInner {
            media_retention_policy: Default::default(),
            media: RingBuffer::new(NUMBER_OF_MEDIAS),
        };

        Self { inner: StdRwLock::new(inner) }
    }
}

Expand All @@ -51,53 +77,178 @@ impl MemoryStore {
impl EventCacheStore for MemoryStore {
type Error = EventCacheStoreError;

async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
async fn media_retention_policy(&self) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
Ok(self.inner.read().unwrap().media_retention_policy)
}

/// Stores `policy` as the media retention policy to apply on cleanups.
async fn set_media_retention_policy(
    &self,
    policy: MediaRetentionPolicy,
) -> Result<(), Self::Error> {
    self.inner.write().unwrap().media_retention_policy = Some(policy);
    Ok(())
}

/// Caches `data` for `request`, unless `policy` says the content is too
/// big to be stored.
///
/// Replaces any previous entry for the same request, and records
/// `current_time` as the entry's last access time.
async fn add_media_content(
    &self,
    request: &MediaRequest,
    data: Vec<u8>,
    current_time: SystemTime,
    policy: MediaRetentionPolicy,
) -> Result<()> {
    // Avoid duplication. Let's try to remove it first.
    self.remove_media_content(request).await?;

    if policy.exceeds_max_file_size(data.len()) {
        // The content is too big to be cached.
        return Ok(());
    }

    // Now, let's add it.
    let content = MediaContent {
        uri: request.uri().to_owned(),
        key: request.unique_key(),
        data,
        last_access: current_time,
    };
    self.inner.write().unwrap().media.push(content);

    Ok(())
}

async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
let media = self.media.read().unwrap();
async fn get_media_content(
&self,
request: &MediaRequest,
current_time: SystemTime,
) -> Result<Option<Vec<u8>>> {
let mut inner = self.inner.write().unwrap();
let expected_key = request.unique_key();

Ok(media.iter().find_map(|(_media_uri, media_key, media_content)| {
(media_key == &expected_key).then(|| media_content.to_owned())
}))
// First get the content out of the buffer.
let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
return Ok(None);
};
let Some(mut content) = inner.media.remove(index) else {
return Ok(None);
};

// Clone the data.
let data = content.data.clone();

// Update the last access time.
content.last_access = current_time;

// Put it back in the buffer.
inner.media.push(content);

Ok(Some(data))
}

async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
let mut media = self.media.write().unwrap();
let mut inner = self.inner.write().unwrap();

let expected_key = request.unique_key();
let Some(index) = media
.iter()
.position(|(_media_uri, media_key, _media_content)| media_key == &expected_key)
else {
let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
return Ok(());
};

media.remove(index);
inner.media.remove(index);

Ok(())
}

/// Removes every cached entry whose media is identified by `uri`.
///
/// Several requests (e.g. different thumbnail sizes) can share one URI,
/// so multiple entries may be removed.
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
    let mut inner = self.inner.write().unwrap();
    let positions = inner
        .media
        .iter()
        .enumerate()
        .filter_map(|(position, media)| (media.uri == uri).then_some(position))
        .collect::<Vec<_>>();

    // Iterate in reverse-order so that positions stay valid after first removals.
    for position in positions.into_iter().rev() {
        inner.media.remove(position);
    }

    Ok(())
}

/// Applies `policy` to the cached media, removing entries that are too
/// big, expired relative to `current_time`, or overflowing the maximum
/// cache size (oldest entries first).
// NOTE(review): this relies on `RingBuffer::iter()` yielding oldest
// entries first and `pop()` removing the oldest entry — confirm against
// `matrix_sdk_common::ring_buffer`.
async fn clean_up_media_cache(
&self,
policy: MediaRetentionPolicy,
current_time: SystemTime,
) -> Result<(), Self::Error> {
if !policy.has_limitations() {
// We can safely skip all the checks.
return Ok(());
}

let mut inner = self.inner.write().unwrap();

// First, check media content that exceed the max filesize.
// Also applies when only `max_cache_size` is set, since an entry bigger
// than the whole cache can never fit.
if policy.max_file_size.is_some() || policy.max_cache_size.is_some() {
inner.media.retain(|content| !policy.exceeds_max_file_size(content.data.len()));
}

// Then, clean up expired media content.
if policy.last_access_expiry.is_some() {
inner
.media
.retain(|content| !policy.has_content_expired(current_time, content.last_access));
}

// Finally, if the cache size is too big, remove old items until it fits.
if let Some(max_cache_size) = policy.max_cache_size {
// Reverse the iterator because in case the cache size is overflowing, we want
// to count the number of old items to remove, and old items are at
// the start.
// The fold accumulates `(total size so far, count of items that cannot
// fit because the usize total overflowed)`.
let (cache_size, overflowing_count) = inner.media.iter().rev().fold(
(0usize, 0u8),
|(cache_size, overflowing_count), content| {
if overflowing_count > 0 {
// Assume that all data is overflowing now. Overflowing count cannot
// overflow because the number of items is limited to 20.
(cache_size, overflowing_count + 1)
} else {
match cache_size.checked_add(content.data.len()) {
Some(cache_size) => (cache_size, 0),
// The cache size is overflowing, let's count the number of overflowing
// items to be able to remove them, since the max cache size cannot be
// bigger than usize::MAX.
None => (cache_size, 1),
}
}
},
);

// If the cache size is overflowing, remove the number of old items we counted.
for _position in 0..overflowing_count {
inner.media.pop();
}

if cache_size > max_cache_size {
// Amount of bytes that must be freed to get back under the limit.
let difference = cache_size - max_cache_size;

// Count the number of old items to remove to reach the difference.
let mut accumulated_items_size = 0usize;
let mut remove_items_count = 0u8;
for content in inner.media.iter() {
remove_items_count += 1;
// Cannot overflow since we already removed overflowing items.
accumulated_items_size += content.data.len();

if accumulated_items_size >= difference {
break;
}
}

// Drop the counted oldest items.
for _position in 0..remove_items_count {
inner.media.pop();
}
}
}

Ok(())
Expand All @@ -112,5 +263,5 @@ mod tests {
Ok(MemoryStore::new())
}

event_cache_store_integration_tests!();
event_cache_store_integration_tests!(with_media_size_tests);
}
Loading

0 comments on commit 1ff7a2a

Please sign in to comment.