Skip to content

Commit

Permalink
Implement client heartbeat verification on the server (#1264)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 28, 2024
1 parent 246c572 commit 994bd06
Show file tree
Hide file tree
Showing 47 changed files with 482 additions and 115 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@
"interval": "1 m"
}
},
"heartbeat": {
"enabled": false,
"interval": "5 s"
},
"system": {
"path": "local_data",
"backup": {
Expand Down
7 changes: 7 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ enabled = true
# Interval for running the token cleaner.
interval = "1 m"

# Heartbeat configuration
[heartbeat]
# Enables or disables the client heartbeat verification process.
enabled = false
# Interval for expected client heartbeats
interval = "5 s"

# System configuration.
[system]
# Base path for system data storage.
Expand Down
11 changes: 7 additions & 4 deletions examples/src/basic/consumer/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use clap::Parser;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::models::messages::PolledMessage;
use iggy_examples::shared::args::Args;
use iggy_examples::shared::system;
Expand All @@ -17,10 +19,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client.as_ref();
system::init_by_consumer(&args, client).await;
system::consume_messages(&args, client, &handle_message).await
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_consumer(&args, &client).await;
system::consume_messages(&args, &client, &handle_message).await
}

fn handle_message(message: &PolledMessage) -> Result<(), Box<dyn Error>> {
Expand Down
10 changes: 6 additions & 4 deletions examples/src/basic/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy_examples::shared::args::Args;
use iggy_examples::shared::system;
Expand All @@ -19,10 +20,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client.as_ref();
system::init_by_producer(&args, client).await?;
produce_messages(&args, client).await
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_producer(&args, &client).await?;
produce_messages(&args, &client).await
}

async fn produce_messages(args: &Args, client: &dyn Client) -> Result<(), Box<dyn Error>> {
Expand Down
4 changes: 3 additions & 1 deletion examples/src/message-envelope/consumer/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use clap::Parser;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
Expand All @@ -20,8 +21,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_consumer(&args, &client).await;
system::consume_messages(&args, &client, &handle_message).await
}
Expand Down
7 changes: 4 additions & 3 deletions examples/src/message-envelope/producer/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
use iggy::client::MessageClient;
use iggy::client::{Client, MessageClient};
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
Expand All @@ -22,8 +22,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = IggyClient::builder().with_client(client).build()?;
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_producer(&args, &client).await?;
produce_messages(&args, &client).await
}
Expand Down
4 changes: 3 additions & 1 deletion examples/src/message-headers/consumer/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use clap::Parser;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
Expand All @@ -21,8 +22,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_consumer(&args, &client).await;
system::consume_messages(&args, &client, &handle_message).await
}
Expand Down
7 changes: 4 additions & 3 deletions examples/src/message-headers/producer/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use bytes::Bytes;
use clap::Parser;
use iggy::client::MessageClient;
use iggy::client::{Client, MessageClient};
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
Expand All @@ -25,8 +25,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = IggyClient::builder().with_client(client).build()?;
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;
system::init_by_producer(&args, &client).await?;
produce_messages(&args, &client).await
}
Expand Down
4 changes: 3 additions & 1 deletion examples/src/new-sdk/consumer/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap::Parser;
use futures_util::StreamExt;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
Expand All @@ -26,8 +27,9 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::new(client);
client.connect().await?;

let name = "new-sdk-consumer";
let mut consumer = match ConsumerKind::from_code(args.consumer_kind)? {
Expand Down
4 changes: 3 additions & 1 deletion examples/src/new-sdk/producer/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use iggy::client::Client;
use iggy::client_provider;
use iggy::client_provider::ClientProviderConfig;
use iggy::clients::client::IggyClient;
Expand All @@ -23,8 +24,9 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
args.transport
);
let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?);
let client = client_provider::get_raw_connected_client(client_provider_config).await?;
let client = client_provider::get_raw_client(client_provider_config, false).await?;
let client = IggyClient::builder().with_client(client).build()?;
client.connect().await?;
let mut producer = client
.producer(&args.stream_id, &args.topic_id)?
.batch_size(args.messages_per_batch)
Expand Down
16 changes: 12 additions & 4 deletions examples/src/shared/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ pub struct Args {
pub tcp_reconnection_interval: String,

#[arg(long, default_value = "5s")]
pub tcp_reconnection_re_establish_after: String,
pub tcp_reconnection_reestablish_after: String,

#[arg(long, default_value = "5s")]
pub tcp_heartbeat_interval: String,

#[arg(long, default_value = "127.0.0.1:8090")]
pub tcp_server_address: String,
Expand Down Expand Up @@ -100,7 +103,7 @@ pub struct Args {
pub quic_reconnection_interval: String,

#[arg(long, default_value = "5s")]
pub quic_reconnection_re_establish_after: String,
pub quic_reconnection_reestablish_after: String,

#[arg(long, default_value = "10000")]
pub quic_max_concurrent_bidi_streams: u64,
Expand Down Expand Up @@ -128,6 +131,9 @@ pub struct Args {

#[arg(long, default_value = "false")]
pub quic_validate_certificate: bool,

#[arg(long, default_value = "5s")]
pub quic_heartbeat_interval: String,
}

impl Args {
Expand All @@ -143,15 +149,16 @@ impl Args {
tcp_reconnection_enabled: self.tcp_reconnection_enabled,
tcp_reconnection_max_retries: self.tcp_reconnection_max_retries,
tcp_reconnection_interval: self.tcp_reconnection_interval.clone(),
tcp_reconnection_re_establish_after: self.tcp_reconnection_re_establish_after.clone(),
tcp_reconnection_reestablish_after: self.tcp_reconnection_reestablish_after.clone(),
tcp_heartbeat_interval: self.tcp_heartbeat_interval.clone(),
tcp_tls_enabled: self.tcp_tls_enabled,
tcp_tls_domain: self.tcp_tls_domain.clone(),
quic_client_address: self.quic_client_address.clone(),
quic_server_address: self.quic_server_address.clone(),
quic_server_name: self.quic_server_name.clone(),
quic_reconnection_enabled: self.quic_reconnection_enabled,
quic_reconnection_max_retries: self.quic_reconnection_max_retries,
quic_reconnection_re_establish_after: self.quic_reconnection_re_establish_after.clone(),
quic_reconnection_reestablish_after: self.quic_reconnection_reestablish_after.clone(),
quic_reconnection_interval: self.quic_reconnection_interval.clone(),
quic_max_concurrent_bidi_streams: self.quic_max_concurrent_bidi_streams,
quic_datagram_send_buffer_size: self.quic_datagram_send_buffer_size,
Expand All @@ -162,6 +169,7 @@ impl Args {
quic_keep_alive_interval: self.quic_keep_alive_interval,
quic_max_idle_timeout: self.quic_max_idle_timeout,
quic_validate_certificate: self.quic_validate_certificate,
quic_heartbeat_interval: self.quic_heartbeat_interval.clone(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.21"
version = "0.6.30"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
16 changes: 12 additions & 4 deletions sdk/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ pub struct Args {
pub tcp_reconnection_interval: String,

/// The optional re-establish after last connection interval for TCP
pub tcp_reconnection_re_establish_after: String,
pub tcp_reconnection_reestablish_after: String,

/// The optional heartbeat interval for the TCP transport
pub tcp_heartbeat_interval: String,

/// Flag to enable TLS for the TCP transport
pub tcp_tls_enabled: bool,
Expand All @@ -243,7 +246,7 @@ pub struct Args {
pub quic_reconnection_interval: String,

/// The optional re-establish after last connection interval for QUIC
pub quic_reconnection_re_establish_after: String,
pub quic_reconnection_reestablish_after: String,

/// The optional maximum number of concurrent bidirectional streams for QUIC
pub quic_max_concurrent_bidi_streams: u64,
Expand Down Expand Up @@ -271,6 +274,9 @@ pub struct Args {

/// Flag to enable certificate validation for QUIC
pub quic_validate_certificate: bool,

/// The optional heartbeat interval for the QUIC transport
pub quic_heartbeat_interval: String,
}

const QUIC_TRANSPORT: &str = "quic";
Expand Down Expand Up @@ -306,7 +312,8 @@ impl Default for Args {
tcp_reconnection_enabled: true,
tcp_reconnection_max_retries: None,
tcp_reconnection_interval: "1s".to_string(),
tcp_reconnection_re_establish_after: "5s".to_string(),
tcp_reconnection_reestablish_after: "5s".to_string(),
tcp_heartbeat_interval: "5s".to_string(),
tcp_tls_enabled: false,
tcp_tls_domain: "localhost".to_string(),
quic_client_address: "127.0.0.1:0".to_string(),
Expand All @@ -315,7 +322,7 @@ impl Default for Args {
quic_reconnection_enabled: true,
quic_reconnection_max_retries: None,
quic_reconnection_interval: "1s".to_string(),
quic_reconnection_re_establish_after: "5s".to_string(),
quic_reconnection_reestablish_after: "5s".to_string(),
quic_max_concurrent_bidi_streams: 10000,
quic_datagram_send_buffer_size: 100000,
quic_initial_mtu: 1200,
Expand All @@ -325,6 +332,7 @@ impl Default for Args {
quic_keep_alive_interval: 5000,
quic_max_idle_timeout: 10000,
quic_validate_certificate: false,
quic_heartbeat_interval: "5s".to_string(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions sdk/src/binary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::command::Command;
use crate::diagnostic::DiagnosticEvent;
use crate::error::IggyError;
use crate::utils::duration::IggyDuration;
use async_trait::async_trait;
use bytes::Bytes;
use derive_more::Display;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub trait BinaryTransport {
/// Sends a command and returns the response.
async fn send_with_response<T: Command>(&self, command: &T) -> Result<Bytes, IggyError>;
async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result<Bytes, IggyError>;
fn get_heartbeat_interval(&self) -> IggyDuration;
}

async fn fail_if_not_authenticated<T: BinaryTransport>(transport: &T) -> Result<(), IggyError> {
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/binary/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::system::get_clients::GetClients;
use crate::system::get_me::GetMe;
use crate::system::get_stats::GetStats;
use crate::system::ping::Ping;
use crate::utils::duration::IggyDuration;

#[async_trait::async_trait]
impl<B: BinaryClient> SystemClient for B {
Expand Down Expand Up @@ -44,4 +45,8 @@ impl<B: BinaryClient> SystemClient for B {
self.send_with_response(&Ping {}).await?;
Ok(())
}

async fn heartbeat_interval(&self) -> IggyDuration {
self.get_heartbeat_interval()
}
}
Loading

0 comments on commit 994bd06

Please sign in to comment.