Skip to content

Commit

Permalink
chore: update dependencies and use send_non_blocking (#320)
Browse files Browse the repository at this point in the history
* chore: update dependencies
* Replace async-rustls in favor of futures-rustls
* Organize dependencies using lexicography sort
* refactor(examples): use non deprecated send_non_blocking method
* doc(producer): update documentation to use send_non_blocking

Signed-off-by: Florentin Dubois <[email protected]>
  • Loading branch information
FlorentinDUBOIS authored Aug 1, 2024
1 parent 1b72e08 commit 5bb283a
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 121 deletions.
96 changes: 48 additions & 48 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,63 +16,63 @@ description = "Rust client for Apache Pulsar"
keywords = ["pulsar", "api", "client"]

[dependencies]
async-channel = "2"
bytes = "^1.4.0"
crc = "^3.0.1"
nom = { version="^7.1.3", default-features=false, features=["alloc"] }
async-channel = "^2.3.1"
async-trait = "^0.1.81"
async-std = { version = "^1.12.0", features = ["attributes", "unstable"], optional = true }
async-native-tls = { version = "^0.5.0", optional = true }
asynchronous-codec = { version = "^0.7.0", optional = true }
bytes = "^1.6.1"
bit-vec = "^0.8.0"
chrono = { version = "^0.4.38", default-features = false, features = ["clock", "std"] }
crc = "^3.2.1"
data-url = { version = "^0.3.1", optional = true }
flate2 = { version = "^1.0.30", optional = true }
futures = "^0.3.30"
futures-io = "^0.3.30"
futures-timer = "^3.0.3"
futures-rustls = { version = "^0.26.0", optional = true } # replacement of crate async-rustls (also a fork of tokio-rustls)
log = "^0.4.22"
lz4 = { version = "^1.26.0", optional = true }
native-tls = { version = "^0.2.12", optional = true }
nom = { version = "^7.1.3", default-features = false, features = ["alloc"] }
openidconnect = { version = "^3.5.0", optional = true }
oauth2 = { version = "^4.4.1", optional = true }
pem = "^3.0.4"
prost = "^0.13.1"
prost-derive = "^0.13.1"
rand = "^0.8.5"
chrono = { version = "^0.4.26", default-features = false, features = ["clock", "std"] }
futures-timer = "^3.0.2"
log = "^0.4.19"
url = "^2.4.0"
regex = "^1.9.1"
bit-vec = "^0.6.3"
futures = "^0.3.28"
futures-io = "^0.3.28"
native-tls = { version = "^0.2.11", optional = true }
rustls = { version = "^0.21.6", optional = true }
webpki-roots = { version = "^0.25.1", optional = true }
pem = "^3.0.0"
tokio = { version = "^1.29.1", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.8", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.24.1", optional = true }
regex = "^1.10.5"
rustls = { version = "^0.23.12", optional = true }
snap = { version = "^1.1.1", optional = true }
serde = { version = "^1.0.204", features = ["derive"], optional = true }
serde_json = { version = "^1.0.121", optional = true }
tokio = { version = "^1.39.2", features = ["rt", "net", "time"], optional = true }
tokio-util = { version = "^0.7.11", features = ["codec"], optional = true }
tokio-rustls = { version = "^0.26.0", optional = true }
tokio-native-tls = { version = "^0.3.1", optional = true }
async-std = { version = "^1.12.0", features = [ "attributes", "unstable" ], optional = true }
asynchronous-codec = { version = "^0.6.2", optional = true }
async-rustls = { version = "^0.4.0", optional = true }
async-native-tls = { version = "^0.5.0", optional = true }
lz4 = { version = "^1.24.0", optional = true }
flate2 = { version = "^1.0.26", optional = true }
zstd = { version = "^0.12.4", optional = true }
snap = { version = "^1.1.0", optional = true }
openidconnect = { version = "^3.3.0", optional = true }
oauth2 = { version = "^4.4.1", optional = true }
serde = { version = "^1.0.175", features = ["derive"], optional = true }
serde_json = { version = "^1.0.103", optional = true }
tracing = { version = "^0.1.37", optional = true }
async-trait = "^0.1.72"
data-url = { version = "^0.3.0", optional = true }
uuid = { version = "^1.4.1", features = ["v4", "fast-rng"] }
tracing = { version = "^0.1.40", optional = true }
url = "^2.5.2"
uuid = { version = "^1.10.0", features = ["v4", "fast-rng"] }
webpki-roots = { version = "^0.26.3", optional = true }
zstd = { version = "^0.13.2", optional = true }

[dev-dependencies]
serde = { version = "^1.0.175", features = ["derive"] }
serde_json = "^1.0.103"
env_logger = "^0.10.0"
tokio = { version = "^1.29.1", features = ["macros", "rt-multi-thread"] }
env_logger = "^0.11.5"
serde = { version = "^1.0.204", features = ["derive"] }
serde_json = "^1.0.121"
tokio = { version = "^1.39.2", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
prost-build = "^0.13.1"
protobuf-src = { version = "1.1.0", optional = true }
protobuf-src = { version = "^2.1.0", optional = true }

[features]
default = [ "compression", "tokio-runtime", "async-std-runtime", "auth-oauth2" ]
compression = [ "lz4", "flate2", "zstd", "snap" ]
tokio-runtime = [ "tokio", "tokio-util", "native-tls", "tokio-native-tls" ]
tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots" ]
async-std-runtime = [ "async-std", "asynchronous-codec", "native-tls", "async-native-tls" ]
async-std-rustls-runtime = ["async-std", "asynchronous-codec", "async-rustls", "rustls", "webpki-roots" ]
auth-oauth2 = [ "openidconnect", "oauth2", "serde", "serde_json", "data-url" ]
telemetry = ["tracing"]
async-std-runtime = ["async-std", "asynchronous-codec", "native-tls", "async-native-tls"]
async-std-rustls-runtime = ["async-std", "asynchronous-codec", "futures-rustls", "rustls", "webpki-roots"]
auth-oauth2 = ["openidconnect", "oauth2", "serde", "serde_json", "data-url"]
compression = ["lz4", "flate2", "zstd", "snap"]
default = ["compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"]
protobuf-src = ["dep:protobuf-src"]
telemetry = ["tracing"]
tokio-runtime = ["tokio", "tokio-util", "native-tls", "tokio-native-tls"]
tokio-rustls-runtime = ["tokio", "tokio-util", "tokio-rustls", "rustls", "webpki-roots"]
2 changes: 1 addition & 1 deletion examples/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn main() -> Result<(), pulsar::Error> {
loop {
println!("will send");
let receipt_rx = producer
.send(TestData {
.send_non_blocking(TestData {
data: "data".to_string(),
})
.await
Expand Down
2 changes: 1 addition & 1 deletion examples/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn main() -> Result<(), pulsar::Error> {
let mut counter = 0usize;
loop {
producer
.send(TestData {
.send_non_blocking(TestData {
data: "data".to_string(),
})
.await?
Expand Down
2 changes: 1 addition & 1 deletion examples/round_trip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<(), pulsar::Error> {
let mut counter = 0usize;
loop {
producer
.send(TestData {
.send_non_blocking(TestData {
data: "data".to_string(),
})
.await
Expand Down
64 changes: 16 additions & 48 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ use futures::{
task::{Context, Poll},
Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
};
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
use native_tls::Certificate;
use proto::MessageIdData;
use rand::{seq::SliceRandom, thread_rng};
#[cfg(all(
any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
use rustls::Certificate;
use url::Url;
use uuid::Uuid;

Expand All @@ -41,6 +34,7 @@ use crate::{
BaseCommand, Codec, Message,
},
producer::{self, ProducerOptions},
Certificate,
};

pub(crate) enum Register {
Expand Down Expand Up @@ -983,35 +977,22 @@ impl<Exe: Executor> Connection<Exe> {
if tls {
let stream = tokio::net::TcpStream::connect(&address).await?;
let mut root_store = rustls::RootCertStore::empty();

root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
for certificate in certificate_chain {
root_store.add(certificate)?;
root_store.add(certificate.clone())?;
}

let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().fold(
vec![],
|mut acc, trust_anchor| {
acc.push(
rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
trust_anchor.subject,
trust_anchor.spki,
trust_anchor.name_constraints,
),
);
acc
},
);

root_store.add_trust_anchors(trust_anchors.into_iter());
let config = rustls::ClientConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_safe_default_protocol_versions()?
.with_root_certificates(root_store)
.with_no_client_auth();

let cx = tokio_rustls::TlsConnector::from(Arc::new(config));
let stream = cx
.connect(rustls::ServerName::try_from(hostname.as_str())?, stream)
.connect(
rustls::pki_types::ServerName::try_from(hostname.as_str())?.to_owned(),
stream,
)
.await
.map(|stream| tokio_util::codec::Framed::new(stream, Codec))?;

Expand Down Expand Up @@ -1099,35 +1080,22 @@ impl<Exe: Executor> Connection<Exe> {
if tls {
let stream = async_std::net::TcpStream::connect(&address).await?;
let mut root_store = rustls::RootCertStore::empty();

root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
for certificate in certificate_chain {
root_store.add(certificate)?;
root_store.add(certificate.clone())?;
}

let trust_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().fold(
vec![],
|mut acc, trust_anchor| {
acc.push(
rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
trust_anchor.subject,
trust_anchor.spki,
trust_anchor.name_constraints,
),
);
acc
},
);

root_store.add_trust_anchors(trust_anchors.into_iter());
let config = rustls::ClientConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_safe_default_protocol_versions()?
.with_root_certificates(root_store)
.with_no_client_auth();

let connector = async_rustls::TlsConnector::from(Arc::new(config));
let connector = futures_rustls::TlsConnector::from(Arc::new(config));
let stream = connector
.connect(rustls::ServerName::try_from(hostname.as_str())?, stream)
.connect(
rustls::pki_types::ServerName::try_from(hostname.as_str())?.to_owned(),
stream,
)
.await
.map(|stream| asynchronous_codec::Framed::new(stream, Codec))?;

Expand Down
20 changes: 6 additions & 14 deletions src/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::{channel::oneshot, lock::Mutex};
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
use native_tls::Certificate;
use rand::Rng;
#[cfg(all(
any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
use rustls::Certificate;
use url::Url;

use crate::{connection::Connection, error::ConnectionError, executor::Executor};
use crate::{connection::Connection, error::ConnectionError, executor::Executor, Certificate};

/// holds connection information for a broker
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -156,11 +149,10 @@ impl<Exe: Executor> ConnectionManager<Exe> {
None => vec![],
Some(certificate_chain) => {
let mut v = vec![];
for cert in pem::parse_many(certificate_chain)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
.iter()
.rev()
{
let certificates = pem::parse_many(certificate_chain)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

for cert in certificates.iter().rev() {
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
v.push(
Certificate::from_der(cert.contents())
Expand All @@ -174,7 +166,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
v.push(Certificate(cert.contents().to_vec()));
v.push(Certificate::from(cert.contents().to_vec()));
}
v
}
Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub enum ConnectionError {
))]
Tls(rustls::Error),
#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))]
DnsName(rustls::client::InvalidDnsNameError),
DnsName(rustls::pki_types::InvalidDnsNameError),
Authentication(AuthenticationError),
NotFound,
Canceled,
Expand Down Expand Up @@ -142,9 +142,9 @@ impl From<rustls::Error> for ConnectionError {
}

#[cfg(any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"))]
impl From<rustls::client::InvalidDnsNameError> for ConnectionError {
impl From<rustls::pki_types::InvalidDnsNameError> for ConnectionError {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn from(err: rustls::client::InvalidDnsNameError) -> Self {
fn from(err: rustls::pki_types::InvalidDnsNameError) -> Self {
ConnectionError::DnsName(err)
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ pub mod reader;
mod retry_op;
mod service_discovery;

#[cfg(all(
any(feature = "tokio-rustls-runtime", feature = "async-std-rustls-runtime"),
not(any(feature = "tokio-runtime", feature = "async-std-runtime"))
))]
pub(crate) type Certificate = rustls::pki_types::CertificateDer<'static>;
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
pub(crate) type Certificate = native_tls::Certificate;

#[cfg(test)]
mod tests {
use std::{
Expand Down
2 changes: 1 addition & 1 deletion src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl tokio_util::codec::Decoder for Codec {

#[cfg(any(feature = "async-std-runtime", feature = "async-std-rustls-runtime"))]
impl asynchronous_codec::Encoder for Codec {
type Item = Message;
type Item<'a> = Message;
type Error = ConnectionError;

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
Expand Down
8 changes: 4 additions & 4 deletions src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ pub struct ProducerOptions {
/// # let message = "data".to_owned();
/// let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await?;
/// let mut producer = pulsar.producer().with_name("name").build_multi_topic();
/// let send_1 = producer.send(topic, &message).await?;
/// let send_2 = producer.send(topic, &message).await?;
/// let send_1 = producer.send_non_blocking(topic, &message).await?;
/// let send_2 = producer.send_non_blocking(topic, &message).await?;
/// send_1.await?;
/// send_2.await?;
/// # Ok(())
Expand Down Expand Up @@ -398,8 +398,8 @@ impl<Exe: Executor> Producer<Exe> {
///
/// ```rust,no_run
/// # async fn run(mut producer: pulsar::Producer<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
/// let f1 = producer.send("hello").await?;
/// let f2 = producer.send("world").await?;
/// let f1 = producer.send_non_blocking("hello").await?;
/// let f2 = producer.send_non_blocking("world").await?;
/// let receipt1 = f1.await?;
/// let receipt2 = f2.await?;
/// # Ok(())
Expand Down

0 comments on commit 5bb283a

Please sign in to comment.