-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
enhancement(socket sink): support unix datagram mode (#21762)
* enhancement(socket sink): support unix datagram mode * 5269_support_unix_datagram_mode_in_socket_sink.enhancement.md: fix linter error * sinks/util/{udp,unix}.rs: abstract out common logic into sinks/util/datagram.rs * sinks/util/service/net/unix: use sinks/util/unix/UnixEither and move impls there * remove problematic feature gates for 'sinks-socket' and 'sinks-statsd' * use std type and spawn blocking * basic_unix_datagram_sink: attempt to reduce flakiness * socket sink: ignore unix_mode on macOS --------- Co-authored-by: Pavlos Rontidis <[email protected]>
- Loading branch information
Showing
10 changed files
with
426 additions
and
135 deletions.
There are no files selected for viewing
8 changes: 8 additions & 0 deletions
8
changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values: | ||
|
||
- `Stream` (default) - Stream-oriented (`SOCK_STREAM`) | ||
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`) | ||
|
||
This option only applies when `mode = "unix"`, and is unavailable on macOS, where `SOCK_STREAM` is always used for Unix sockets. | ||
|
||
authors: jpovixwm |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
#[cfg(unix)] | ||
use std::path::PathBuf; | ||
|
||
use bytes::BytesMut; | ||
use futures::{stream::BoxStream, StreamExt}; | ||
use futures_util::stream::Peekable; | ||
use tokio::net::UdpSocket; | ||
#[cfg(unix)] | ||
use tokio::net::UnixDatagram; | ||
use tokio_util::codec::Encoder; | ||
use vector_lib::internal_event::RegisterInternalEvent; | ||
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle}; | ||
use vector_lib::EstimatedJsonEncodedSizeOf; | ||
|
||
use crate::{ | ||
codecs::Transformer, | ||
event::{Event, EventStatus, Finalizable}, | ||
internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError}, | ||
}; | ||
|
||
#[cfg(unix)] | ||
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError}; | ||
|
||
pub enum DatagramSocket { | ||
Udp(UdpSocket), | ||
#[cfg(unix)] | ||
Unix(UnixDatagram, PathBuf), | ||
} | ||
|
||
pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>( | ||
input: &mut Peekable<BoxStream<'_, Event>>, | ||
mut socket: DatagramSocket, | ||
transformer: &Transformer, | ||
encoder: &mut E, | ||
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle, | ||
) { | ||
while let Some(mut event) = input.next().await { | ||
let byte_size = event.estimated_json_encoded_size_of(); | ||
|
||
transformer.transform(&mut event); | ||
|
||
let finalizers = event.take_finalizers(); | ||
let mut bytes = BytesMut::new(); | ||
|
||
// Errors are handled by `Encoder`. | ||
if encoder.encode(event, &mut bytes).is_err() { | ||
continue; | ||
} | ||
|
||
match send_datagram(&mut socket, &bytes).await { | ||
Ok(()) => { | ||
emit!(SocketEventsSent { | ||
mode: match socket { | ||
DatagramSocket::Udp(_) => SocketMode::Udp, | ||
#[cfg(unix)] | ||
DatagramSocket::Unix(..) => SocketMode::Unix, | ||
}, | ||
count: 1, | ||
byte_size, | ||
}); | ||
|
||
bytes_sent.emit(ByteSize(bytes.len())); | ||
finalizers.update_status(EventStatus::Delivered); | ||
} | ||
Err(error) => { | ||
match socket { | ||
DatagramSocket::Udp(_) => emit!(SocketSendError { | ||
mode: SocketMode::Udp, | ||
error | ||
}), | ||
#[cfg(unix)] | ||
DatagramSocket::Unix(_, path) => { | ||
emit!(UnixSocketSendError { | ||
path: path.as_path(), | ||
error: &error | ||
}) | ||
} | ||
}; | ||
finalizers.update_status(EventStatus::Errored); | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
|
||
async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> { | ||
let sent = match socket { | ||
DatagramSocket::Udp(udp) => udp.send(buf).await, | ||
#[cfg(unix)] | ||
DatagramSocket::Unix(uds, _) => uds.send(buf).await, | ||
}?; | ||
if sent != buf.len() { | ||
match socket { | ||
DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError { | ||
data_size: buf.len(), | ||
sent, | ||
}), | ||
#[cfg(unix)] | ||
DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError { | ||
data_size: buf.len(), | ||
sent, | ||
}), | ||
} | ||
} | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.