From 72203b16b958bb670dd207d2caae7240ab92ab32 Mon Sep 17 00:00:00 2001 From: pv42 Date: Fri, 30 Aug 2024 00:22:52 +0200 Subject: [PATCH 1/7] feat: add async connection mod with tcp, udp, file feat: add async peek reader feat: change async read fn to use AsyncPeekReader feat: add read_v1_msg_async, read_v2_msg_async_signed, write_versioned_msg_async_signed, write_v2_msg_async_signed --- mavlink-core/Cargo.toml | 9 +- mavlink-core/src/async_connection/file.rs | 83 +++++++ mavlink-core/src/async_connection/mod.rs | 137 +++++++++++ mavlink-core/src/async_connection/tcp.rs | 156 ++++++++++++ mavlink-core/src/async_connection/udp.rs | 277 ++++++++++++++++++++++ mavlink-core/src/async_peek_reader.rs | 147 ++++++++++++ mavlink-core/src/lib.rs | 185 ++++++++++++++- 7 files changed, 980 insertions(+), 14 deletions(-) create mode 100644 mavlink-core/src/async_connection/file.rs create mode 100644 mavlink-core/src/async_connection/mod.rs create mode 100644 mavlink-core/src/async_connection/tcp.rs create mode 100644 mavlink-core/src/async_connection/udp.rs create mode 100644 mavlink-core/src/async_peek_reader.rs diff --git a/mavlink-core/Cargo.toml b/mavlink-core/Cargo.toml index 64a20c938d..d51915905a 100644 --- a/mavlink-core/Cargo.toml +++ b/mavlink-core/Cargo.toml @@ -26,8 +26,9 @@ embedded-io-async = { version = "0.6.1", optional = true } serde = { version = "1.0.115", optional = true, features = ["derive"] } serde_arrays = { version = "0.1.0", optional = true } serial = { version = "0.4", optional = true } -tokio = { version = "1.0", default-features = false, features = ["io-util"], optional = true } +tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs", ], optional = true } sha2 = { version = "0.10", optional = true } +async-trait = { version = "0.1.18", optional = true } [features] "std" = ["byteorder/std"] @@ -40,6 +41,10 @@ sha2 = { version = "0.10", optional = true } "embedded" = ["dep:embedded-io", "dep:embedded-io-async"] "embedded-hal-02" = ["dep:nb", "dep:embedded-hal-02"] "serde" = ["dep:serde", "dep:serde_arrays"] -"tokio-1" = ["dep:tokio"] +"tokio-1" = ["dep:tokio", "dep:async-trait"] "signing" = ["dep:sha2"] default = ["std", "tcp", "udp", "direct-serial", "serde"] + +[dev-dependencies] +# 1.39 tokio macros seems to be currently broken +tokio = { version = "1.0, <=1.38", default-features = false, features = ["io-util", "net", "sync", "fs","macros", "rt" ] } \ No newline at end of file diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs new file mode 100644 index 0000000000..9b1c90af3a --- /dev/null +++ b/mavlink-core/src/async_connection/file.rs @@ -0,0 +1,83 @@ +use core::ops::DerefMut; + +use super::AsyncMavConnection; +use crate::error::{MessageReadError, MessageWriteError}; + +use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; + +use tokio::fs::File; +use tokio::io; +use tokio::sync::Mutex; + +#[cfg(not(feature = "signing"))] +use crate::read_versioned_msg_async; + +#[cfg(feature = "signing")] +use crate::{read_versioned_msg_async_signed, SigningConfig, SigningData}; + +/// File MAVLINK connection + +pub async fn open(file_path: &str) -> io::Result { + let file = File::open(file_path).await?; + Ok(AsyncFileConnection { + file: Mutex::new(AsyncPeekReader::new(file)), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + }) +} + +pub struct AsyncFileConnection { + file: Mutex>, + protocol_version: MavlinkVersion, + + #[cfg(feature = "signing")] + signing_data: Option, +} + +#[async_trait::async_trait] +impl AsyncMavConnection for AsyncFileConnection { + async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { + let mut file = self.file.lock().await; + + loop { + #[cfg(not(feature = "signing"))] + let result = read_versioned_msg_async(file.deref_mut(), self.protocol_version).await; + #[cfg(feature = "signing")] + let result = read_versioned_msg_async_signed( + file.deref_mut(), + self.protocol_version, + self.signing_data.as_ref(), + ) + .await; + match result { + ok @ Ok(..) => { + return ok; + } + Err(MessageReadError::Io(e)) => { + if e.kind() == io::ErrorKind::UnexpectedEof { + return Err(MessageReadError::Io(e)); + } + } + _ => {} + } + } + } + + async fn send(&self, _header: &MavHeader, _data: &M) -> Result { + Ok(0) + } + + fn set_protocol_version(&mut self, version: MavlinkVersion) { + self.protocol_version = version; + } + + fn get_protocol_version(&self) -> MavlinkVersion { + self.protocol_version + } + + #[cfg(feature = "signing")] + fn setup_signing(&mut self, signing_data: Option) { + self.signing_data = signing_data.map(SigningData::from_config) + } +} diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs new file mode 100644 index 0000000000..6ef4cc156a --- /dev/null +++ b/mavlink-core/src/async_connection/mod.rs @@ -0,0 +1,137 @@ +use tokio::io; + +use crate::{MavFrame, MavHeader, MavlinkVersion, Message}; + +#[cfg(feature = "tcp")] +mod tcp; + +#[cfg(feature = "udp")] +mod udp; + +mod file; + +#[cfg(feature = "signing")] +use crate::SigningConfig; + +/// A MAVLink connection +#[async_trait::async_trait] +pub trait AsyncMavConnection { + /// Receive a mavlink message. + /// + /// Blocks until a valid frame is received, ignoring invalid messages. + async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>; + + /// Send a mavlink message + async fn send( + &self, + header: &MavHeader, + data: &M, + ) -> Result; + + fn set_protocol_version(&mut self, version: MavlinkVersion); + fn get_protocol_version(&self) -> MavlinkVersion; + + /// Write whole frame + async fn send_frame( + &self, + frame: &MavFrame, + ) -> Result { + self.send(&frame.header, &frame.msg).await + } + + /// Read whole frame + async fn recv_frame(&self) -> Result, crate::error::MessageReadError> { + let (header, msg) = self.recv().await?; + let protocol_version = self.get_protocol_version(); + Ok(MavFrame { + header, + msg, + protocol_version, + }) + } + + /// Send a message with default header + async fn send_default(&self, data: &M) -> Result { + let header = MavHeader::default(); + self.send(&header, data).await + } + + /// Setup secret key used for message signing, or disable message signing + #[cfg(feature = "signing")] + fn setup_signing(&mut self, signing_data: Option); +} + +/// Connect to a MAVLink node by address string. +/// +/// The address must be in one of the following formats: +/// +/// * `tcpin::` to create a TCP server, listening for incoming connections +/// * `tcpout::` to create a TCP client +/// * `udpin::` to create a UDP server, listening for incoming packets +/// * `udpout::` to create a UDP client +/// * `udpbcast::` to create a UDP broadcast +/// * NOT `serial::` to create a serial connection +/// * `file:` to extract file data +/// +/// The type of the connection is determined at runtime based on the address type, so the +/// connection is returned as a trait object. +// TODO only reason this has to be send is udp serve +pub async fn connect_async( + address: &str, +) -> io::Result + Sync + Send>> { + let protocol_err = Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + )); + + if cfg!(feature = "tcp") && address.starts_with("tcp") { + #[cfg(feature = "tcp")] + { + tcp::select_protocol(address).await + } + #[cfg(not(feature = "tcp"))] + { + protocol_err + } + } else if cfg!(feature = "udp") && address.starts_with("udp") { + #[cfg(feature = "udp")] + { + udp::select_protocol(address).await + } + #[cfg(not(feature = "udp"))] + { + protocol_err + } + } else if cfg!(feature = "direct-serial") && address.starts_with("serial:") { + #[cfg(feature = "direct-serial")] + { + todo!() + //Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) + } + #[cfg(not(feature = "direct-serial"))] + { + protocol_err + } + } else if address.starts_with("file") { + Ok(Box::new(file::open(&address["file:".len()..]).await?)) + } else { + protocol_err + } +} + +// TODO remove this ? +/// Returns the socket address for the given address. +pub(crate) fn get_socket_addr( + address: T, +) -> Result { + let addr = match address.to_socket_addrs()?.next() { + Some(addr) => addr, + None => { + return Err(io::Error::new( + io::ErrorKind::Other, + "Host address lookup failed", + )); + } + }; + Ok(addr) +} diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs new file mode 100644 index 0000000000..e05d70455b --- /dev/null +++ b/mavlink-core/src/async_connection/tcp.rs @@ -0,0 +1,156 @@ +use super::{get_socket_addr, AsyncMavConnection}; +use crate::async_peek_reader::AsyncPeekReader; +use crate::{MavHeader, MavlinkVersion, Message}; + +use core::ops::DerefMut; +use tokio::io; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::Mutex; + +#[cfg(not(feature = "signing"))] +use crate::{read_versioned_msg_async, write_versioned_msg_async}; +#[cfg(feature = "signing")] +use crate::{ + read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData, +}; + +/// TCP MAVLink connection + +pub async fn select_protocol( + address: &str, +) -> io::Result + Sync + Send>> { + let connection = if let Some(address) = address.strip_prefix("tcpout:") { + tcpout(address).await + } else if let Some(address) = address.strip_prefix("tcpin:") { + tcpin(address).await + } else { + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + )) + }; + + Ok(Box::new(connection?)) +} + +pub async fn tcpout(address: T) -> io::Result { + let addr = get_socket_addr(address)?; + + let socket = TcpStream::connect(addr).await?; + + let (reader, writer) = socket.into_split(); + + Ok(TcpConnection { + reader: Mutex::new(AsyncPeekReader::new(reader)), + writer: Mutex::new(TcpWrite { + socket: writer, + sequence: 0, + }), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + }) +} + +pub async fn tcpin(address: T) -> io::Result { + let addr = get_socket_addr(address)?; + let listener = TcpListener::bind(addr).await?; + + //For now we only accept one incoming stream: this blocks until we get one + match listener.accept().await { + Ok((socket, _)) => { + let (reader, writer) = socket.into_split(); + return Ok(TcpConnection { + reader: Mutex::new(AsyncPeekReader::new(reader)), + writer: Mutex::new(TcpWrite { + socket: writer, + sequence: 0, + }), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + }); + } + Err(e) => { + //TODO don't println in lib + println!("listener err: {e}"); + } + } + Err(io::Error::new( + io::ErrorKind::NotConnected, + "No incoming connections!", + )) +} + +pub struct TcpConnection { + reader: Mutex>, + writer: Mutex, + protocol_version: MavlinkVersion, + #[cfg(feature = "signing")] + signing_data: Option, +} + +struct TcpWrite { + socket: OwnedWriteHalf, + sequence: u8, +} + +#[async_trait::async_trait] +impl AsyncMavConnection for TcpConnection { + async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { + let mut reader = self.reader.lock().await; + #[cfg(not(feature = "signing"))] + let result = read_versioned_msg_async(reader.deref_mut(), self.protocol_version).await; + #[cfg(feature = "signing")] + let result = read_versioned_msg_async_signed( + reader.deref_mut(), + self.protocol_version, + self.signing_data.as_ref(), + ) + .await; + result + } + + async fn send( + &self, + header: &MavHeader, + data: &M, + ) -> Result { + let mut lock = self.writer.lock().await; + + let header = MavHeader { + sequence: lock.sequence, + system_id: header.system_id, + component_id: header.component_id, + }; + + lock.sequence = lock.sequence.wrapping_add(1); + #[cfg(not(feature = "signing"))] + let result = + write_versioned_msg_async(&mut lock.socket, self.protocol_version, header, data).await; + #[cfg(feature = "signing")] + let result = write_versioned_msg_async_signed( + &mut lock.socket, + self.protocol_version, + header, + data, + self.signing_data.as_ref(), + ) + .await; + result + } + + fn set_protocol_version(&mut self, version: MavlinkVersion) { + self.protocol_version = version; + } + + fn get_protocol_version(&self) -> MavlinkVersion { + self.protocol_version + } + + #[cfg(feature = "signing")] + fn setup_signing(&mut self, signing_data: Option) { + self.signing_data = signing_data.map(SigningData::from_config) + } +} diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs new file mode 100644 index 0000000000..cd945e8055 --- /dev/null +++ b/mavlink-core/src/async_connection/udp.rs @@ -0,0 +1,277 @@ +use core::{ops::DerefMut, task::Poll}; +use std::{collections::VecDeque, io::Read, sync::Arc}; + +use tokio::{ + io::{self, AsyncRead, ReadBuf}, + net::UdpSocket, + sync::Mutex, +}; + +use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; + +use super::{get_socket_addr, AsyncMavConnection}; + +#[cfg(not(feature = "signing"))] +use crate::{read_versioned_msg_async, write_versioned_msg_async}; +#[cfg(feature = "signing")] +use crate::{ + read_versioned_msg_async_signed, write_versioned_msg_signed, SigningConfig, SigningData, +}; + +pub async fn select_protocol( + address: &str, +) -> tokio::io::Result + Sync + Send>> { + let connection = if let Some(address) = address.strip_prefix("udpin:") { + udpin(address).await + } else if let Some(address) = address.strip_prefix("udpout:") { + udpout(address).await + } else if let Some(address) = address.strip_prefix("udpbcast:") { + udpbcast(address).await + } else { + Err(tokio::io::Error::new( + tokio::io::ErrorKind::AddrNotAvailable, + "Protocol unsupported", + )) + }; + + Ok(Box::new(connection?)) +} + +pub async fn udpbcast(address: T) -> tokio::io::Result { + let addr = get_socket_addr(address)?; + let socket = UdpSocket::bind("0.0.0.0:0").await?; + socket + .set_broadcast(true) + .expect("Couldn't bind to broadcast address."); + UdpConnection::new(socket, false, Some(addr)) +} + +pub async fn udpout(address: T) -> tokio::io::Result { + let addr = get_socket_addr(address)?; + let socket = UdpSocket::bind("0.0.0.0:0").await?; + UdpConnection::new(socket, false, Some(addr)) +} + +pub async fn udpin(address: T) -> tokio::io::Result { + let addr = address + .to_socket_addrs() + .unwrap() + .next() + .expect("Invalid address"); + let socket = UdpSocket::bind(addr).await?; + UdpConnection::new(socket, true, None) +} + +struct UdpRead { + socket: Arc, + buffer: VecDeque, + last_recv_address: Option, +} + +const MTU_SIZE: usize = 1500; +impl AsyncRead for UdpRead { + fn poll_read( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if self.buffer.is_empty() { + let mut read_buffer = [0u8; MTU_SIZE]; + let mut read_buffer = ReadBuf::new(&mut read_buffer); + + match self.socket.poll_recv_from(cx, &mut read_buffer) { + Poll::Ready(Ok(address)) => { + let n_buffer = read_buffer.filled().len(); + + let n = (&read_buffer.filled()[0..n_buffer]).read(buf.initialize_unfilled())?; + buf.advance(n); + + self.buffer.extend(&read_buffer.filled()[n..n_buffer]); + self.last_recv_address = Some(address); + Poll::Ready(Ok(())) + } + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => Poll::Pending, + } + } else { + let read_result = self.buffer.read(buf.initialize_unfilled()); + let result = match read_result { + Ok(n) => { + buf.advance(n); + Ok(()) + } + Err(err) => Err(err), + }; + Poll::Ready(result) + } + } +} + +struct UdpWrite { + socket: Arc, + dest: Option, + sequence: u8, +} + +pub struct UdpConnection { + reader: Mutex>, + writer: Mutex, + protocol_version: MavlinkVersion, + server: bool, + #[cfg(feature = "signing")] + signing_data: Option, +} + +impl UdpConnection { + fn new( + socket: UdpSocket, + server: bool, + dest: Option, + ) -> tokio::io::Result { + let socket = Arc::new(socket); + Ok(Self { + server, + reader: Mutex::new(AsyncPeekReader::new(UdpRead { + socket: socket.clone(), + buffer: VecDeque::new(), + last_recv_address: None, + })), + writer: Mutex::new(UdpWrite { + socket, + dest, + sequence: 0, + }), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + }) + } +} + +#[async_trait::async_trait] +impl AsyncMavConnection for UdpConnection { + async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { + let mut reader = self.reader.lock().await; + + loop { + #[cfg(not(feature = "signing"))] + let result = read_versioned_msg_async(reader.deref_mut(), self.protocol_version).await; + #[cfg(feature = "signing")] + let result = read_versioned_msg_async_signed( + reader.deref_mut(), + self.protocol_version, + self.signing_data.as_ref(), + ) + .await; + if self.server { + if let addr @ Some(_) = reader.reader_ref().last_recv_address { + self.writer.lock().await.dest = addr; + } + } + if let ok @ Ok(..) = result { + return ok; + } + } + } + + async fn send( + &self, + header: &MavHeader, + data: &M, + ) -> Result { + let mut guard = self.writer.lock().await; + let state = &mut *guard; + + let header = MavHeader { + sequence: state.sequence, + system_id: header.system_id, + component_id: header.component_id, + }; + + state.sequence = state.sequence.wrapping_add(1); + + let len = if let Some(addr) = state.dest { + let mut buf = Vec::new(); + #[cfg(not(feature = "signing"))] + write_versioned_msg_async( + &mut buf, + self.protocol_version, + header, + data, + #[cfg(feature = "signing")] + self.signing_data.as_ref(), + ) + .await?; + #[cfg(feature = "signing")] + write_versioned_msg_signed( + &mut buf, + self.protocol_version, + header, + data, + #[cfg(feature = "signing")] + self.signing_data.as_ref(), + )?; + state.socket.send_to(&buf, addr).await? + } else { + 0 + }; + + Ok(len) + } + + fn set_protocol_version(&mut self, version: MavlinkVersion) { + self.protocol_version = version; + } + + fn get_protocol_version(&self) -> MavlinkVersion { + self.protocol_version + } + + #[cfg(feature = "signing")] + fn setup_signing(&mut self, signing_data: Option) { + self.signing_data = signing_data.map(SigningData::from_config) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::AsyncReadExt; + + #[tokio::test] + async fn test_datagram_buffering() { + let receiver_socket = Arc::new(UdpSocket::bind("127.0.0.1:5001").await.unwrap()); + let mut udp_reader = UdpRead { + socket: receiver_socket.clone(), + buffer: VecDeque::new(), + last_recv_address: None, + }; + let sender_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); + sender_socket.connect("127.0.0.1:5001").await.unwrap(); + + let datagram: Vec = (0..50).collect::>(); + + let mut n_sent = sender_socket.send(&datagram).await.unwrap(); + assert_eq!(n_sent, datagram.len()); + n_sent = sender_socket.send(&datagram).await.unwrap(); + assert_eq!(n_sent, datagram.len()); + + let mut buf = [0u8; 30]; + + let mut n_read = udp_reader.read(&mut buf).await.unwrap(); + assert_eq!(n_read, 30); + assert_eq!(&buf[0..n_read], (0..30).collect::>().as_slice()); + + n_read = udp_reader.read(&mut buf).await.unwrap(); + assert_eq!(n_read, 20); + assert_eq!(&buf[0..n_read], (30..50).collect::>().as_slice()); + + n_read = udp_reader.read(&mut buf).await.unwrap(); + assert_eq!(n_read, 30); + assert_eq!(&buf[0..n_read], (0..30).collect::>().as_slice()); + + n_read = udp_reader.read(&mut buf).await.unwrap(); + assert_eq!(n_read, 20); + assert_eq!(&buf[0..n_read], (30..50).collect::>().as_slice()); + } +} diff --git a/mavlink-core/src/async_peek_reader.rs b/mavlink-core/src/async_peek_reader.rs new file mode 100644 index 0000000000..1d6ada2c4b --- /dev/null +++ b/mavlink-core/src/async_peek_reader.rs @@ -0,0 +1,147 @@ +//! This module implements a buffered/peekable reader using async I/O. +//! +//! The purpose of the buffered/peekable reader is to allow for backtracking parsers. +//! +//! This is the async version of [`crate::peek_reader::PeekReader`]. +//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but +//! it does not allow for peeking a specific number of bytes, so it provides no way to request +//! more data from the underlying reader without consuming the existing data. +//! +//! This API still tries to adhere to the [`tokio::io::AsyncBufRead`]'s trait philosophy. +//! +//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added benefit +//! in doing so. +//! +use tokio::io::AsyncReadExt; + +use crate::error::MessageReadError; + +/// A buffered/peekable reader +/// +/// This reader wraps a type implementing [`tokio::io::AsyncRead`] and adds buffering via an internal buffer. +/// +/// It allows the user to `peek` a specified number of bytes (without consuming them), +/// to `read` bytes (consuming them), or to `consume` them after `peek`ing. +/// +/// NOTE: This reader is generic over the size of the buffer, defaulting to MAVLink's current largest +/// possible message size of 280 bytes +/// +pub struct AsyncPeekReader { + // Internal buffer + buffer: [u8; BUFFER_SIZE], + // The position of the next byte to read from the buffer. + cursor: usize, + // The position of the next byte to read into the buffer. + top: usize, + // The wrapped reader. + reader: R, +} + +impl AsyncPeekReader { + /// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`] and using the default chunk size + pub fn new(reader: R) -> Self { + Self { + buffer: [0; BUFFER_SIZE], + cursor: 0, + top: 0, + reader, + } + } + + /// Peeks an exact amount of bytes from the internal buffer + /// + /// If the internal buffer does not contain enough data, this function will read + /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF). + /// + /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`]. + /// + /// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions + /// will still return the peeked data. + /// + pub async fn peek_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> { + self.fetch(amount, false).await + } + + /// Reads a specified amount of bytes from the internal buffer + /// + /// If the internal buffer does not contain enough data, this function will read + /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF). + /// + /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`]. + /// + /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed. + /// + pub async fn read_exact(&mut self, amount: usize) -> Result<&[u8], MessageReadError> { + self.fetch(amount, true).await + } + + /// Reads a byte from the internal buffer + /// + /// If the internal buffer does not contain enough data, this function will read + /// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF). + /// + /// If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`]. + /// + /// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed. + /// + pub async fn read_u8(&mut self) -> Result { + let buf = self.read_exact(1).await?; + Ok(buf[0]) + } + + /// Consumes a specified amount of bytes from the buffer + /// + /// If the internal buffer does not contain enough data, this function will consume as much data as is buffered. + /// + pub fn consume(&mut self, amount: usize) -> usize { + let amount = amount.min(self.top - self.cursor); + self.cursor += amount; + amount + } + + /// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`] + /// + /// Reading directly from the underlying stream will cause data loss + pub fn reader_ref(&mut self) -> &R { + &self.reader + } + + /// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`] + /// + /// Reading directly from the underlying stream will cause data loss + pub fn reader_mut(&mut self) -> &mut R { + &mut self.reader + } + + /// Internal function to fetch data from the internal buffer and/or reader + async fn fetch(&mut self, amount: usize, consume: bool) -> Result<&[u8], MessageReadError> { + let buffered = self.top - self.cursor; + + // the caller requested more bytes than we have buffered, fetch them from the reader + if buffered < amount { + let bytes_read = amount - buffered; + assert!(bytes_read < BUFFER_SIZE); + let mut buf = [0u8; BUFFER_SIZE]; + + // read needed bytes from reader + self.reader.read_exact(&mut buf[..bytes_read]).await?; + + // if some bytes were read, add them to the buffer + + if self.buffer.len() - self.top < bytes_read { + // reallocate + self.buffer.copy_within(self.cursor..self.top, 0); + self.cursor = 0; + self.top = buffered; + } + self.buffer[self.top..self.top + bytes_read].copy_from_slice(&buf[..bytes_read]); + self.top += bytes_read; + } + + let result = &self.buffer[self.cursor..self.cursor + amount]; + if consume { + self.cursor += amount; + } + Ok(result) + } +} diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 2ef674b104..d68fe1d214 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -50,6 +50,16 @@ pub mod error; #[cfg(feature = "std")] pub use self::connection::{connect, MavConnection}; +#[cfg(feature = "tokio-1")] +mod async_connection; +#[cfg(feature = "tokio-1")] +pub use self::async_connection::{connect_async, AsyncMavConnection}; + +#[cfg(feature = "tokio-1")] +pub mod async_peek_reader; +#[cfg(feature = "tokio-1")] +use async_peek_reader::AsyncPeekReader; + #[cfg(any(feature = "embedded", feature = "embedded-hal-02"))] pub mod embedded; #[cfg(any(feature = "embedded", feature = "embedded-hal-02"))] @@ -262,6 +272,17 @@ pub fn read_versioned_msg( } } +#[cfg(feature = "tokio-1")] +pub async fn read_versioned_msg_async( + r: &mut AsyncPeekReader, + version: MavlinkVersion, +) -> Result<(MavHeader, M), error::MessageReadError> { + match version { + MavlinkVersion::V2 => read_v2_msg_async(r).await, + MavlinkVersion::V1 => read_v1_msg_async(r).await, + } +} + #[cfg(feature = "signing")] pub fn read_versioned_msg_signed( r: &mut PeekReader, @@ -274,6 +295,18 @@ pub fn read_versioned_msg_signed( } } +#[cfg(all(feature = "tokio-1", feature = "signing"))] +pub async fn read_versioned_msg_async_signed( + r: &mut AsyncPeekReader, + version: MavlinkVersion, + signing_data: Option<&SigningData>, +) -> Result<(MavHeader, M), error::MessageReadError> { + match version { + MavlinkVersion::V2 => read_v2_msg_async_inner(r, signing_data).await, + MavlinkVersion::V1 => read_v1_msg_async(r).await, + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq)] // Follow protocol definition: `` pub struct MAVLinkV1MessageRaw([u8; 1 + Self::HEADER_SIZE + 255 + 2]); @@ -445,6 +478,42 @@ pub fn read_v1_raw_message( } } +/// Return a raw buffer with the mavlink message +/// V1 maximum size is 263 bytes: `` +#[cfg(feature = "tokio-1")] +pub async fn read_v1_raw_message_async( + reader: &mut AsyncPeekReader, +) -> Result { + loop { + loop { + // search for the magic framing value indicating start of mavlink message + if reader.read_u8().await? == MAV_STX { + break; + } + } + + let mut message = MAVLinkV1MessageRaw::new(); + + message.0[0] = MAV_STX; + let header = &reader.peek_exact(MAVLinkV1MessageRaw::HEADER_SIZE).await? + [..MAVLinkV1MessageRaw::HEADER_SIZE]; + message.mut_header().copy_from_slice(header); + let packet_length = message.raw_bytes().len() - 1; + let payload_and_checksum = &reader.peek_exact(packet_length).await? + [MAVLinkV1MessageRaw::HEADER_SIZE..packet_length]; + message + .mut_payload_and_checksum() + .copy_from_slice(payload_and_checksum); + + // retry if CRC failed after previous STX + // (an STX byte may appear in the middle of a message) + if message.has_valid_crc::() { + reader.consume(message.raw_bytes().len() - 1); + return Ok(message); + } + } +} + /// Async read a raw buffer with the mavlink message /// V1 maximum size is 263 bytes: `` /// @@ -507,6 +576,27 @@ pub fn read_v1_msg( )) } +/// Read a MAVLink v1 message from a Read stream. +#[cfg(feature = "tokio-1")] +pub async fn read_v1_msg_async( + r: &mut AsyncPeekReader, +) -> Result<(MavHeader, M), error::MessageReadError> { + let message = read_v1_raw_message_async::(r).await?; + + Ok(( + MavHeader { + sequence: message.sequence(), + system_id: message.system_id(), + component_id: message.component_id(), + }, + M::parse( + MavlinkVersion::V1, + u32::from(message.message_id()), + message.payload(), + )?, + )) +} + /// Async read a MAVLink v1 message from a Read stream. /// /// NOTE: it will be add ~80KB to firmware flash size because all *_DATA::deser methods will be add to firmware. @@ -872,7 +962,7 @@ fn read_v2_raw_message_inner( /// V2 maximum size is 280 bytes: `` #[cfg(feature = "tokio-1")] pub async fn read_v2_raw_message_async( - reader: &mut R, + reader: &mut AsyncPeekReader, ) -> Result { read_v2_raw_message_async_inner::(reader, None).await } @@ -881,7 +971,7 @@ pub async fn read_v2_raw_message_async` #[cfg(feature = "tokio-1")] async fn read_v2_raw_message_async_inner( - reader: &mut R, + reader: &mut AsyncPeekReader, signing_data: Option<&SigningData>, ) -> Result { loop { @@ -895,19 +985,26 @@ async fn read_v2_raw_message_async_inner 0 { // if there are incompatibility flags set that we do not know discard the message continue; } - reader - .read_exact(message.mut_payload_and_checksum_and_sign()) - .await?; + let packet_length = message.raw_bytes().len() - 1; + let payload_and_checksum_and_sign = &reader.peek_exact(packet_length).await? + [MAVLinkV2MessageRaw::HEADER_SIZE..packet_length]; + message + .mut_payload_and_checksum_and_sign() + .copy_from_slice(payload_and_checksum_and_sign); - if !message.has_valid_crc::() { + if message.has_valid_crc::() { + // even if the signature turn out to be invalid the valid crc shows that the received data presents a valid message as opposed to random bytes + reader.consume(message.raw_bytes().len() - 1); + } else { continue; } @@ -926,7 +1023,7 @@ async fn read_v2_raw_message_async_inner` #[cfg(all(feature = "tokio-1", feature = "signing"))] pub async fn read_v2_raw_message_async_signed( - reader: &mut R, + reader: &mut AsyncPeekReader, signing_data: Option<&SigningData>, ) -> Result { read_v2_raw_message_async_inner::(reader, signing_data).await @@ -1017,9 +1114,26 @@ fn read_v2_msg_inner( /// Async read a MAVLink v2 message from a Read stream. #[cfg(feature = "tokio-1")] pub async fn read_v2_msg_async( - read: &mut R, + read: &mut AsyncPeekReader, +) -> Result<(MavHeader, M), error::MessageReadError> { + read_v2_msg_async_inner(read, None).await +} + +/// Async read a MAVLink v2 message from a Read stream. +#[cfg(all(feature = "tokio-1", feature = "signing"))] +pub async fn read_v2_msg_async_signed( + read: &mut AsyncPeekReader, + signing_data: Option<&SigningData>, ) -> Result<(MavHeader, M), error::MessageReadError> { - let message = read_v2_raw_message_async::(read).await?; + read_v2_msg_async_inner(read, signing_data).await +} + +#[cfg(feature = "tokio-1")] +async fn read_v2_msg_async_inner( + read: &mut AsyncPeekReader, + signing_data: Option<&SigningData>, +) -> Result<(MavHeader, M), error::MessageReadError> { + let message = read_v2_raw_message_async_signed::(read, signing_data).await?; Ok(( MavHeader { @@ -1097,6 +1211,21 @@ pub async fn write_versioned_msg_async( + w: &mut W, + version: MavlinkVersion, + header: MavHeader, + data: &M, + signing_data: Option<&SigningData>, +) -> Result { + match version { + MavlinkVersion::V2 => write_v2_msg_async_signed(w, header, data, signing_data).await, + MavlinkVersion::V1 => write_v1_msg_async(w, header, data).await, + } +} + /// Async write a message using the given mavlink version /// /// NOTE: it will be add ~70KB to firmware flash size because all *_DATA::ser methods will be add to firmware. @@ -1143,7 +1272,6 @@ pub fn write_v2_msg_signed( let signature_len = if let Some(signing_data) = signing_data { if signing_data.config.sign_outgoing { - //header.incompat_flags |= MAVLINK_IFLAG_SIGNED; message_raw.serialize_message_for_signing(header, data); signing_data.sign_message(&mut message_raw); MAVLinkV2MessageRaw::SIGNATURE_SIZE @@ -1182,6 +1310,39 @@ pub async fn write_v2_msg_async Ok(len) } +/// Write a MAVLink v2 message to a Write stream with signing support. +#[cfg(feature = "signing")] +#[cfg(feature = "tokio-1")] +pub async fn write_v2_msg_async_signed( + w: &mut W, + header: MavHeader, + data: &M, + signing_data: Option<&SigningData>, +) -> Result { + let mut message_raw = MAVLinkV2MessageRaw::new(); + + let signature_len = if let Some(signing_data) = signing_data { + if signing_data.config.sign_outgoing { + message_raw.serialize_message_for_signing(header, data); + signing_data.sign_message(&mut message_raw); + MAVLinkV2MessageRaw::SIGNATURE_SIZE + } else { + message_raw.serialize_message(header, data); + 0 + } + } else { + message_raw.serialize_message(header, data); + 0 + }; + + let payload_length: usize = message_raw.payload_length().into(); + let len = 1 + MAVLinkV2MessageRaw::HEADER_SIZE + payload_length + 2 + signature_len; + + w.write_all(&message_raw.0[..len]).await?; + + Ok(len) +} + /// Async write a MAVLink v2 message to a Write stream. /// /// NOTE: it will be add ~70KB to firmware flash size because all *_DATA::ser methods will be add to firmware. From 2042c61f0abf0d987c30eef181c9fa34315108e7 Mon Sep 17 00:00:00 2001 From: pv42 Date: Sun, 1 Sep 2024 16:13:49 +0200 Subject: [PATCH 2/7] test: add async versiosn of network loopback tests fix: remove tokio version cap for dev-dep --- mavlink-core/Cargo.toml | 3 +- mavlink/Cargo.toml | 3 + mavlink/tests/tcp_loopback_async_tests.rs | 69 +++++++++++++++++++++++ mavlink/tests/udp_loopback_async_tests.rs | 46 +++++++++++++++ 4 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 mavlink/tests/tcp_loopback_async_tests.rs create mode 100644 mavlink/tests/udp_loopback_async_tests.rs diff --git a/mavlink-core/Cargo.toml b/mavlink-core/Cargo.toml index d51915905a..eb749b07c4 100644 --- a/mavlink-core/Cargo.toml +++ b/mavlink-core/Cargo.toml @@ -46,5 +46,4 @@ async-trait = { version = "0.1.18", optional = true } default = ["std", "tcp", "udp", "direct-serial", "serde"] [dev-dependencies] -# 1.39 tokio macros seems to be currently broken -tokio = { version = "1.0, <=1.38", default-features = false, features = ["io-util", "net", "sync", "fs","macros", "rt" ] } \ No newline at end of file +tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs","macros", "rt" ] } \ No newline at end of file diff --git a/mavlink/Cargo.toml b/mavlink/Cargo.toml index cafcd6d1d6..669d9b0752 100644 --- a/mavlink/Cargo.toml +++ b/mavlink/Cargo.toml @@ -115,3 +115,6 @@ features = [ "tokio-1", "signing" ] + +[dev-dependencies] +tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "time" ] } diff --git a/mavlink/tests/tcp_loopback_async_tests.rs b/mavlink/tests/tcp_loopback_async_tests.rs new file mode 100644 index 0000000000..edd56ba595 --- /dev/null +++ b/mavlink/tests/tcp_loopback_async_tests.rs @@ -0,0 +1,69 @@ +mod test_shared; + +#[cfg(all(feature = "tokio-1", feature = "tcp", feature = "common"))] +mod test_tcp_connections { + #[cfg(feature = "signing")] + use crate::test_shared; + #[cfg(feature = "signing")] + use mavlink::SigningConfig; + + /// Test whether we can send a message via TCP and receive it OK using async_connect. + /// This also test signing as a property of a MavConnection if the signing feature is enabled. + #[tokio::test] + pub async fn test_tcp_loopback() { + const RECEIVE_CHECK_COUNT: i32 = 5; + + #[cfg(feature = "signing")] + let singing_cfg_server = SigningConfig::new(test_shared::SECRET_KEY, 0, true, false); + #[cfg(feature = "signing")] + let singing_cfg_client = singing_cfg_server.clone(); + + let server_thread = tokio::spawn(async move { + //TODO consider using get_available_port to use a random port + let mut server = + mavlink::connect_async("tcpin:0.0.0.0:14551").await.expect("Couldn't create server"); + + #[cfg(feature = "signing")] + server.setup_signing(Some(singing_cfg_server)); + + let mut recv_count = 0; + for _i in 0..RECEIVE_CHECK_COUNT { + match server.recv().await { + Ok((_header, msg)) => { + if let mavlink::common::MavMessage::HEARTBEAT(_heartbeat_msg) = msg { + recv_count += 1; + } else { + // one message parse failure fails the test + break; + } + } + Err(..) => { + // one message read failure fails the test + break; + } + } + } + assert_eq!(recv_count, RECEIVE_CHECK_COUNT); + }); + + // Give some time for the server to connect + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // have the client send a few hearbeats + tokio::spawn(async move { + let msg = + mavlink::common::MavMessage::HEARTBEAT(crate::test_shared::get_heartbeat_msg()); + let mut client = + mavlink::connect_async("tcpout:127.0.0.1:14551").await.expect("Couldn't create client"); + + #[cfg(feature = "signing")] + client.setup_signing(Some(singing_cfg_client)); + + for _i in 0..RECEIVE_CHECK_COUNT { + client.send_default(&msg).await.ok(); + } + }); + + server_thread.await.unwrap(); + } +} diff --git a/mavlink/tests/udp_loopback_async_tests.rs b/mavlink/tests/udp_loopback_async_tests.rs new file mode 100644 index 0000000000..3d6b46d236 --- /dev/null +++ b/mavlink/tests/udp_loopback_async_tests.rs @@ -0,0 +1,46 @@ +mod test_shared; + +#[cfg(all(feature = "tokio-1", feature = "udp", feature = "common"))] +mod test_udp_connections { + + /// Test whether we can send a message via UDP and receive it OK using async_connect + #[tokio::test] + pub async fn test_udp_loopback() { + const RECEIVE_CHECK_COUNT: i32 = 3; + + let server = mavlink::connect_async("udpin:0.0.0.0:14552").await.expect("Couldn't create server"); + + // have the client send one heartbeat per second + tokio::spawn({ + async move { + let msg = + mavlink::common::MavMessage::HEARTBEAT(crate::test_shared::get_heartbeat_msg()); + let client = + mavlink::connect_async("udpout:127.0.0.1:14552").await.expect("Couldn't create client"); + loop { + client.send_default(&msg).await.ok(); + } + } + }); + + //TODO use std::sync::WaitTimeoutResult to timeout ourselves if recv fails? + let mut recv_count = 0; + for _i in 0..RECEIVE_CHECK_COUNT { + match server.recv().await { + Ok((_header, msg)) => { + if let mavlink::common::MavMessage::HEARTBEAT(_heartbeat_msg) = msg { + recv_count += 1; + } else { + // one message parse failure fails the test + break; + } + } + Err(..) => { + // one message read failure fails the test + break; + } + } + } + assert_eq!(recv_count, RECEIVE_CHECK_COUNT); + } +} From aa5b0b5bc9845e47b446e7414460461552ebe732 Mon Sep 17 00:00:00 2001 From: pv42 Date: Sun, 1 Sep 2024 16:16:32 +0200 Subject: [PATCH 3/7] fix: read_v2_msg_async_inner, write_versioned_msg_async_signed doc: update for async fix: remove unused serial code from async connection --- mavlink-core/src/async_connection/mod.rs | 20 ++++---------------- mavlink-core/src/lib.rs | 4 ++-- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index 6ef4cc156a..ed727ec93b 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -13,12 +13,12 @@ mod file; #[cfg(feature = "signing")] use crate::SigningConfig; -/// A MAVLink connection +/// An async MAVLink connection #[async_trait::async_trait] pub trait AsyncMavConnection { /// Receive a mavlink message. /// - /// Blocks until a valid frame is received, ignoring invalid messages. + /// Wait until a valid frame is received, ignoring invalid messages. async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>; /// Send a mavlink message @@ -61,7 +61,7 @@ pub trait AsyncMavConnection { fn setup_signing(&mut self, signing_data: Option); } -/// Connect to a MAVLink node by address string. +/// Connect asynchronously to a MAVLink node by address string. /// /// The address must be in one of the following formats: /// @@ -70,12 +70,11 @@ pub trait AsyncMavConnection { /// * `udpin::` to create a UDP server, listening for incoming packets /// * `udpout::` to create a UDP client /// * `udpbcast::` to create a UDP broadcast -/// * NOT `serial::` to create a serial connection /// * `file:` to extract file data /// +/// Serial is currently not supported for async connections, use [`crate::connect`] instead. /// The type of the connection is determined at runtime based on the address type, so the /// connection is returned as a trait object. -// TODO only reason this has to be send is udp serve pub async fn connect_async( address: &str, ) -> io::Result + Sync + Send>> { @@ -102,16 +101,6 @@ pub async fn connect_async( { protocol_err } - } else if cfg!(feature = "direct-serial") && address.starts_with("serial:") { - #[cfg(feature = "direct-serial")] - { - todo!() - //Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) - } - #[cfg(not(feature = "direct-serial"))] - { - protocol_err - } } else if address.starts_with("file") { Ok(Box::new(file::open(&address["file:".len()..]).await?)) } else { @@ -119,7 +108,6 @@ pub async fn connect_async( } } -// TODO remove this ? /// Returns the socket address for the given address. pub(crate) fn get_socket_addr( address: T, diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index d68fe1d214..7a3341950d 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -1133,7 +1133,7 @@ async fn read_v2_msg_async_inner read: &mut AsyncPeekReader, signing_data: Option<&SigningData>, ) -> Result<(MavHeader, M), error::MessageReadError> { - let message = read_v2_raw_message_async_signed::(read, signing_data).await?; + let message = read_v2_raw_message_async_inner::(read, signing_data).await?; Ok(( MavHeader { @@ -1212,7 +1212,7 @@ pub async fn write_versioned_msg_async( w: &mut W, version: MavlinkVersion, From c6bd97e8b99611366cc32acbe3889c724d606291 Mon Sep 17 00:00:00 2001 From: pv42 Date: Sun, 1 Sep 2024 16:17:44 +0200 Subject: [PATCH 4/7] fix: fmt --- mavlink/tests/tcp_loopback_async_tests.rs | 10 ++++++---- mavlink/tests/udp_loopback_async_tests.rs | 9 ++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/mavlink/tests/tcp_loopback_async_tests.rs b/mavlink/tests/tcp_loopback_async_tests.rs index edd56ba595..860ce0b10e 100644 --- a/mavlink/tests/tcp_loopback_async_tests.rs +++ b/mavlink/tests/tcp_loopback_async_tests.rs @@ -20,8 +20,9 @@ mod test_tcp_connections { let server_thread = tokio::spawn(async move { //TODO consider using get_available_port to use a random port - let mut server = - mavlink::connect_async("tcpin:0.0.0.0:14551").await.expect("Couldn't create server"); + let mut server = mavlink::connect_async("tcpin:0.0.0.0:14551") + .await + .expect("Couldn't create server"); #[cfg(feature = "signing")] server.setup_signing(Some(singing_cfg_server)); @@ -53,8 +54,9 @@ mod test_tcp_connections { tokio::spawn(async move { let msg = mavlink::common::MavMessage::HEARTBEAT(crate::test_shared::get_heartbeat_msg()); - let mut client = - mavlink::connect_async("tcpout:127.0.0.1:14551").await.expect("Couldn't create client"); + let mut client = mavlink::connect_async("tcpout:127.0.0.1:14551") + .await + .expect("Couldn't create client"); #[cfg(feature = "signing")] client.setup_signing(Some(singing_cfg_client)); diff --git a/mavlink/tests/udp_loopback_async_tests.rs b/mavlink/tests/udp_loopback_async_tests.rs index 3d6b46d236..4b3934523e 100644 --- a/mavlink/tests/udp_loopback_async_tests.rs +++ b/mavlink/tests/udp_loopback_async_tests.rs @@ -8,15 +8,18 @@ mod test_udp_connections { pub async fn test_udp_loopback() { const RECEIVE_CHECK_COUNT: i32 = 3; - let server = mavlink::connect_async("udpin:0.0.0.0:14552").await.expect("Couldn't create server"); + let server = mavlink::connect_async("udpin:0.0.0.0:14552") + .await + .expect("Couldn't create server"); // have the client send one heartbeat per second tokio::spawn({ async move { let msg = mavlink::common::MavMessage::HEARTBEAT(crate::test_shared::get_heartbeat_msg()); - let client = - mavlink::connect_async("udpout:127.0.0.1:14552").await.expect("Couldn't create client"); + let client = mavlink::connect_async("udpout:127.0.0.1:14552") + .await + .expect("Couldn't create client"); loop { client.send_default(&msg).await.ok(); } From bfda24c51ac975d6a6d21d612b852ac3ec07a24a Mon Sep 17 00:00:00 2001 From: pv42 Date: Wed, 4 Sep 2024 11:57:36 +0200 Subject: [PATCH 5/7] fix: add Send marker to AsyncMavConnection for rust 1.70 --- mavlink-core/src/async_connection/file.rs | 2 +- mavlink-core/src/async_connection/mod.rs | 2 +- mavlink-core/src/async_connection/tcp.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs index 9b1c90af3a..852273649f 100644 --- a/mavlink-core/src/async_connection/file.rs +++ b/mavlink-core/src/async_connection/file.rs @@ -36,7 +36,7 @@ pub struct AsyncFileConnection { } #[async_trait::async_trait] -impl AsyncMavConnection for AsyncFileConnection { +impl AsyncMavConnection for AsyncFileConnection { async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { let mut file = self.file.lock().await; diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index ed727ec93b..a708a67328 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -15,7 +15,7 @@ use crate::SigningConfig; /// An async MAVLink connection #[async_trait::async_trait] -pub trait AsyncMavConnection { +pub trait AsyncMavConnection { /// Receive a mavlink message. /// /// Wait until a valid frame is received, ignoring invalid messages. diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs index e05d70455b..e8171c7961 100644 --- a/mavlink-core/src/async_connection/tcp.rs +++ b/mavlink-core/src/async_connection/tcp.rs @@ -17,7 +17,7 @@ use crate::{ /// TCP MAVLink connection -pub async fn select_protocol( +pub async fn select_protocol( address: &str, ) -> io::Result + Sync + Send>> { let connection = if let Some(address) = address.strip_prefix("tcpout:") { @@ -97,7 +97,7 @@ struct TcpWrite { } #[async_trait::async_trait] -impl AsyncMavConnection for TcpConnection { +impl AsyncMavConnection for TcpConnection { async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { let mut reader = self.reader.lock().await; #[cfg(not(feature = "signing"))] From c3387955790d1503e127c9d1f0e2356789026a84 Mon Sep 17 00:00:00 2001 From: pv42 Date: Sun, 8 Sep 2024 14:18:24 +0200 Subject: [PATCH 6/7] doc: replace block with yield, move file comment, fmt cargo --- mavlink-core/Cargo.toml | 4 ++-- mavlink-core/src/async_connection/file.rs | 4 ++-- mavlink-core/src/async_connection/mod.rs | 2 +- mavlink-core/src/async_connection/tcp.rs | 6 +++--- mavlink-core/src/async_connection/udp.rs | 2 ++ mavlink-core/src/lib.rs | 1 - 6 files changed, 10 insertions(+), 9 deletions(-) diff --git a/mavlink-core/Cargo.toml b/mavlink-core/Cargo.toml index eb749b07c4..5bf97b903c 100644 --- a/mavlink-core/Cargo.toml +++ b/mavlink-core/Cargo.toml @@ -26,7 +26,7 @@ embedded-io-async = { version = "0.6.1", optional = true } serde = { version = "1.0.115", optional = true, features = ["derive"] } serde_arrays = { version = "0.1.0", optional = true } serial = { version = "0.4", optional = true } -tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs", ], optional = true } +tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs"], optional = true } sha2 = { version = "0.10", optional = true } async-trait = { version = "0.1.18", optional = true } @@ -46,4 +46,4 @@ async-trait = { version = "0.1.18", optional = true } default = ["std", "tcp", "udp", "direct-serial", "serde"] [dev-dependencies] -tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs","macros", "rt" ] } \ No newline at end of file +tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs", "macros", "rt"] } \ No newline at end of file diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs index 852273649f..5dae56d99e 100644 --- a/mavlink-core/src/async_connection/file.rs +++ b/mavlink-core/src/async_connection/file.rs @@ -1,3 +1,5 @@ +//! Async File MAVLINK connection + use core::ops::DerefMut; use super::AsyncMavConnection; @@ -15,8 +17,6 @@ use crate::read_versioned_msg_async; #[cfg(feature = "signing")] use crate::{read_versioned_msg_async_signed, SigningConfig, SigningData}; -/// File MAVLINK connection - pub async fn open(file_path: &str) -> io::Result { let file = File::open(file_path).await?; Ok(AsyncFileConnection { diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index a708a67328..35e2fa0e78 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -18,7 +18,7 @@ use crate::SigningConfig; pub trait AsyncMavConnection { /// Receive a mavlink message. /// - /// Wait until a valid frame is received, ignoring invalid messages. + /// Yield until a valid frame is received, ignoring invalid messages. async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>; /// Send a mavlink message diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs index e8171c7961..b0307d377d 100644 --- a/mavlink-core/src/async_connection/tcp.rs +++ b/mavlink-core/src/async_connection/tcp.rs @@ -1,3 +1,5 @@ +//! Async TCP MAVLink connection + use super::{get_socket_addr, AsyncMavConnection}; use crate::async_peek_reader::AsyncPeekReader; use crate::{MavHeader, MavlinkVersion, Message}; @@ -15,8 +17,6 @@ use crate::{ read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData, }; -/// TCP MAVLink connection - pub async fn select_protocol( address: &str, ) -> io::Result + Sync + Send>> { @@ -57,7 +57,7 @@ pub async fn tcpin(address: T) -> io::Result { let (reader, writer) = socket.into_split(); diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs index cd945e8055..c3082d0edb 100644 --- a/mavlink-core/src/async_connection/udp.rs +++ b/mavlink-core/src/async_connection/udp.rs @@ -1,3 +1,5 @@ +//! Async UDP MAVLink connection + use core::{ops::DerefMut, task::Poll}; use std::{collections::VecDeque, io::Read, sync::Arc}; diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 7a3341950d..8af70ad89b 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -958,7 +958,6 @@ fn read_v2_raw_message_inner( } /// Async read a raw buffer with the mavlink message -/// /// V2 maximum size is 280 bytes: `` #[cfg(feature = "tokio-1")] pub async fn read_v2_raw_message_async( From b2c3cb4915215688681881654f5a6c703d1810f1 Mon Sep 17 00:00:00 2001 From: pv42 Date: Sun, 8 Sep 2024 14:30:23 +0200 Subject: [PATCH 7/7] fix: rename tcp/udp connection to AsyncTcpConnection/AsyncUdpConnection, remove unneeded tokio:: prefixes --- mavlink-core/src/async_connection/tcp.rs | 12 +++++----- mavlink-core/src/async_connection/udp.rs | 30 ++++++++++++------------ 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs index b0307d377d..4cd1f9ac9a 100644 --- a/mavlink-core/src/async_connection/tcp.rs +++ b/mavlink-core/src/async_connection/tcp.rs @@ -34,14 +34,14 @@ pub async fn select_protocol( Ok(Box::new(connection?)) } -pub async fn tcpout(address: T) -> io::Result { +pub async fn tcpout(address: T) -> io::Result { let addr = get_socket_addr(address)?; let socket = TcpStream::connect(addr).await?; let (reader, writer) = socket.into_split(); - Ok(TcpConnection { + Ok(AsyncTcpConnection { reader: Mutex::new(AsyncPeekReader::new(reader)), writer: Mutex::new(TcpWrite { socket: writer, @@ -53,7 +53,7 @@ pub async fn tcpout(address: T) -> io::Result(address: T) -> io::Result { +pub async fn tcpin(address: T) -> io::Result { let addr = get_socket_addr(address)?; let listener = TcpListener::bind(addr).await?; @@ -61,7 +61,7 @@ pub async fn tcpin(address: T) -> io::Result { let (reader, writer) = socket.into_split(); - return Ok(TcpConnection { + return Ok(AsyncTcpConnection { reader: Mutex::new(AsyncPeekReader::new(reader)), writer: Mutex::new(TcpWrite { socket: writer, @@ -83,7 +83,7 @@ pub async fn tcpin(address: T) -> io::Result>, writer: Mutex, protocol_version: MavlinkVersion, @@ -97,7 +97,7 @@ struct TcpWrite { } #[async_trait::async_trait] -impl AsyncMavConnection for TcpConnection { +impl AsyncMavConnection for AsyncTcpConnection { async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { let mut reader = self.reader.lock().await; #[cfg(not(feature = "signing"))] diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs index c3082d0edb..3f06b83e1d 100644 --- a/mavlink-core/src/async_connection/udp.rs +++ b/mavlink-core/src/async_connection/udp.rs @@ -22,7 +22,7 @@ use crate::{ pub async fn select_protocol( address: &str, -) -> tokio::io::Result + Sync + Send>> { +) -> io::Result + Sync + Send>> { let connection = if let Some(address) = address.strip_prefix("udpin:") { udpin(address).await } else if let Some(address) = address.strip_prefix("udpout:") { @@ -30,8 +30,8 @@ pub async fn select_protocol( } else if let Some(address) = address.strip_prefix("udpbcast:") { udpbcast(address).await } else { - Err(tokio::io::Error::new( - tokio::io::ErrorKind::AddrNotAvailable, + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, "Protocol unsupported", )) }; @@ -39,29 +39,29 @@ pub async fn select_protocol( Ok(Box::new(connection?)) } -pub async fn udpbcast(address: T) -> tokio::io::Result { +pub async fn udpbcast(address: T) -> io::Result { let addr = get_socket_addr(address)?; let socket = UdpSocket::bind("0.0.0.0:0").await?; socket .set_broadcast(true) .expect("Couldn't bind to broadcast address."); - UdpConnection::new(socket, false, Some(addr)) + AsyncUdpConnection::new(socket, false, Some(addr)) } -pub async fn udpout(address: T) -> tokio::io::Result { +pub async fn udpout(address: T) -> io::Result { let addr = get_socket_addr(address)?; let socket = UdpSocket::bind("0.0.0.0:0").await?; - UdpConnection::new(socket, false, Some(addr)) + AsyncUdpConnection::new(socket, false, Some(addr)) } -pub async fn udpin(address: T) -> tokio::io::Result { +pub async fn udpin(address: T) -> io::Result { let addr = address .to_socket_addrs() .unwrap() .next() .expect("Invalid address"); let socket = UdpSocket::bind(addr).await?; - UdpConnection::new(socket, true, None) + AsyncUdpConnection::new(socket, true, None) } struct UdpRead { @@ -75,7 +75,7 @@ impl AsyncRead for UdpRead { fn poll_read( mut self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, + buf: &mut io::ReadBuf<'_>, ) -> Poll> { if self.buffer.is_empty() { let mut read_buffer = [0u8; MTU_SIZE]; @@ -115,7 +115,7 @@ struct UdpWrite { sequence: u8, } -pub struct UdpConnection { +pub struct AsyncUdpConnection { reader: Mutex>, writer: Mutex, protocol_version: MavlinkVersion, @@ -124,12 +124,12 @@ pub struct UdpConnection { signing_data: Option, } -impl UdpConnection { +impl AsyncUdpConnection { fn new( socket: UdpSocket, server: bool, dest: Option, - ) -> tokio::io::Result { + ) -> io::Result { let socket = Arc::new(socket); Ok(Self { server, @@ -151,7 +151,7 @@ impl UdpConnection { } #[async_trait::async_trait] -impl AsyncMavConnection for UdpConnection { +impl AsyncMavConnection for AsyncUdpConnection { async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { let mut reader = self.reader.lock().await; @@ -238,7 +238,7 @@ impl AsyncMavConnection for UdpConnection { #[cfg(test)] mod tests { use super::*; - use tokio::io::AsyncReadExt; + use io::AsyncReadExt; #[tokio::test] async fn test_datagram_buffering() {