From aec39a3d590c095539f2b45bf891f5f3dd8d05f8 Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Thu, 25 Jul 2024 14:33:52 +0200 Subject: [PATCH] bump prost and prost-build to 0.13.1 remove obsolete TryFrom implementation on base_command::Type --- Cargo.toml | 6 ++-- src/consumer/engine.rs | 16 +++++----- src/message.rs | 69 +++--------------------------------------- 3 files changed, 14 insertions(+), 77 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1fd57f72..58e6301b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ async-channel = "2" bytes = "^1.4.0" crc = "^3.0.1" nom = { version="^7.1.3", default-features=false, features=["alloc"] } -prost = "^0.11.9" -prost-derive = "^0.11.9" +prost = "^0.13.1" +prost-derive = "^0.13.1" rand = "^0.8.5" chrono = { version = "^0.4.26", default-features = false, features = ["clock", "std"] } futures-timer = "^3.0.2" @@ -63,7 +63,7 @@ env_logger = "^0.10.0" tokio = { version = "^1.29.1", features = ["macros", "rt-multi-thread"] } [build-dependencies] -prost-build = "^0.11.9" +prost-build = "^0.13.1" protobuf-src = { version = "1.1.0", optional = true } [features] diff --git a/src/consumer/engine.rs b/src/consumer/engine.rs index 5da88dd0..0742a9e5 100644 --- a/src/consumer/engine.rs +++ b/src/consumer/engine.rs @@ -387,15 +387,13 @@ impl ConsumerEngine { ) -> Result<(), Error> { let compression = match payload.metadata.compression { None => proto::CompressionType::None, - Some(compression) => { - proto::CompressionType::from_i32(compression).ok_or_else(|| { - error!("unknown compression type: {}", compression); - Error::Consumer(ConsumerError::Io(std::io::Error::new( - ErrorKind::Other, - format!("unknown compression type: {compression}"), - ))) - })? - } + Some(compression) => proto::CompressionType::try_from(compression).map_err(|err| { + error!("unknown compression type: {}", compression); + Error::Consumer(ConsumerError::Io(std::io::Error::new( + ErrorKind::Other, + format!("unknown compression type {compression}: {err}"), + ))) + })?, }; let payload = match compression { diff --git a/src/message.rs b/src/message.rs index 0ab3a01c..c71c4ba1 100644 --- a/src/message.rs +++ b/src/message.rs @@ -204,10 +204,11 @@ impl Message { type_ ); } - Err(()) => { + Err(unknown_enum) => { warn!( - "Received BaseCommand of unexpected type: {}", - self.command.r#type + "Received BaseCommand of unexpected type {}: {}", + self.command.r#type, + unknown_enum.to_string() ); } } @@ -560,53 +561,6 @@ pub mod proto { } } -impl TryFrom for base_command::Type { - type Error = (); - - #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] - fn try_from(value: i32) -> Result { - match value { - 2 => Ok(base_command::Type::Connect), - 3 => Ok(base_command::Type::Connected), - 4 => Ok(base_command::Type::Subscribe), - 5 => Ok(base_command::Type::Producer), - 6 => Ok(base_command::Type::Send), - 7 => Ok(base_command::Type::SendReceipt), - 8 => Ok(base_command::Type::SendError), - 9 => Ok(base_command::Type::Message), - 10 => Ok(base_command::Type::Ack), - 11 => Ok(base_command::Type::Flow), - 12 => Ok(base_command::Type::Unsubscribe), - 13 => Ok(base_command::Type::Success), - 14 => Ok(base_command::Type::Error), - 15 => Ok(base_command::Type::CloseProducer), - 16 => Ok(base_command::Type::CloseConsumer), - 17 => Ok(base_command::Type::ProducerSuccess), - 18 => Ok(base_command::Type::Ping), - 19 => Ok(base_command::Type::Pong), - 20 => Ok(base_command::Type::RedeliverUnacknowledgedMessages), - 21 => Ok(base_command::Type::PartitionedMetadata), - 22 => Ok(base_command::Type::PartitionedMetadataResponse), - 23 => Ok(base_command::Type::Lookup), - 24 => Ok(base_command::Type::LookupResponse), - 25 => Ok(base_command::Type::ConsumerStats), - 26 => Ok(base_command::Type::ConsumerStatsResponse), - 27 => Ok(base_command::Type::ReachedEndOfTopic), - 28 => Ok(base_command::Type::Seek), - 29 => Ok(base_command::Type::GetLastMessageId), - 30 => Ok(base_command::Type::GetLastMessageIdResponse), - 31 => Ok(base_command::Type::ActiveConsumerChange), - 32 => Ok(base_command::Type::GetTopicsOfNamespace), - 33 => Ok(base_command::Type::GetTopicsOfNamespaceResponse), - 34 => Ok(base_command::Type::GetSchema), - 35 => Ok(base_command::Type::GetSchemaResponse), - 36 => Ok(base_command::Type::AuthChallenge), - 37 => Ok(base_command::Type::AuthResponse), - _ => Err(()), - } - } -} - impl From for ConnectionError { #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] fn from(e: prost::EncodeError) -> Self { @@ -623,8 +577,6 @@ impl From for ConnectionError { #[cfg(test)] mod tests { - use std::convert::TryFrom; - use bytes::BytesMut; use tokio_util::codec::{Decoder, Encoder}; @@ -679,17 +631,4 @@ mod tests { Codec.encode(message, &mut output).unwrap(); assert_eq!(&output, input); } - - #[test] - fn base_command_type_parsing() { - use super::base_command::Type; - let mut successes = 0; - for i in 0..40 { - if let Ok(type_) = Type::try_from(i) { - successes += 1; - assert_eq!(type_ as i32, i); - } - } - assert_eq!(successes, 36); - } }