Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to attachments v4 #330

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl AwcPushService {
{
match response.status() {
StatusCode::OK => Ok(()),
StatusCode::CREATED => Ok(()),
StatusCode::NO_CONTENT => Ok(()),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(ServiceError::Unauthorized)
Expand Down
110 changes: 109 additions & 1 deletion libsignal-service-hyper/src/push_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::io;
use std::time::Duration;
use std::{collections::HashMap, io};

use bytes::{Buf, Bytes};
use futures::{FutureExt, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -150,6 +150,7 @@ impl HyperPushService {

match response.status() {
StatusCode::OK => Ok(response),
StatusCode::CREATED => Ok(response),
StatusCode::NO_CONTENT => Ok(response),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(ServiceError::Unauthorized)
Expand All @@ -160,6 +161,7 @@ impl HyperPushService {
},
StatusCode::PAYLOAD_TOO_LARGE => {
// This is 413 and means rate limit exceeded for Signal.
panic!("{:?}", response);
Err(ServiceError::RateLimitExceeded)
},
StatusCode::CONFLICT => {
Expand Down Expand Up @@ -292,6 +294,34 @@ impl PushService for HyperPushService {
// This is in principle known at compile time, but long to write out.
type ByteStream = Box<dyn futures::io::AsyncRead + Send + Unpin>;

#[tracing::instrument(skip(self))]
async fn head(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
) -> Result<HashMap<String, String>, ServiceError> {
let mut response = self
.request(
Method::HEAD,
service,
path,
additional_headers,
credentials_override,
None,
)
.await?;

Ok(response
.headers()
.iter()
.filter_map(|(k, v)| {
Some((k.to_string(), v.to_str().ok()?.to_owned()))
})
.collect())
}

#[tracing::instrument(skip(self))]
async fn get_json<T>(
&mut self,
Expand Down Expand Up @@ -377,6 +407,32 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self, contents))]
async fn patch_file(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
contents: Vec<u8>,
content_type: String,
) -> Result<(), ServiceError> {
let mut response = self
.request(
Method::PATCH,
service,
path,
additional_headers,
credentials_override,
Some(RequestBody {
contents,
content_type,
}),
)
.await?;
Ok(())
}

#[tracing::instrument(skip(self, value))]
async fn patch_json<D, S>(
&mut self,
Expand Down Expand Up @@ -413,6 +469,34 @@ impl PushService for HyperPushService {
Self::json(&mut response).await
}

#[tracing::instrument(skip(self))]
async fn post_empty_body(
&mut self,
service: Endpoint,
path: &str,
additional_headers: &[(&str, &str)],
credentials_override: HttpAuthOverride,
) -> Result<HashMap<String, String>, ServiceError> {
let mut response = self
.request(
Method::POST,
service,
path,
additional_headers,
credentials_override,
None,
)
.await?;

Ok(response
.headers()
.iter()
.filter_map(|(k, v)| {
Some((k.to_string(), v.to_str().ok()?.to_owned()))
})
.collect())
}

#[tracing::instrument(skip(self, value))]
async fn post_json<D, S>(
&mut self,
Expand Down Expand Up @@ -601,6 +685,30 @@ impl PushService for HyperPushService {
Ok(())
}

/// Upload larger file to CDN2
///
/// Implementations are allowed to *panic* when the Read instance throws an IO-Error
async fn post_to_cdn2<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<AttachmentDigest, ServiceError> {
unimplemented!()
}

/// Upload larger file to CDN3
///
/// Implementations are allowed to *panic* when the Read instance throws an IO-Error
async fn post_to_cdn3<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<AttachmentDigest, ServiceError> {
unimplemented!()
}

async fn ws(
&mut self,
path: &str,
Expand Down
2 changes: 2 additions & 0 deletions libsignal-service/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl From<&SignalServers> for ServiceConfiguration {
let mut map = HashMap::new();
map.insert(0, "https://cdn-staging.signal.org".parse().unwrap());
map.insert(2, "https://cdn2-staging.signal.org".parse().unwrap());
map.insert(3, "https://cdn3-staging.signal.org".parse().unwrap());
map
},
contact_discovery_url:
Expand All @@ -144,6 +145,7 @@ impl From<&SignalServers> for ServiceConfiguration {
let mut map = HashMap::new();
map.insert(0, "https://cdn.signal.org".parse().unwrap());
map.insert(2, "https://cdn2.signal.org".parse().unwrap());
map.insert(3, "https://cdn3.signal.org".parse().unwrap());
map
},
contact_discovery_url: "https://api.directory.signal.org".parse().unwrap(),
Expand Down
8 changes: 6 additions & 2 deletions libsignal-service/src/digeststream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::Read;
use std::io::{self, Read, Seek, SeekFrom};

use sha2::{Digest, Sha256};

Expand All @@ -15,14 +15,18 @@ impl<'r, R: Read> Read for DigestingReader<'r, R> {
}
}

impl<'r, R: Read> DigestingReader<'r, R> {
impl<'r, R: Read + Seek> DigestingReader<'r, R> {
pub fn new(inner: &'r mut R) -> Self {
Self {
inner,
digest: Sha256::new(),
}
}

pub fn seek(&mut self, from: SeekFrom) -> io::Result<u64> {
self.inner.seek(from)
}

pub fn finalize(self) -> Vec<u8> {
// XXX representation is not ideal, but this leaks to the public interface and I don't
// really like exposing the GenericArray.
Expand Down
10 changes: 10 additions & 0 deletions libsignal-service/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::all)]

use std::collections::HashMap;

use rand::{Rng, RngCore};
include!(concat!(env!("OUT_DIR"), "/signalservice.rs"));
include!(concat!(env!("OUT_DIR"), "/signal.rs"));
Expand Down Expand Up @@ -66,6 +68,14 @@ impl WebSocketResponseMessage {
}
}
}

pub fn headers(&self) -> HashMap<String, String> {
self.headers
.iter()
.filter_map(|kv| kv.split_once(":"))
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect()
}
}

impl SyncMessage {
Expand Down
Loading