Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to attachments v4 #330

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl AwcPushService {
{
match response.status() {
StatusCode::OK => Ok(()),
StatusCode::CREATED => Ok(()),
StatusCode::NO_CONTENT => Ok(()),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(ServiceError::Unauthorized)
Expand Down
83 changes: 82 additions & 1 deletion libsignal-service-hyper/src/push_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::io;
use std::time::Duration;
use std::{collections::HashMap, io};

use bytes::{Buf, Bytes};
use futures::{FutureExt, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -150,6 +150,7 @@ impl HyperPushService {

match response.status() {
StatusCode::OK => Ok(response),
StatusCode::CREATED => Ok(response),
StatusCode::NO_CONTENT => Ok(response),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(ServiceError::Unauthorized)
Expand Down Expand Up @@ -292,6 +293,34 @@ impl PushService for HyperPushService {
// This is in principle known at compile time, but long to write out.
type ByteStream = Box<dyn futures::io::AsyncRead + Send + Unpin>;

#[tracing::instrument(skip(self))]
async fn head(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
) -> Result<HashMap<String, String>, ServiceError> {
let mut response = self
.request(
Method::HEAD,
service,
path,
additional_headers,
credentials_override,
None,
)
.await?;

Ok(response
.headers()
.iter()
.filter_map(|(k, v)| {
Some((k.to_string(), v.to_str().ok()?.to_owned()))
})
.collect())
}

#[tracing::instrument(skip(self))]
async fn get_json<T>(
&mut self,
Expand Down Expand Up @@ -413,6 +442,34 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self))]
async fn post(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
) -> Result<HashMap<String, String>, ServiceError> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Post without any data?

Copy link
Collaborator Author

@gferon gferon Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes 😁 don't shoot the messenger implementer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case maybe

Suggested change
#[tracing::instrument(skip(self))]
async fn post(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
) -> Result<HashMap<String, String>, ServiceError> {
#[tracing::instrument(skip(self))]
async fn post_empty_body(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
) -> Result<HashMap<String, String>, ServiceError> {

just for clarity?

let mut response = self
.request(
Method::POST,
service,
path,
additional_headers,
credentials_override,
None,
)
.await?;

Ok(response
.headers()
.iter()
.filter_map(|(k, v)| {
Some((k.to_string(), v.to_str().ok()?.to_owned()))
})
.collect())
}

#[tracing::instrument(skip(self, value))]
async fn post_json<D, S>(
&mut self,
Expand Down Expand Up @@ -601,6 +658,30 @@ impl PushService for HyperPushService {
Ok(())
}

/// Upload larger file to CDN2
///
/// Implementations are allowed to *panic* when the Read instance throws an IO-Error
async fn post_to_cdn2<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<AttachmentDigest, ServiceError> {
unimplemented!()
}

/// Upload larger file to CDN3
///
/// Implementations are allowed to *panic* when the Read instance throws an IO-Error
async fn post_to_cdn3<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<AttachmentDigest, ServiceError> {
unimplemented!()
}

async fn ws(
&mut self,
path: &str,
Expand Down
2 changes: 2 additions & 0 deletions libsignal-service/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl From<&SignalServers> for ServiceConfiguration {
let mut map = HashMap::new();
map.insert(0, "https://cdn-staging.signal.org".parse().unwrap());
map.insert(2, "https://cdn2-staging.signal.org".parse().unwrap());
map.insert(3, "https://cdn3-staging.signal.org".parse().unwrap());
map
},
contact_discovery_url:
Expand All @@ -144,6 +145,7 @@ impl From<&SignalServers> for ServiceConfiguration {
let mut map = HashMap::new();
map.insert(0, "https://cdn.signal.org".parse().unwrap());
map.insert(2, "https://cdn2.signal.org".parse().unwrap());
map.insert(3, "https://cdn3.signal.org".parse().unwrap());
map
},
contact_discovery_url: "https://api.directory.signal.org".parse().unwrap(),
Expand Down
10 changes: 10 additions & 0 deletions libsignal-service/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::all)]

use std::collections::HashMap;

use rand::{Rng, RngCore};
include!(concat!(env!("OUT_DIR"), "/signalservice.rs"));
include!(concat!(env!("OUT_DIR"), "/signal.rs"));
Expand Down Expand Up @@ -66,6 +68,14 @@ impl WebSocketResponseMessage {
}
}
}

pub fn headers(&self) -> HashMap<String, String> {
self.headers
.iter()
.filter_map(|kv| kv.split_once(":"))
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect()
}
}

impl SyncMessage {
Expand Down
Loading