Skip to content

Commit

Permalink
Improve metrics, fix expired segments check, add max users accounts l…
Browse files Browse the repository at this point in the history
…imit (#902)
  • Loading branch information
spetz authored Apr 15, 2024
1 parent ea50337 commit c5bc90c
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
let messages_count = 10;
let now = IggyTimestamp::now().to_micros();
let message_expiry = message_expiry as u64;
let mut expired_timestamp = now - (1000 * 2 * message_expiry);
let mut expired_timestamp = now - (1000000 * 2 * message_expiry);
let mut base_offset = 0;
let mut last_timestamp = 0;
let mut batch_buffer = BytesMut::new();
Expand Down Expand Up @@ -323,8 +323,8 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.await;
let now = IggyTimestamp::now().to_micros();
let message_expiry = message_expiry as u64;
let expired_timestamp = now - (1000 * 2 * message_expiry);
let not_expired_timestamp = now - (1000 * message_expiry) + 1;
let expired_timestamp = now - (1000000 * 2 * message_expiry);
let not_expired_timestamp = now - (1000000 * message_expiry) + 1;
let expired_message = create_message(0, "test", expired_timestamp);
let not_expired_message = create_message(1, "test", not_expired_timestamp);

Expand Down
2 changes: 2 additions & 0 deletions sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub enum IggyError {
InvalidPersonalAccessToken = 53,
#[error("Personal access token: {0} for user with ID: {1} has expired.")]
PersonalAccessTokenExpired(String, u32) = 54,
#[error("Users limit reached.")]
UsersLimitReached = 55,
#[error("Not connected")]
NotConnected = 61,
#[error("Request error")]
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.2.19"
version = "0.2.20"
edition = "2021"
build = "src/build.rs"

Expand Down
3 changes: 2 additions & 1 deletion server/src/streaming/segments/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ impl Segment {
}

let last_message = &last_messages[0];
let message_expiry = (self.message_expiry.unwrap() * 1000) as u64;
// Message expiry is in seconds, and timestamp is in microseconds
let message_expiry = (self.message_expiry.unwrap() * 1000000) as u64;
(last_message.timestamp + message_expiry) <= now
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/streaming/systems/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl System {
{
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
self.permissioner.create_partitons(
self.permissioner.create_partitions(
session.get_user_id(),
stream.stream_id,
topic.topic_id,
Expand Down
1 change: 0 additions & 1 deletion server/src/streaming/systems/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ impl System {
.decrement_partitions(stream.get_partitions_count());
self.metrics.decrement_messages(stream.get_messages_count());
self.metrics.decrement_segments(stream.get_segments_count());

self.streams.remove(&stream_id);
self.streams_ids.remove(&stream_name);
let current_stream_id = CURRENT_STREAM_ID.load(Ordering::SeqCst);
Expand Down
1 change: 1 addition & 0 deletions server/src/streaming/systems/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl System {
.get_stream_mut(stream_id)?
.delete_topic(topic_id)
.await?;

self.metrics.decrement_topics(1);
self.metrics
.decrement_partitions(topic.get_partitions_count());
Expand Down
8 changes: 8 additions & 0 deletions server/src/streaming/systems/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use tracing::{error, info, warn};

static USER_ID: AtomicU32 = AtomicU32::new(1);
const MAX_USERS: usize = u32::MAX as usize;

impl System {
pub(crate) async fn load_users(&mut self) -> Result<(), IggyError> {
Expand All @@ -31,6 +32,7 @@ impl System {
let current_user_id = users.iter().map(|user| user.id).max().unwrap_or(1);
USER_ID.store(current_user_id + 1, Ordering::SeqCst);
self.permissioner.init(users);
self.metrics.increment_users(users_count as u32);
info!("Initialized {} user(s).", users_count);
Ok(())
}
Expand Down Expand Up @@ -121,6 +123,12 @@ impl System {
error!("User: {username} already exists.");
return Err(IggyError::UserAlreadyExists);
}

if self.storage.user.load_all().await?.len() > MAX_USERS {
error!("Available users limit reached.");
return Err(IggyError::UsersLimitReached);
}

let user_id = USER_ID.fetch_add(1, Ordering::SeqCst);
info!("Creating user: {username} with ID: {user_id}...");
let user = User::new(user_id, &username, password, status, permissions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::streaming::users::permissioner::Permissioner;
use iggy::error::IggyError;

impl Permissioner {
pub fn create_partitons(
pub fn create_partitions(
&self,
user_id: u32,
stream_id: u32,
Expand Down

0 comments on commit c5bc90c

Please sign in to comment.