Skip to content

Commit

Permalink
Stop sending the heartbeats when client was shutdown. (#1310)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Oct 25, 2024
1 parent 9d6666a commit 4396aef
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.30"
version = "0.6.31"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
30 changes: 17 additions & 13 deletions sdk/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,37 @@ use crate::client::{
Client, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, PartitionClient,
PersonalAccessTokenClient, StreamClient, SystemClient, TopicClient, UserClient,
};
use crate::clients::builder::IggyClientBuilder;
use crate::clients::consumer::IggyConsumerBuilder;
use crate::clients::producer::IggyProducerBuilder;
use crate::compression::compression_algorithm::CompressionAlgorithm;
use crate::consumer::Consumer;
use crate::diagnostic::DiagnosticEvent;
use crate::error::IggyError;
use crate::identifier::Identifier;
use crate::locking::IggySharedMut;
use crate::locking::IggySharedMutFn;
use crate::messages::poll_messages::PollingStrategy;
use crate::messages::send_messages::{Message, Partitioning};
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::identity_info::IdentityInfo;
use crate::models::messages::PolledMessages;
use crate::models::permissions::Permissions;
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
use crate::models::user_info::{UserInfo, UserInfoDetails};
use crate::models::user_status::UserStatus;
use crate::partitioner::Partitioner;
use crate::tcp::client::TcpClient;
use crate::utils::crypto::Encryptor;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use crate::utils::topic_size::MaxTopicSize;
use async_broadcast::Receiver;
use async_dropper::AsyncDrop;
use async_trait::async_trait;
Expand All @@ -29,21 +41,9 @@ use std::fmt::Debug;
use std::sync::Arc;
use tokio::spawn;
use tokio::time::sleep;
use tracing::log::warn;
use tracing::{debug, error, info};

use crate::clients::builder::IggyClientBuilder;
use crate::clients::consumer::IggyConsumerBuilder;
use crate::clients::producer::IggyProducerBuilder;
use crate::compression::compression_algorithm::CompressionAlgorithm;
use crate::diagnostic::DiagnosticEvent;
use crate::messages::poll_messages::PollingStrategy;
use crate::models::permissions::Permissions;
use crate::models::user_status::UserStatus;
use crate::utils::duration::IggyDuration;
use crate::utils::expiry::IggyExpiry;
use crate::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
use crate::utils::topic_size::MaxTopicSize;

/// The main client struct which implements all the `Client` traits and wraps the underlying low-level client for the specific transport.
///
/// It also provides the additional builders for the standalone consumer, consumer group, and producer.
Expand Down Expand Up @@ -184,6 +184,10 @@ impl Client for IggyClient {
debug!("Sending the heartbeat...");
if let Err(error) = client.read().await.ping().await {
error!("There was an error when sending a heartbeat. {error}");
if error == IggyError::ClientShutdown {
warn!("The client has been shut down - stopping the heartbeat.");
return;
}
} else {
debug!("Heartbeat was sent successfully.");
}
Expand Down

0 comments on commit 4396aef

Please sign in to comment.