Skip to content

Commit

Permalink
bump prost and prost-build to 0.13.1 (#316)
Browse files Browse the repository at this point in the history
remove obsolete TryFrom implementation on base_command::Type

Co-authored-by: Emmanuel Bosquet <[email protected]>
  • Loading branch information
Keksoj and Emmanuel Bosquet authored Jul 25, 2024
1 parent 918c25a commit 60490ba
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 77 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ async-channel = "2"
bytes = "^1.4.0"
crc = "^3.0.1"
nom = { version="^7.1.3", default-features=false, features=["alloc"] }
prost = "^0.11.9"
prost-derive = "^0.11.9"
prost = "^0.13.1"
prost-derive = "^0.13.1"
rand = "^0.8.5"
chrono = { version = "^0.4.26", default-features = false, features = ["clock", "std"] }
futures-timer = "^3.0.2"
Expand Down Expand Up @@ -63,7 +63,7 @@ env_logger = "^0.10.0"
tokio = { version = "^1.29.1", features = ["macros", "rt-multi-thread"] }

[build-dependencies]
prost-build = "^0.11.9"
prost-build = "^0.13.1"
protobuf-src = { version = "1.1.0", optional = true }

[features]
Expand Down
16 changes: 7 additions & 9 deletions src/consumer/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,13 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
) -> Result<(), Error> {
let compression = match payload.metadata.compression {
None => proto::CompressionType::None,
Some(compression) => {
proto::CompressionType::from_i32(compression).ok_or_else(|| {
error!("unknown compression type: {}", compression);
Error::Consumer(ConsumerError::Io(std::io::Error::new(
ErrorKind::Other,
format!("unknown compression type: {compression}"),
)))
})?
}
Some(compression) => proto::CompressionType::try_from(compression).map_err(|err| {
error!("unknown compression type: {}", compression);
Error::Consumer(ConsumerError::Io(std::io::Error::new(
ErrorKind::Other,
format!("unknown compression type {compression}: {err}"),
)))
})?,
};

let payload = match compression {
Expand Down
69 changes: 4 additions & 65 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,11 @@ impl Message {
type_
);
}
Err(()) => {
Err(unknown_enum) => {
warn!(
"Received BaseCommand of unexpected type: {}",
self.command.r#type
"Received BaseCommand of unexpected type {}: {}",
self.command.r#type,
unknown_enum.to_string()
);
}
}
Expand Down Expand Up @@ -560,53 +561,6 @@ pub mod proto {
}
}

impl TryFrom<i32> for base_command::Type {
type Error = ();

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn try_from(value: i32) -> Result<Self, ()> {
match value {
2 => Ok(base_command::Type::Connect),
3 => Ok(base_command::Type::Connected),
4 => Ok(base_command::Type::Subscribe),
5 => Ok(base_command::Type::Producer),
6 => Ok(base_command::Type::Send),
7 => Ok(base_command::Type::SendReceipt),
8 => Ok(base_command::Type::SendError),
9 => Ok(base_command::Type::Message),
10 => Ok(base_command::Type::Ack),
11 => Ok(base_command::Type::Flow),
12 => Ok(base_command::Type::Unsubscribe),
13 => Ok(base_command::Type::Success),
14 => Ok(base_command::Type::Error),
15 => Ok(base_command::Type::CloseProducer),
16 => Ok(base_command::Type::CloseConsumer),
17 => Ok(base_command::Type::ProducerSuccess),
18 => Ok(base_command::Type::Ping),
19 => Ok(base_command::Type::Pong),
20 => Ok(base_command::Type::RedeliverUnacknowledgedMessages),
21 => Ok(base_command::Type::PartitionedMetadata),
22 => Ok(base_command::Type::PartitionedMetadataResponse),
23 => Ok(base_command::Type::Lookup),
24 => Ok(base_command::Type::LookupResponse),
25 => Ok(base_command::Type::ConsumerStats),
26 => Ok(base_command::Type::ConsumerStatsResponse),
27 => Ok(base_command::Type::ReachedEndOfTopic),
28 => Ok(base_command::Type::Seek),
29 => Ok(base_command::Type::GetLastMessageId),
30 => Ok(base_command::Type::GetLastMessageIdResponse),
31 => Ok(base_command::Type::ActiveConsumerChange),
32 => Ok(base_command::Type::GetTopicsOfNamespace),
33 => Ok(base_command::Type::GetTopicsOfNamespaceResponse),
34 => Ok(base_command::Type::GetSchema),
35 => Ok(base_command::Type::GetSchemaResponse),
36 => Ok(base_command::Type::AuthChallenge),
37 => Ok(base_command::Type::AuthResponse),
_ => Err(()),
}
}
}

impl From<prost::EncodeError> for ConnectionError {
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
fn from(e: prost::EncodeError) -> Self {
Expand All @@ -623,8 +577,6 @@ impl From<prost::DecodeError> for ConnectionError {

#[cfg(test)]
mod tests {
use std::convert::TryFrom;

use bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};

Expand Down Expand Up @@ -679,17 +631,4 @@ mod tests {
Codec.encode(message, &mut output).unwrap();
assert_eq!(&output, input);
}

#[test]
fn base_command_type_parsing() {
use super::base_command::Type;
let mut successes = 0;
for i in 0..40 {
if let Ok(type_) = Type::try_from(i) {
successes += 1;
assert_eq!(type_ as i32, i);
}
}
assert_eq!(successes, 36);
}
}

0 comments on commit 60490ba

Please sign in to comment.