From b77df3e11fd47f1da03317d83320120e99802926 Mon Sep 17 00:00:00 2001 From: Andrea Frigido Date: Mon, 31 Jul 2023 11:24:15 +0100 Subject: [PATCH 1/8] chore: Update license field following SPDX 2.1 (#287) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 01f6fc0..46c8a45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ authors = [ "Geoffroy Couprie ", ] -license = "MIT/Apache-2.0" +license = "MIT OR Apache-2.0" readme = "./README.md" repository = "https://github.com/streamnative/pulsar-rs" documentation = "https://docs.rs/pulsar" From 597e57b44bdb98f91c928d934dcd033fae6df520 Mon Sep 17 00:00:00 2001 From: Florentin DUBOIS Date: Wed, 2 Aug 2023 17:11:04 +0200 Subject: [PATCH 2/8] build: allow to use rustls instead of native-tls (#288) chore: update dependencies * Bump bytes to 1.4.0 * Bump crc to 3.0.1 * Bump nom to 7.1.3 * Bump prost to 0.11.9 * Bump prost-derive to 0.11.9 * Bump chrono to 0.4.26 * Bump log to 0.4.19 * Bump url to 2.4.0 * Bump regex to 1.9.1 * Bump futures to 0.3.28 * Bump futures-io to 0.3.28 * Bump native-tls to 0.2.11 * Bump pem to 3.0.0 * Bump tokio to 1.29.1 * Bump tokio-util to 0.7.8 * Bump tokio-native-tls to 0.3.1 * Bump asynchronous-codec to 0.6.2 * Bump async-native-tls to 0.5.0 * Bump flate2 to 1.0.26 * Bump zstd to 0.12.4 * Bump snap to 1.1.0 * Bump openidconnect to 3.3.0 * Bump oauth2 to 4.4.1 * Bump serde to 1.0.175 * Bump serde_json to 1.0.103 * Bump async-trait to 0.1.72 * Bump data-url to 0.3.0 * Bump uuid to 1.4.1 build: allow to use rustls instead of native-tls * This is used in an effort to remove all dependencies to openssl. Which could be interesting in embedded system or on environment which is difficult to know on which openssl version the software will run it and breaks deployments. * It introduces two compiler feature flags which are `tokio-rustls-runtime` and `async-std-rustls-runtime` that have the same meaning as `tokio-runtime` and `async-std-runtime` except that they use rustls. * There is a safe guard, if we enable both runtimes, this is the native-tls ones that are used to keep consistent with the current behaviour. Signed-off-by: Florentin Dubois --- Cargo.toml | 80 +++++++++++---------- src/connection.rs | 147 ++++++++++++++++++++++++++++++++++++-- src/connection_manager.rs | 18 ++++- src/consumer/mod.rs | 16 ++--- src/error.rs | 30 ++++++++ src/executor.rs | 60 ++++++++++------ src/lib.rs | 21 +++--- src/message.rs | 8 +-- 8 files changed, 297 insertions(+), 83 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46c8a45..12633c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,56 +16,62 @@ description = "Rust client for Apache Pulsar" keywords = ["pulsar", "api", "client"] [dependencies] -bytes = "^1.2.1" -crc = "^3.0.0" -nom = { version="^7.1.1", default-features=false, features=["alloc"] } -prost = "^0.11.0" -prost-derive = "^0.11.0" +bytes = "^1.4.0" +crc = "^3.0.1" +nom = { version="^7.1.3", default-features=false, features=["alloc"] } +prost = "^0.11.9" +prost-derive = "^0.11.9" rand = "^0.8.5" -chrono = "^0.4.22" +chrono = "^0.4.26" futures-timer = "^3.0.2" -log = "^0.4.17" -url = "^2.3.1" -regex = "^1.6.0" +log = "^0.4.19" +url = "^2.4.0" +regex = "^1.9.1" bit-vec = "^0.6.3" -futures = "^0.3.25" -futures-io = "^0.3.25" -native-tls = "^0.2.10" -pem = "^1.1.0" -tokio = { version = "^1.21.2", features = ["rt", "net", "time"], optional = true } -tokio-util = { version = "^0.7.4", features = ["codec"], optional = true } -tokio-native-tls = { version = "^0.3.0", optional = true } -async-std = {version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } -asynchronous-codec = { version = "^0.6.0", optional = true } -async-native-tls = { version = "^0.4.0", optional = true } +futures = "^0.3.28" +futures-io = "^0.3.28" +native-tls = { version = "^0.2.11", optional = true } +rustls = { version = "^0.21.5", optional = true } +webpki-roots = { version = "^0.25.1", optional = true } +pem = "^3.0.0" +tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true } +tokio-util = { version = "^0.7.8", features = ["codec"], optional = true } +tokio-rustls = { version = "^0.24.1", optional = true } +tokio-native-tls = { version = "^0.3.1", optional = true } +async-std = { version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true } +asynchronous-codec = { version = "^0.6.2", optional = true } +async-rustls = { version = "^0.4.0", optional = true } +async-native-tls = { version = "^0.5.0", optional = true } lz4 = { version = "^1.24.0", optional = true } -flate2 = { version = "^1.0.24", optional = true } -zstd = { version = "^0.11.2", optional = true } -snap = { version = "^1.0.5", optional = true } -openidconnect = { version = "^2.4.0", optional = true } -oauth2 = { version = "^4.2.3", optional = true } -serde = { version = "^1.0.147", features = ["derive"], optional = true } -serde_json = { version = "^1.0.87", optional = true } +flate2 = { version = "^1.0.26", optional = true } +zstd = { version = "^0.12.4", optional = true } +snap = { version = "^1.1.0", optional = true } +openidconnect = { version = "^3.3.0", optional = true } +oauth2 = { version = "^4.4.1", optional = true } +serde = { version = "^1.0.175", features = ["derive"], optional = true } +serde_json = { version = "^1.0.103", optional = true } tracing = { version = "^0.1.37", optional = true } -async-trait = "^0.1.58" -data-url = { version = "^0.2.0", optional = true } -uuid = {version = "^1.2.1", features = ["v4", "fast-rng"] } +async-trait = "^0.1.72" +data-url = { version = "^0.3.0", optional = true } +uuid = { version = "^1.4.1", features = ["v4", "fast-rng"] } [dev-dependencies] -serde = { version = "^1.0.145", features = ["derive"] } -serde_json = "^1.0.85" -env_logger = "^0.9.1" -tokio = { version = "^1.21.2", features = ["macros", "rt-multi-thread"] } +serde = { version = "^1.0.175", features = ["derive"] } +serde_json = "^1.0.103" +env_logger = "^0.10.0" +tokio = { version = "^1.29.1", features = ["macros", "rt-multi-thread"] } [build-dependencies] -prost-build = "^0.11.1" +prost-build = "^0.11.9" protobuf-src = { version = "1.1.0", optional = true } [features] -default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2" ] +default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"] compression = [ "lz4", "flate2", "zstd", "snap" ] -tokio-runtime = [ "tokio", "tokio-util", "tokio-native-tls" ] -async-std-runtime = [ "async-std", "asynchronous-codec", "async-native-tls" ] +tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ] +tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ] +async-std-runtime = [ "async-std", "asynchronous-codec", "native-tls", "async-native-tls" ] +async-std-rustls-runtime = ["async-std", "asynchronous-codec", "async-rustls", "rustls", "webpki-roots" ] auth-oauth2 = [ "openidconnect", "oauth2", "serde", "serde_json", "data-url" ] telemetry = ["tracing"] protobuf-src = ["dep:protobuf-src"] diff --git a/src/connection.rs b/src/connection.rs index feda49e..2faae5c 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,9 +20,15 @@ use futures::{ task::{Context, Poll}, Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] use native_tls::Certificate; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; +#[cfg(all( + any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +use rustls::Certificate; use url::Url; use uuid::Uuid; @@ -934,7 +940,69 @@ impl Connection { .await } } - #[cfg(not(feature = "tokio-runtime"))] + #[cfg(all(feature = "tokio-rustls-runtime", not(feature = "tokio-runtime")))] + ExecutorKind::Tokio => { + if tls { + let stream = tokio::net::TcpStream::connect(&address).await?; + let mut root_store = rustls::RootCertStore::empty(); + for certificate in certificate_chain { + root_store.add(certificate)?; + } + + let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().fold( + vec![], + |mut acc, trust_anchor| { + acc.push( + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + trust_anchor.subject, + trust_anchor.spki, + trust_anchor.name_constraints, + ), + ); + acc + }, + ); + + root_store.add_server_trust_anchors(trust_anchors.into_iter()); + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_root_certificates(root_store) + .with_no_client_auth(); + + let cx = tokio_rustls::TlsConnector::from(Arc::new(config)); + let stream = cx + .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .await + .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } else { + let stream = tokio::net::TcpStream::connect(&address) + .await + .map(|stream| tokio_util::codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } + } + #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-rustls-runtime")))] ExecutorKind::Tokio => { unimplemented!("the tokio-runtime cargo feature is not active"); } @@ -980,7 +1048,75 @@ impl Connection { .await } } - #[cfg(not(feature = "async-std-runtime"))] + #[cfg(all( + feature = "async-std-rustls-runtime", + not(feature = "async-std-runtime") + ))] + ExecutorKind::AsyncStd => { + if tls { + let stream = async_std::net::TcpStream::connect(&address).await?; + let mut root_store = rustls::RootCertStore::empty(); + for certificate in certificate_chain { + root_store.add(certificate)?; + } + + let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().fold( + vec![], + |mut acc, trust_anchor| { + acc.push( + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + trust_anchor.subject, + trust_anchor.spki, + trust_anchor.name_constraints, + ), + ); + acc + }, + ); + + root_store.add_server_trust_anchors(trust_anchors.into_iter()); + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_root_certificates(root_store) + .with_no_client_auth(); + + let connector = async_rustls::TlsConnector::from(Arc::new(config)); + let stream = connector + .connect(rustls::ServerName::try_from(hostname.as_str())?, stream) + .await + .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } else { + let stream = async_std::net::TcpStream::connect(&address) + .await + .map(|stream| asynchronous_codec::Framed::new(stream, Codec))?; + + Connection::connect( + connection_id, + stream, + auth, + proxy_to_broker_url, + executor, + operation_timeout, + ) + .await + } + } + #[cfg(all( + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] ExecutorKind::AsyncStd => { unimplemented!("the async-std-runtime cargo feature is not active"); } @@ -1623,16 +1759,17 @@ mod tests { use uuid::Uuid; use super::{Connection, Receiver}; + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] + use crate::TokioExecutor; use crate::{ authentication::Authentication, error::{AuthenticationError, SharedError}, message::{BaseCommand, Codec, Message}, proto::{AuthData, CommandAuthChallenge, CommandAuthResponse, CommandConnected}, - TokioExecutor, }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn receiver_auth_challenge_test() { let (message_tx, message_rx) = mpsc::unbounded(); let (tx, _) = mpsc::unbounded(); @@ -1690,7 +1827,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn connection_auth_challenge_test() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 73c516d..4390947 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,8 +1,14 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] use native_tls::Certificate; use rand::Rng; +#[cfg(all( + any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +use rustls::Certificate; use url::Url; use crate::{connection::Connection, error::ConnectionError, executor::Executor}; @@ -153,10 +159,20 @@ impl ConnectionManager { .iter() .rev() { + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] v.push( - Certificate::from_der(&cert.contents[..]) + Certificate::from_der(cert.contents()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, ); + + #[cfg(all( + any( + feature = "tokio-rustls-runtime", + feature = "async-std-rustls-runtime" + ), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) + ))] + v.push(Certificate(cert.contents().to_vec())); } v } diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index 37bc417..849071d 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -437,11 +437,11 @@ mod tests { }; use log::LevelFilter; use regex::Regex; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use tokio::time::timeout; use super::*; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use crate::executor::TokioExecutor; use crate::{ consumer::initial_position::InitialPosition, producer, proto, tests::TEST_LOGGER, @@ -476,7 +476,7 @@ mod tests { tag: "multi_consumer", }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn multi_consumer() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -567,7 +567,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn consumer_dropped_with_lingering_acks() { use rand::{distributions::Alphanumeric, Rng}; let _result = log::set_logger(&TEST_LOGGER); @@ -664,7 +664,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn dead_letter_queue() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -738,7 +738,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn failover() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -798,7 +798,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn seek_single_consumer() { let _result = log::set_logger(&MULTI_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -917,7 +917,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn schema_test() { #[derive(Serialize, Deserialize)] struct TestData { diff --git a/src/error.rs b/src/error.rs index bd1dc64..31ed7fb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -88,7 +88,15 @@ pub enum ConnectionError { Encoding(String), SocketAddr(String), UnexpectedResponse(String), + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] Tls(native_tls::Error), + #[cfg(all( + any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) + ))] + Tls(rustls::Error), + #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] + DnsName(rustls::client::InvalidDnsNameError), Authentication(AuthenticationError), NotFound, Canceled, @@ -113,6 +121,7 @@ impl From for ConnectionError { } } +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: native_tls::Error) -> Self { @@ -120,6 +129,25 @@ impl From for ConnectionError { } } +#[cfg(all( + any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"), + not(any(feature = "tokio-runtime", feature = "async-std-runtime")) +))] +impl From for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(err: rustls::Error) -> Self { + ConnectionError::Tls(err) + } +} + +#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] +impl From for ConnectionError { + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + fn from(err: rustls::client::InvalidDnsNameError) -> Self { + ConnectionError::DnsName(err) + } +} + impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(err: AuthenticationError) -> Self { @@ -141,6 +169,8 @@ impl fmt::Display for ConnectionError { ConnectionError::Encoding(e) => write!(f, "Error encoding message: {e}"), ConnectionError::SocketAddr(e) => write!(f, "Error obtaining socket address: {e}"), ConnectionError::Tls(e) => write!(f, "Error connecting TLS stream: {e}"), + #[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))] + ConnectionError::DnsName(e) => write!(f, "Error resolving hostname: {e}"), ConnectionError::Authentication(e) => write!(f, "Error authentication: {e}"), ConnectionError::UnexpectedResponse(e) => { write!(f, "Unexpected response from pulsar: {e}") diff --git a/src/executor.rs b/src/executor.rs index 3273f30..b980986 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -40,11 +40,11 @@ pub trait Executor: Clone + Send + Sync + 'static { } /// Wrapper for the Tokio executor -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] #[derive(Clone, Debug)] pub struct TokioExecutor; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl Executor for TokioExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -78,11 +78,11 @@ impl Executor for TokioExecutor { } /// Wrapper for the async-std executor -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] #[derive(Clone, Debug)] pub struct AsyncStdExecutor; -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] impl Executor for AsyncStdExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -150,13 +150,18 @@ impl Executor for Arc { /// future returned by [Executor::spawn_blocking] to await on the task's result pub enum JoinHandle { /// wrapper for tokio's `JoinHandle` - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::task::JoinHandle), /// wrapper for async-std's `JoinHandle` - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(async_std::task::JoinHandle), // here to avoid a compilation error since T is not used - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] PlaceHolder(T), } @@ -166,17 +171,22 @@ impl Future for JoinHandle { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { match self.get_mut() { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] JoinHandle::Tokio(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v.ok()), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] JoinHandle::AsyncStd(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(Some(v)), }, - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] JoinHandle::PlaceHolder(t) => { unimplemented!("please activate one of the following cargo features: tokio-runtime, async-std-runtime") } @@ -187,12 +197,17 @@ impl Future for JoinHandle { /// a `Stream` producing a `()` at rgular time intervals pub enum Interval { /// wrapper for tokio's interval - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Interval), /// wrapper for async-std's interval - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(async_std::stream::Interval), - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] PlaceHolder, } @@ -206,17 +221,22 @@ impl Stream for Interval { ) -> std::task::Poll> { unsafe { match Pin::get_unchecked_mut(self) { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Interval::Tokio(j) => match Pin::new_unchecked(j).poll_tick(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(Some(())), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] Interval::AsyncStd(j) => match Pin::new_unchecked(j).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v), }, - #[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] + #[cfg(all( + not(feature = "tokio-runtime"), + not(feature = "tokio-rustls-runtime"), + not(feature = "async-std-runtime"), + not(feature = "async-std-rustls-runtime") + ))] Interval::PlaceHolder => { unimplemented!("please activate one of the following cargo features: tokio-runtime, async-std-runtime") } @@ -228,10 +248,10 @@ impl Stream for Interval { /// a future producing a `()` after some time pub enum Delay { /// wrapper around tokio's `Sleep` - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Sleep), /// wrapper around async-std's `Delay` - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] AsyncStd(Pin + Send>>), } @@ -242,12 +262,12 @@ impl Future for Delay { fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll { unsafe { match Pin::get_unchecked_mut(self) { - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Delay::Tokio(d) => match Pin::new_unchecked(d).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), }, - #[cfg(feature = "async-std-runtime")] + #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] Delay::AsyncStd(j) => match Pin::new_unchecked(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), diff --git a/src/lib.rs b/src/lib.rs index 5841518..514973c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -160,6 +160,11 @@ extern crate prost_derive; #[macro_use] extern crate serde; +#[cfg(all(features = "tokio-rustls-runtime", features = "tokio-runtime"))] +compile_error!("You have selected both features \"tokio-rustls-runtime\" and \"tokio-runtime\" which are exclusive, please choose one of them"); +#[cfg(all(features = "async-std-rustls-runtime", features = "async-std-runtime"))] +compile_error!("You have selected both features \"async-std-rustls-runtime\" and \"async-std-runtime\" which are exclusive, please choose one of them"); + pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage}; pub use connection::Authentication; pub use connection_manager::{ @@ -167,10 +172,10 @@ pub use connection_manager::{ }; pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions}; pub use error::Error; -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] pub use executor::AsyncStdExecutor; pub use executor::Executor; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] pub use executor::TokioExecutor; pub use message::{ proto::{self, command_subscribe::SubType, CommandSendReceipt}, @@ -201,11 +206,11 @@ mod tests { use futures::{future::try_join_all, StreamExt}; use log::{LevelFilter, Metadata, Record}; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use tokio::time::timeout; use super::*; - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] use crate::executor::TokioExecutor; use crate::{ client::SerializeMessage, @@ -309,7 +314,7 @@ mod tests { pub static TEST_LOGGER: SimpleLogger = SimpleLogger { tag: "" }; #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn round_trip() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -374,7 +379,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn unsized_data() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -460,7 +465,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn redelivery() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); @@ -506,7 +511,7 @@ mod tests { } #[tokio::test] - #[cfg(feature = "tokio-runtime")] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn batching() { let _result = log::set_logger(&TEST_LOGGER); log::set_max_level(LevelFilter::Debug); diff --git a/src/message.rs b/src/message.rs index fa4a484..0ab3a01 100644 --- a/src/message.rs +++ b/src/message.rs @@ -220,7 +220,7 @@ impl Message { /// tokio and async-std codec for Pulsar messages pub struct Codec; -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl tokio_util::codec::Encoder for Codec { type Error = ConnectionError; @@ -269,7 +269,7 @@ impl tokio_util::codec::Encoder for Codec { } } -#[cfg(feature = "tokio-runtime")] +#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] impl tokio_util::codec::Decoder for Codec { type Item = Message; type Error = ConnectionError; @@ -324,7 +324,7 @@ impl tokio_util::codec::Decoder for Codec { } } -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Encoder for Codec { type Item = Message; type Error = ConnectionError; @@ -374,7 +374,7 @@ impl asynchronous_codec::Encoder for Codec { } } -#[cfg(feature = "async-std-runtime")] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl asynchronous_codec::Decoder for Codec { type Item = Message; type Error = ConnectionError; From cfba9e544ae98035e2de95ed83e73711d67120af Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Mon, 7 Aug 2023 07:28:46 +0200 Subject: [PATCH 3/8] build: disable oldtime feature of chrono (#289) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 12633c8..2087a3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ nom = { version="^7.1.3", default-features=false, features=["alloc"] } prost = "^0.11.9" prost-derive = "^0.11.9" rand = "^0.8.5" -chrono = "^0.4.26" +chrono = { version = "^0.4.26", default-features = false, features = ["clock", "std"] } futures-timer = "^3.0.2" log = "^0.4.19" url = "^2.4.0" From 7054b8a1dc5852126b69829d66efeec3589ef397 Mon Sep 17 00:00:00 2001 From: Stellit Woo Date: Thu, 10 Aug 2023 10:36:02 +0800 Subject: [PATCH 4/8] fix: ordering key assignment (#290) Co-authored-by: Qian Wu --- src/producer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/producer.rs b/src/producer.rs index 9a8c00e..bbb5dbc 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -1033,8 +1033,8 @@ impl<'a, T, Exe: Executor> MessageBuilder<'a, T, Exe> { /// sets the message's ordering key for key_shared subscription #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - pub fn with_ordering_key>(mut self, partition_key: S) -> Self { - self.partition_key = Some(partition_key.into()); + pub fn with_ordering_key>>(mut self, ordering_key: S) -> Self { + self.ordering_key = Some(ordering_key.into()); self } From b030ef9ceae9cef9ad60d8d41a6c29f7be7e7437 Mon Sep 17 00:00:00 2001 From: cirias Date: Fri, 11 Aug 2023 19:51:33 +0800 Subject: [PATCH 5/8] chore: Implement Sync for Consumer (#291) Co-authored-by: Sirius --- src/connection.rs | 9 +++++---- src/consumer/multi.rs | 7 ++++--- src/error.rs | 2 +- src/executor.rs | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 2faae5c..2e82439 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -871,10 +871,11 @@ impl Connection { match auth { Some(m_auth) => { let mut auth_guard = m_auth.lock().await; - Ok(Some(Authentication { - name: auth_guard.auth_method_name(), - data: auth_guard.auth_data().await?, - })) + let name = auth_guard.auth_method_name(); + // wrap the future of auth_data() with Shared so that it implements Sync + let data_fut = auth_guard.auth_data().shared(); + let data = data_fut.await?; + Ok(Some(Authentication { name, data })) } None => Ok(None), } diff --git a/src/consumer/multi.rs b/src/consumer/multi.rs index fad4c75..cd8e0bd 100644 --- a/src/consumer/multi.rs +++ b/src/consumer/multi.rs @@ -30,9 +30,10 @@ pub struct MultiTopicConsumer { pub(super) topics: VecDeque, pub(super) existing_topics: VecDeque, #[allow(clippy::type_complexity)] - pub(super) new_consumers: - Option>, Error>> + Send>>>, - pub(super) refresh: Pin + Send>>, + pub(super) new_consumers: Option< + Pin>, Error>> + Send + Sync>>, + >, + pub(super) refresh: Pin + Send + Sync>>, pub(super) config: ConsumerConfig, // Stats on disconnected consumers to keep metrics correct pub(super) disc_messages_received: u64, diff --git a/src/error.rs b/src/error.rs index 31ed7fb..182dbf5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -406,7 +406,7 @@ impl std::error::Error for ServiceDiscoveryError { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum AuthenticationError { Custom(String), } diff --git a/src/executor.rs b/src/executor.rs index b980986..c51797d 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -252,7 +252,7 @@ pub enum Delay { Tokio(tokio::time::Sleep), /// wrapper around async-std's `Delay` #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] - AsyncStd(Pin + Send>>), + AsyncStd(Pin + Send + Sync>>), } impl Future for Delay { From b2b54367165f91d25d6c41c154d6fc5bbc124b8e Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 11 Sep 2023 16:40:03 +0800 Subject: [PATCH 6/8] fix: multiple clippy warnings (#296) * fix: feature is misspelled as features * fix feature flags * fix deprecation warning Signed-off-by: tison --- .github/workflows/rust.yml | 4 +++- Cargo.toml | 4 ++-- src/connection.rs | 5 +++-- src/executor.rs | 16 ++++++++-------- src/lib.rs | 6 +++--- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7db76fc..6b6e635 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -15,7 +15,9 @@ jobs: - name: Build run: cargo build --features protobuf-src - name: Check Clippy - run: cargo clippy --tests --all-features -- -D warnings + run: | + cargo clippy --tests --features telemetry,protobuf-src -- -D warnings + cargo clippy --tests --no-default-features --features compression,tokio-rustls-runtime,async-std-rustls-runtime,auth-oauth2,telemetry,protobuf-src -- -D warnings - name: Install nightly rustfmt run: rustup toolchain install nightly --component rustfmt - name: Check format diff --git a/Cargo.toml b/Cargo.toml index 2087a3e..a88b56f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ bit-vec = "^0.6.3" futures = "^0.3.28" futures-io = "^0.3.28" native-tls = { version = "^0.2.11", optional = true } -rustls = { version = "^0.21.5", optional = true } +rustls = { version = "^0.21.6", optional = true } webpki-roots = { version = "^0.25.1", optional = true } pem = "^3.0.0" tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true } @@ -66,7 +66,7 @@ prost-build = "^0.11.9" protobuf-src = { version = "1.1.0", optional = true } [features] -default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"] +default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2" ] compression = [ "lz4", "flate2", "zstd", "snap" ] tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ] tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ] diff --git a/src/connection.rs b/src/connection.rs index 2e82439..0adcf10 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -882,6 +882,7 @@ impl Connection { } #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + #[allow(unused_variables)] // allow_insecure_connection and tls_hostname_verification_enabled are native-tls only async fn prepare_stream( connection_id: Uuid, address: SocketAddr, @@ -964,7 +965,7 @@ impl Connection { }, ); - root_store.add_server_trust_anchors(trust_anchors.into_iter()); + root_store.add_trust_anchors(trust_anchors.into_iter()); let config = rustls::ClientConfig::builder() .with_safe_default_cipher_suites() .with_safe_default_kx_groups() @@ -1075,7 +1076,7 @@ impl Connection { }, ); - root_store.add_server_trust_anchors(trust_anchors.into_iter()); + root_store.add_trust_anchors(trust_anchors.into_iter()); let config = rustls::ClientConfig::builder() .with_safe_default_cipher_suites() .with_safe_default_kx_groups() diff --git a/src/executor.rs b/src/executor.rs index c51797d..cb5d7fd 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -78,11 +78,11 @@ impl Executor for TokioExecutor { } /// Wrapper for the async-std executor -#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] #[derive(Clone, Debug)] pub struct AsyncStdExecutor; -#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] impl Executor for AsyncStdExecutor { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn spawn(&self, f: Pin + Send>>) -> Result<(), ()> { @@ -153,7 +153,7 @@ pub enum JoinHandle { #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::task::JoinHandle), /// wrapper for async-std's `JoinHandle` - #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] + #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] AsyncStd(async_std::task::JoinHandle), // here to avoid a compilation error since T is not used #[cfg(all( @@ -176,7 +176,7 @@ impl Future for JoinHandle { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v.ok()), }, - #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] + #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] JoinHandle::AsyncStd(j) => match Pin::new(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(Some(v)), @@ -200,7 +200,7 @@ pub enum Interval { #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Interval), /// wrapper for async-std's interval - #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] + #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] AsyncStd(async_std::stream::Interval), #[cfg(all( not(feature = "tokio-runtime"), @@ -226,7 +226,7 @@ impl Stream for Interval { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(Some(())), }, - #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] + #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] Interval::AsyncStd(j) => match Pin::new_unchecked(j).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(v) => Poll::Ready(v), @@ -251,7 +251,7 @@ pub enum Delay { #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] Tokio(tokio::time::Sleep), /// wrapper around async-std's `Delay` - #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] + #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] AsyncStd(Pin + Send + Sync>>), } @@ -267,7 +267,7 @@ impl Future for Delay { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), }, - #[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] + #[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] Delay::AsyncStd(j) => match Pin::new_unchecked(j).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(()), diff --git a/src/lib.rs b/src/lib.rs index 514973c..460c8f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -160,9 +160,9 @@ extern crate prost_derive; #[macro_use] extern crate serde; -#[cfg(all(features = "tokio-rustls-runtime", features = "tokio-runtime"))] +#[cfg(all(feature = "tokio-rustls-runtime", feature = "tokio-runtime"))] compile_error!("You have selected both features \"tokio-rustls-runtime\" and \"tokio-runtime\" which are exclusive, please choose one of them"); -#[cfg(all(features = "async-std-rustls-runtime", features = "async-std-runtime"))] +#[cfg(all(feature = "async-std-rustls-runtime", feature = "async-std-runtime"))] compile_error!("You have selected both features \"async-std-rustls-runtime\" and \"async-std-runtime\" which are exclusive, please choose one of them"); pub use client::{DeserializeMessage, Pulsar, PulsarBuilder, SerializeMessage}; @@ -172,7 +172,7 @@ pub use connection_manager::{ }; pub use consumer::{Consumer, ConsumerBuilder, ConsumerOptions}; pub use error::Error; -#[cfg(any(feature = "async-std-runtime", features = "async-std-rustls-runtime"))] +#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))] pub use executor::AsyncStdExecutor; pub use executor::Executor; #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] From 871a37fc7062d4313dc42a50df43cb9b886ea2b2 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 11 Sep 2023 15:00:29 +0800 Subject: [PATCH 7/8] chore: Remove outdated CONTRIBUTING file We should write a new CONTRIBUTING file. But let's remove this outdated one first. Signed-off-by: tison --- CONTRIBUTING.md | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index dab6014..0000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,9 +0,0 @@ -# Contributing to pulsar-rs - -Thank you for your interest in contributing to pulsar-rs! There are many ways to contribute and we appreciate all of them. - -As of 2022/08, maintainers of this repo have agreed to stabilize this repo since it's quality is not upto java pulsar client. Thus, issues that are beyond pulsar version 2.10.x are welcomed but will not be prioritized until 2022/12. - -If you would like to contribute to this crate, solving issues listed on this [page](https://github.com/streamnative/pulsar-rs/issues/224) is strongly encouraged. - -Thanks! From 2faad19b71f033c3b58b908a59dd79936bd43fae Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 11 Sep 2023 15:25:03 +0800 Subject: [PATCH 8/8] docs: Add history and About section Signed-off-by: tison --- README.md | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 59f46f7..8412627 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,22 @@ -## pulsar-rs: Future-based Rust client for [Apache Pulsar](https://pulsar.apache.org/) +# pulsar-rs: Future-based Rust client for [Apache Pulsar](https://pulsar.apache.org/) [![crates](https://img.shields.io/crates/v/pulsar.svg)](https://crates.io/crates/pulsar) [![docs](https://img.shields.io/docsrs/pulsar)](https://docs.rs/pulsar) -[Documentation](https://docs.rs/pulsar) - This is a pure Rust client for Apache Pulsar that does not depend on the C++ Pulsar library. It provides an async/await based API, compatible with [Tokio](https://tokio.rs/) and [async-std](https://async.rs/). Features: -- URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup -- multi topic consumers (based on a regex or list) -- TLS connection -- configurable executor (Tokio or async-std) -- automatic reconnection with exponential back off -- message batching -- compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features) -- telemetry using [tracing](https://github.com/tokio-rs/tracing) crate (can be activated with Cargo features) +- URL based (`pulsar://` and `pulsar+ssl://`) connections with DNS lookup; +- Multi topic consumers (based on a regex or list); +- TLS connection; +- Configurable executor (Tokio or async-std); +- Automatic reconnection with exponential back off; +- Message batching; +- Compression with LZ4, zlib, zstd or Snappy (can be deactivated with Cargo features); +- Telemetry using [tracing](https://github.com/tokio-rs/tracing) crate (can be activated with Cargo features). -### Getting Started +## Getting Started Add the following dependencies in your `Cargo.toml`: @@ -34,7 +32,7 @@ Try out [examples](examples): - [consumer](examples/consumer.rs) - [reader](examples/reader.rs) -### Project Maintainers +## Project Maintainers - [@CleverAkanoa](https://github.com/CleverAkanoa) - [@DonghunLouisLee](https://github.com/DonghunLouisLee) @@ -45,7 +43,7 @@ Try out [examples](examples): - [@stearnsc](https://github.com/stearnsc) - [@tisonkun](https://github.com/tisonkun) -### Contribution +## Contribution This project welcomes your PR and issues. For example, refactoring, adding features, correcting English, etc. @@ -55,8 +53,18 @@ Thanks to all the people who already contributed! -### License +## License This library is licensed under the terms of both the MIT license and the Apache License (Version 2.0), and may include packages written by third parties which carry their own copyright notices and license terms. See [LICENSE-APACHE](LICENSE-APACHE), [LICENSE-MIT](LICENSE-MIT), and [COPYRIGHT](COPYRIGHT) for details. + +## History + +This project is originally created by [@stearnsc](https://github.com/stearnsc) and others at [Wyyerd](https://github.com/wyyerd) at 2018. Later at 2022, the orginal creators [decided to transfer the repository to StreamNative](https://github.com/streamnative-oss/sn-pulsar-rs/issues/20). + +Currently, this project is actively maintained under the StreamNative organization with a diverse [maintainers group](#project-maintainers). + +## About StreamNative + +Founded in 2019 by the original creators of Apache Pulsar, [StreamNative](https://streamnative.io/) is one of the leading contributors to the open-source Apache Pulsar project. We have helped engineering teams worldwide make the move to Pulsar with [StreamNative Cloud](https://streamnative.io/product), a fully managed service to help teams accelerate time-to-production.