diff --git a/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md b/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md new file mode 100644 index 0000000000000..5149bebb686aa --- /dev/null +++ b/changelog.d/21999-add-option-to-use-virtual-addressing.feature.md @@ -0,0 +1,3 @@ +Adds a `force_path_style` option to the `aws_s3` sink that allows users to configure virtual host style addressing. The value defaults to `true` to maintain existing behavior. + +authors: sam6258 diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 7a96bdd536b87..de1f8985a1e28 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -123,7 +123,7 @@ pub trait ClientBuilder { type Client; /// Build the client using the given config settings. - fn build(config: &SdkConfig) -> Self::Client; + fn build(&self, config: &SdkConfig) -> Self::Client; } fn region_provider( @@ -161,28 +161,36 @@ async fn resolve_region( } /// Create the SDK client using the provided settings. -pub async fn create_client( +pub async fn create_client( + builder: &T, auth: &AwsAuthentication, region: Option, endpoint: Option, proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, -) -> crate::Result { - create_client_and_region::(auth, region, endpoint, proxy, tls_options, timeout) +) -> crate::Result +where + T: ClientBuilder, +{ + create_client_and_region::(builder, auth, region, endpoint, proxy, tls_options, timeout) .await .map(|(client, _)| client) } /// Create the SDK client and resolve the region using the provided settings. -pub async fn create_client_and_region( +pub async fn create_client_and_region( + builder: &T, auth: &AwsAuthentication, region: Option, endpoint: Option, proxy: &ProxyConfig, tls_options: &Option, timeout: &Option, -) -> crate::Result<(T::Client, Region)> { +) -> crate::Result<(T::Client, Region)> +where + T: ClientBuilder, +{ let retry_config = RetryConfig::disabled(); // The default credentials chains will look for a region if not given but we'd like to @@ -239,7 +247,7 @@ pub async fn create_client_and_region( let config = config_builder.build(); - Ok((T::build(&config), region)) + Ok((T::build(builder, &config), region)) } #[derive(Snafu, Debug)] diff --git a/src/common/s3.rs b/src/common/s3.rs index cdb69725b4c66..7c7a9810b1850 100644 --- a/src/common/s3.rs +++ b/src/common/s3.rs @@ -2,13 +2,16 @@ use aws_sdk_s3::config; use crate::aws::ClientBuilder; -pub(crate) struct S3ClientBuilder; +pub(crate) struct S3ClientBuilder { + pub force_path_style: Option, +} impl ClientBuilder for S3ClientBuilder { type Client = aws_sdk_s3::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { - let config = config::Builder::from(config).force_path_style(true).build(); - aws_sdk_s3::client::Client::from_conf(config) + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { + let builder = + config::Builder::from(config).force_path_style(self.force_path_style.unwrap_or(true)); + aws_sdk_s3::client::Client::from_conf(builder.build()) } } diff --git a/src/common/sqs.rs b/src/common/sqs.rs index f02aa2f7c4021..8f4ff3ecab69e 100644 --- a/src/common/sqs.rs +++ b/src/common/sqs.rs @@ -5,7 +5,7 @@ pub(crate) struct SqsClientBuilder; impl ClientBuilder for SqsClientBuilder { type Client = aws_sdk_sqs::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sqs::client::Client::new(config) } } diff --git a/src/secrets/aws_secrets_manager.rs b/src/secrets/aws_secrets_manager.rs index f3c221b42eee0..2f61f31e74a09 100644 --- a/src/secrets/aws_secrets_manager.rs +++ b/src/secrets/aws_secrets_manager.rs @@ -13,7 +13,7 @@ pub(crate) struct SecretsManagerClientBuilder; impl ClientBuilder for SecretsManagerClientBuilder { type Client = Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { let config = config::Builder::from(config).build(); Client::from_conf(config) } @@ -57,6 +57,7 @@ impl SecretBackend for AwsSecretsManagerBackend { _: &mut signal::SignalRx, ) -> crate::Result> { let client = create_client::( + &SecretsManagerClientBuilder {}, &self.auth, self.region.region(), self.region.endpoint(), diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 0dc7f90917620..d84076884a37e 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -33,7 +33,7 @@ pub struct CloudwatchLogsClientBuilder; impl ClientBuilder for CloudwatchLogsClientBuilder { type Client = aws_sdk_cloudwatchlogs::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatchlogs::client::Client::new(config) } } @@ -169,6 +169,7 @@ pub struct CloudwatchLogsSinkConfig { impl CloudwatchLogsSinkConfig { pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &CloudwatchLogsClientBuilder {}, &self.auth, self.region.region(), self.region.endpoint(), diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 4e80493adaf88..9eebc9dbbf5cc 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -467,9 +467,17 @@ async fn create_client_test() -> CloudwatchLogsClient { let endpoint = Some(cloudwatch_address()); let proxy = ProxyConfig::default(); - create_client::(&auth, region, endpoint, &proxy, &None, &None) - .await - .unwrap() + create_client::( + &CloudwatchLogsClientBuilder {}, + &auth, + region, + endpoint, + &proxy, + &None, + &None, + ) + .await + .unwrap() } async fn ensure_group() { diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index 371b1e29e79a4..c7da85dddc3e8 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -119,7 +119,7 @@ struct CloudwatchMetricsClientBuilder; impl ClientBuilder for CloudwatchMetricsClientBuilder { type Client = aws_sdk_cloudwatch::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_cloudwatch::client::Client::new(config) } } @@ -172,6 +172,7 @@ impl CloudWatchMetricsSinkConfig { }; create_client::( + &CloudwatchMetricsClientBuilder {}, &self.auth, region, self.region.endpoint(), diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 96b032f4b5738..a785e333d0bd5 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -38,7 +38,7 @@ pub struct KinesisFirehoseClientBuilder; impl ClientBuilder for KinesisFirehoseClientBuilder { type Client = KinesisClient; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { Self::Client::new(config) } } @@ -102,6 +102,7 @@ impl KinesisFirehoseSinkConfig { pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &KinesisFirehoseClientBuilder {}, &self.base.auth, self.base.region.region(), self.base.region.endpoint(), diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 00f06301eb164..69c4b66424bee 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -255,6 +255,7 @@ async fn firehose_client() -> aws_sdk_firehose::Client { let proxy = ProxyConfig::default(); create_client::( + &KinesisFirehoseClientBuilder {}, &auth, region_endpoint.region(), region_endpoint.endpoint(), diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index aa1896d0c7333..8da69d329ad95 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -37,7 +37,7 @@ pub struct KinesisClientBuilder; impl ClientBuilder for KinesisClientBuilder { type Client = KinesisClient; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { KinesisClient::new(config) } } @@ -99,6 +99,7 @@ impl KinesisStreamsSinkConfig { pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &KinesisClientBuilder {}, &self.base.auth, self.base.region.region(), self.base.region.endpoint(), diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index 57958858f0122..673930b1c360f 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -177,6 +177,7 @@ async fn client() -> aws_sdk_kinesis::Client { let proxy = ProxyConfig::default(); let region = RegionOrEndpoint::with_both("us-east-1", kinesis_address()); create_client::( + &KinesisClientBuilder {}, &auth, region.region(), region.endpoint(), diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index b2ac3a4b31578..404b4f35f255c 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -139,6 +139,12 @@ pub struct S3SinkConfig { #[configurable(derived)] #[serde(default)] pub timezone: Option, + + /// Specifies which addressing style to use. + /// + /// This controls if the bucket name is in the hostname or part of the URL. + #[serde(default = "crate::serde::default_true")] + pub force_path_style: bool, } pub(super) fn default_key_prefix() -> String { @@ -167,6 +173,7 @@ impl GenerateConfig for S3SinkConfig { auth: AwsAuthentication::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: Default::default(), }) .unwrap() } @@ -251,7 +258,14 @@ impl S3SinkConfig { } pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result { - s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await + s3_common::config::create_service( + &self.region, + &self.auth, + proxy, + &self.tls, + self.force_path_style, + ) + .await } } diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index e86c8956895c6..61949bef3d287 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -438,6 +438,7 @@ async fn s3_flush_on_exhaustion() { auth: Default::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: true, } }; let prefix = config.key_prefix.clone(); @@ -489,7 +490,12 @@ async fn client() -> S3Client { let region = RegionOrEndpoint::with_both("us-east-1", s3_address()); let proxy = ProxyConfig::default(); let tls_options = None; + let force_path_style_value: bool = true; + create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, &auth, region.region(), region.endpoint(), @@ -522,6 +528,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig { auth: Default::default(), acknowledgements: Default::default(), timezone: Default::default(), + force_path_style: true, } } diff --git a/src/sinks/aws_s_s/sns/config.rs b/src/sinks/aws_s_s/sns/config.rs index 3463443d3639c..7dad0dae99059 100644 --- a/src/sinks/aws_s_s/sns/config.rs +++ b/src/sinks/aws_s_s/sns/config.rs @@ -48,6 +48,7 @@ impl GenerateConfig for SnsSinkConfig { impl SnsSinkConfig { pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &SnsClientBuilder {}, &self.base_config.auth, self.region.region(), self.region.endpoint(), @@ -108,7 +109,7 @@ pub(super) struct SnsClientBuilder; impl ClientBuilder for SnsClientBuilder { type Client = aws_sdk_sns::client::Client; - fn build(config: &aws_types::SdkConfig) -> Self::Client { + fn build(&self, config: &aws_types::SdkConfig) -> Self::Client { aws_sdk_sns::client::Client::new(config) } } diff --git a/src/sinks/aws_s_s/sns/integration_tests.rs b/src/sinks/aws_s_s/sns/integration_tests.rs index 30b43d8b9b116..be026eceee813 100644 --- a/src/sinks/aws_s_s/sns/integration_tests.rs +++ b/src/sinks/aws_s_s/sns/integration_tests.rs @@ -32,6 +32,7 @@ async fn create_sns_test_client() -> SnsClient { let endpoint = sns_address(); let proxy = ProxyConfig::default(); create_client::( + &SnsClientBuilder {}, &auth, Some(Region::new("us-east-1")), Some(endpoint), @@ -53,6 +54,7 @@ async fn create_sqs_test_client() -> SqsClient { let endpoint = sqs_address(); let proxy = ProxyConfig::default(); create_client::( + &SqsClientBuilder {}, &auth, Some(Region::new("us-east-1")), Some(endpoint), diff --git a/src/sinks/aws_s_s/sqs/config.rs b/src/sinks/aws_s_s/sqs/config.rs index a936a6badc03f..872844622a8fd 100644 --- a/src/sinks/aws_s_s/sqs/config.rs +++ b/src/sinks/aws_s_s/sqs/config.rs @@ -49,6 +49,7 @@ impl GenerateConfig for SqsSinkConfig { impl SqsSinkConfig { pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result { create_client::( + &SqsClientBuilder {}, &self.base_config.auth, self.region.region(), self.region.endpoint(), diff --git a/src/sinks/aws_s_s/sqs/integration_tests.rs b/src/sinks/aws_s_s/sqs/integration_tests.rs index 3428caa4374fe..1334beba6e407 100644 --- a/src/sinks/aws_s_s/sqs/integration_tests.rs +++ b/src/sinks/aws_s_s/sqs/integration_tests.rs @@ -30,6 +30,7 @@ async fn create_test_client() -> SqsClient { let endpoint = sqs_address(); let proxy = ProxyConfig::default(); create_client::( + &SqsClientBuilder {}, &auth, Some(Region::new("us-east-1")), Some(endpoint), diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index 2f87f3d6754a2..42c2f3b054eba 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -363,12 +363,24 @@ pub async fn create_service( auth: &AwsAuthentication, proxy: &ProxyConfig, tls_options: &Option, + force_path_style: impl Into, ) -> crate::Result { let endpoint = region.endpoint(); let region = region.region(); - let client = - create_client::(auth, region.clone(), endpoint, proxy, tls_options, &None) - .await?; + let force_path_style_value: bool = force_path_style.into(); + + let client = create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, + auth, + region.clone(), + endpoint, + proxy, + tls_options, + &None, + ) + .await?; Ok(S3Service::new(client)) } diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 988eb55bfcce4..369dce2d91843 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -230,8 +230,12 @@ impl AwsS3Config { ) -> crate::Result { let region = self.region.region(); let endpoint = self.region.endpoint(); + let force_path_style_value: bool = true; let s3_client = create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, &self.auth, region.clone(), endpoint.clone(), @@ -248,6 +252,7 @@ impl AwsS3Config { match self.sqs { Some(ref sqs) => { let (sqs_client, region) = create_client_and_region::( + &SqsClientBuilder {}, &self.auth, region.clone(), endpoint, @@ -1016,7 +1021,11 @@ mod integration_tests { endpoint: Some(s3_address()), }; let proxy_config = ProxyConfig::default(); + let force_path_style_value: bool = true; create_client::( + &S3ClientBuilder { + force_path_style: Some(force_path_style_value), + }, &auth, region_endpoint.region(), region_endpoint.endpoint(), @@ -1036,6 +1045,7 @@ mod integration_tests { }; let proxy_config = ProxyConfig::default(); create_client::( + &SqsClientBuilder {}, &auth, region_endpoint.region(), region_endpoint.endpoint(), diff --git a/src/sources/aws_sqs/config.rs b/src/sources/aws_sqs/config.rs index 3d0583a007688..310603f703cb0 100644 --- a/src/sources/aws_sqs/config.rs +++ b/src/sources/aws_sqs/config.rs @@ -161,6 +161,7 @@ impl SourceConfig for AwsSqsConfig { impl AwsSqsConfig { async fn build_client(&self, cx: &SourceContext) -> crate::Result { create_client::( + &SqsClientBuilder {}, &self.auth, self.region.region(), self.region.endpoint(), diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index f15b16e097516..7c20ac233286c 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -701,6 +701,15 @@ base: components: sinks: aws_s3: configuration: { required: false type: string: default: "%s" } + force_path_style: { + description: """ + Specifies which addressing style to use. + + This controls if the bucket name is in the hostname or part of the URL. + """ + required: false + type: bool: default: true + } framing: { description: "Framing configuration." required: false