diff --git a/libsignal-service-actix/src/push_service.rs b/libsignal-service-actix/src/push_service.rs index 47f26dd84..4fab9f39c 100644 --- a/libsignal-service-actix/src/push_service.rs +++ b/libsignal-service-actix/src/push_service.rs @@ -95,6 +95,7 @@ impl AwcPushService { { match response.status() { StatusCode::OK => Ok(()), + StatusCode::CREATED => Ok(()), StatusCode::NO_CONTENT => Ok(()), StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { Err(ServiceError::Unauthorized) diff --git a/libsignal-service-hyper/src/push_service.rs b/libsignal-service-hyper/src/push_service.rs index 4a28ae2b6..f5b4c974b 100644 --- a/libsignal-service-hyper/src/push_service.rs +++ b/libsignal-service-hyper/src/push_service.rs @@ -1,5 +1,5 @@ -use std::io; use std::time::Duration; +use std::{collections::HashMap, io}; use bytes::{Buf, Bytes}; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -150,6 +150,7 @@ impl HyperPushService { match response.status() { StatusCode::OK => Ok(response), + StatusCode::CREATED => Ok(response), StatusCode::NO_CONTENT => Ok(response), StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { Err(ServiceError::Unauthorized) @@ -160,6 +161,7 @@ impl HyperPushService { }, StatusCode::PAYLOAD_TOO_LARGE => { // This is 413 and means rate limit exceeded for Signal. + panic!("{:?}", response); Err(ServiceError::RateLimitExceeded) }, StatusCode::CONFLICT => { @@ -292,6 +294,34 @@ impl PushService for HyperPushService { // This is in principle known at compile time, but long to write out. type ByteStream = Box; + #[tracing::instrument(skip(self))] + async fn head( + &mut self, + service: Endpoint, + path: &str, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + ) -> Result, ServiceError> { + let mut response = self + .request( + Method::HEAD, + service, + path, + additional_headers, + credentials_override, + None, + ) + .await?; + + Ok(response + .headers() + .iter() + .filter_map(|(k, v)| { + Some((k.to_string(), v.to_str().ok()?.to_owned())) + }) + .collect()) + } + #[tracing::instrument(skip(self))] async fn get_json( &mut self, @@ -377,6 +407,32 @@ impl PushService for HyperPushService { Self::json(&mut response).await } + #[tracing::instrument(skip(self, contents))] + async fn patch_file( + &mut self, + service: Endpoint, + path: &str, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + contents: Vec, + content_type: String, + ) -> Result<(), ServiceError> { + let mut response = self + .request( + Method::PATCH, + service, + path, + additional_headers, + credentials_override, + Some(RequestBody { + contents, + content_type, + }), + ) + .await?; + Ok(()) + } + #[tracing::instrument(skip(self, value))] async fn patch_json( &mut self, @@ -413,6 +469,34 @@ impl PushService for HyperPushService { Self::json(&mut response).await } + #[tracing::instrument(skip(self))] + async fn post_empty_body( + &mut self, + service: Endpoint, + path: &str, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + ) -> Result, ServiceError> { + let mut response = self + .request( + Method::POST, + service, + path, + additional_headers, + credentials_override, + None, + ) + .await?; + + Ok(response + .headers() + .iter() + .filter_map(|(k, v)| { + Some((k.to_string(), v.to_str().ok()?.to_owned())) + }) + .collect()) + } + #[tracing::instrument(skip(self, value))] async fn post_json( &mut self, @@ -601,6 +685,30 @@ impl PushService for HyperPushService { Ok(()) } + /// Upload larger file to CDN2 + /// + /// Implementations are allowed to *panic* when the Read instance throws an IO-Error + async fn post_to_cdn2<'s, C: std::io::Read + Send + 's>( + &mut self, + path: &str, + value: &[(&str, &str)], + file: Option<(&str, &'s mut C)>, + ) -> Result { + unimplemented!() + } + + /// Upload larger file to CDN3 + /// + /// Implementations are allowed to *panic* when the Read instance throws an IO-Error + async fn post_to_cdn3<'s, C: std::io::Read + Send + 's>( + &mut self, + path: &str, + value: &[(&str, &str)], + file: Option<(&str, &'s mut C)>, + ) -> Result { + unimplemented!() + } + async fn ws( &mut self, path: &str, diff --git a/libsignal-service/src/configuration.rs b/libsignal-service/src/configuration.rs index df0bf4b39..18c9db2cb 100644 --- a/libsignal-service/src/configuration.rs +++ b/libsignal-service/src/configuration.rs @@ -125,6 +125,7 @@ impl From<&SignalServers> for ServiceConfiguration { let mut map = HashMap::new(); map.insert(0, "https://cdn-staging.signal.org".parse().unwrap()); map.insert(2, "https://cdn2-staging.signal.org".parse().unwrap()); + map.insert(3, "https://cdn3-staging.signal.org".parse().unwrap()); map }, contact_discovery_url: @@ -144,6 +145,7 @@ impl From<&SignalServers> for ServiceConfiguration { let mut map = HashMap::new(); map.insert(0, "https://cdn.signal.org".parse().unwrap()); map.insert(2, "https://cdn2.signal.org".parse().unwrap()); + map.insert(3, "https://cdn3.signal.org".parse().unwrap()); map }, contact_discovery_url: "https://api.directory.signal.org".parse().unwrap(), diff --git a/libsignal-service/src/digeststream.rs b/libsignal-service/src/digeststream.rs index 626caeca9..638fc1aa0 100644 --- a/libsignal-service/src/digeststream.rs +++ b/libsignal-service/src/digeststream.rs @@ -1,4 +1,4 @@ -use std::io::Read; +use std::io::{self, Read, Seek, SeekFrom}; use sha2::{Digest, Sha256}; @@ -15,7 +15,7 @@ impl<'r, R: Read> Read for DigestingReader<'r, R> { } } -impl<'r, R: Read> DigestingReader<'r, R> { +impl<'r, R: Read + Seek> DigestingReader<'r, R> { pub fn new(inner: &'r mut R) -> Self { Self { inner, @@ -23,6 +23,10 @@ impl<'r, R: Read> DigestingReader<'r, R> { } } + pub fn seek(&mut self, from: SeekFrom) -> io::Result { + self.inner.seek(from) + } + pub fn finalize(self) -> Vec { // XXX representation is not ideal, but this leaks to the public interface and I don't // really like exposing the GenericArray. diff --git a/libsignal-service/src/proto.rs b/libsignal-service/src/proto.rs index f24d08037..f4e06e07c 100644 --- a/libsignal-service/src/proto.rs +++ b/libsignal-service/src/proto.rs @@ -1,5 +1,7 @@ #![allow(clippy::all)] +use std::collections::HashMap; + use rand::{Rng, RngCore}; include!(concat!(env!("OUT_DIR"), "/signalservice.rs")); include!(concat!(env!("OUT_DIR"), "/signal.rs")); @@ -66,6 +68,14 @@ impl WebSocketResponseMessage { } } } + + pub fn headers(&self) -> HashMap { + self.headers + .iter() + .filter_map(|kv| kv.split_once(":")) + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect() + } } impl SyncMessage { diff --git a/libsignal-service/src/push_service.rs b/libsignal-service/src/push_service.rs index 7bdec5912..bc23adcc9 100644 --- a/libsignal-service/src/push_service.rs +++ b/libsignal-service/src/push_service.rs @@ -1,4 +1,9 @@ -use std::{collections::HashMap, fmt, time::Duration}; +use std::{ + collections::HashMap, + fmt, + io::{Read, SeekFrom}, + time::Duration, +}; use crate::{ configuration::{Endpoint, ServiceCredentials}, @@ -25,6 +30,7 @@ use libsignal_protocol::{ use phonenumber::PhoneNumber; use prost::Message as ProtobufMessage; use serde::{Deserialize, Serialize}; +use url::Url; use uuid::Uuid; use zkgroup::{ profiles::{ProfileKeyCommitment, ProfileKeyVersion}, @@ -53,7 +59,7 @@ pub const DIRECTORY_AUTH_PATH: &str = "/v1/directory/auth"; pub const DIRECTORY_FEEDBACK_PATH: &str = "/v1/directory/feedback-v3/%s"; pub const SENDER_ACK_MESSAGE_PATH: &str = "/v1/messages/%s/%d"; pub const UUID_ACK_MESSAGE_PATH: &str = "/v1/messages/uuid/%s"; -pub const ATTACHMENT_PATH: &str = "/v2/attachments/form/upload"; +pub const ATTACHMENT_PATH: &str = "/v4/attachments/form/upload"; pub const PROFILE_PATH: &str = "/v1/profile/"; @@ -513,20 +519,39 @@ impl SignalServiceProfile { } } -#[derive(Debug, serde::Deserialize, Default)] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AttachmentUploadForm { + pub cdn: u32, + pub key: String, + pub headers: HashMap, + pub signed_upload_location: Url, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AttachmentDigest { + pub digest: Vec, + pub incremental_digest: Option>, + pub incremental_mac_chunk_size: u64, +} + +#[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct AttachmentV2UploadAttributes { - key: String, - credential: String, - acl: String, - algorithm: String, - date: String, - policy: String, - signature: String, - // This is different from Java's implementation, - // and I (Ruben) am unsure why they decide to force-parse at upload-time instead of at registration - // time. - attachment_id: u64, +pub struct ResumableUploadSpec { + attachment_key: Vec, + attachment_iv: Vec, + cdn_key: String, + cdn_number: u32, + resume_location: String, + expiration_timestamp: u64, + headers: HashMap, +} + +#[derive(Debug)] +pub struct ResumeInfo { + pub content_range: Option, + pub content_start: u64, } #[derive(thiserror::Error, Debug)] @@ -607,6 +632,9 @@ pub enum ServiceError { #[error("invalid device name")] InvalidDeviceName, + + #[error("Unknown CDN version {0}")] + UnknownCdnVersion(u32), } #[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))] @@ -614,6 +642,15 @@ pub enum ServiceError { pub trait PushService: MaybeSend { type ByteStream: futures::io::AsyncRead + MaybeSend + Unpin; + // will be better once we refactor this trait (= ditch it) and can manipulate requests ourselves + async fn head( + &mut self, + service: Endpoint, + path: &str, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + ) -> Result, ServiceError>; + async fn get_json( &mut self, service: Endpoint, @@ -645,6 +682,16 @@ pub trait PushService: MaybeSend { for<'de> D: Deserialize<'de>, S: MaybeSend + Serialize; + async fn patch_file( + &mut self, + service: Endpoint, + path: &str, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + contents: Vec, + content_type: String, + ) -> Result<(), ServiceError>; + async fn patch_json( &mut self, service: Endpoint, @@ -657,6 +704,14 @@ pub trait PushService: MaybeSend { for<'de> D: Deserialize<'de>, S: MaybeSend + Serialize; + async fn post_empty_body( + &mut self, + service: Endpoint, + path: &str, + additional_headers: &[(&str, &str)], + credentials_override: HttpAuthOverride, + ) -> Result, ServiceError>; + async fn post_json( &mut self, service: Endpoint, @@ -697,9 +752,6 @@ pub trait PushService: MaybeSend { path: &str, ) -> Result; - /// Upload larger file to CDN0 in legacy fashion, e.g. for attachments. - /// - /// Implementations are allowed to *panic* when the Read instance throws an IO-Error async fn post_to_cdn0<'s, C: std::io::Read + Send + 's>( &mut self, path: &str, @@ -707,6 +759,26 @@ pub trait PushService: MaybeSend { file: Option<(&str, &'s mut C)>, ) -> Result<(), ServiceError>; + /// Upload larger file to CDN2 + /// + /// Implementations are allowed to *panic* when the Read instance throws an IO-Error + async fn post_to_cdn2<'s, C: std::io::Read + Send + 's>( + &mut self, + path: &str, + value: &[(&str, &str)], + file: Option<(&str, &'s mut C)>, + ) -> Result; + + /// Upload larger file to CDN3 + /// + /// Implementations are allowed to *panic* when the Read instance throws an IO-Error + async fn post_to_cdn3<'s, C: std::io::Read + Send + 's>( + &mut self, + path: &str, + value: &[(&str, &str)], + file: Option<(&str, &'s mut C)>, + ) -> Result; + async fn ws( &mut self, path: &str, @@ -833,49 +905,171 @@ pub trait PushService: MaybeSend { .await } - /// Request AttachmentV2UploadAttributes - /// - /// Equivalent with getAttachmentV2UploadAttributes - async fn get_attachment_v2_upload_attributes( + async fn get_attachment_v4_upload_attributes( &mut self, - ) -> Result { + ) -> Result { self.get_json( Endpoint::Service, - "/v2/attachments/form/upload", + "/v4/attachments/form/upload", &[], HttpAuthOverride::NoOverride, ) .await } - /// Upload attachment to CDN + async fn get_attachment_resumable_upload_url( + &mut self, + attachment_upload_form: &AttachmentUploadForm, + ) -> Result { + let mut headers = attachment_upload_form.headers.clone(); + headers.insert("Content-Length".into(), "0".into()); + if attachment_upload_form.cdn == 2 { + headers.insert( + "Content-Type".into(), + "application/octet-stream".into(), + ); + } else if attachment_upload_form.cdn == 3 { + headers.insert("Upload-Defer-Length".into(), "1".into()); + headers.insert("Tus-Resumable".into(), "1.0.0".into()); + } else { + return Err(ServiceError::UnknownCdnVersion( + attachment_upload_form.cdn, + )); + }; + + let request_headers: Vec<(&str, &str)> = headers + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + + let response = self + .post_empty_body( + Endpoint::Cdn(attachment_upload_form.cdn), + attachment_upload_form.signed_upload_location.path(), + request_headers.as_slice(), + HttpAuthOverride::Unidentified, // this is hack because I'm a smartass, Authorization is part of the supplied headers + ) + .await?; + + Ok(response.get("location").unwrap().parse().unwrap()) + } + + async fn get_attachment_resume_info_cdn3( + &mut self, + resumable_url: &Url, + mut headers: HashMap, + ) -> Result { + headers.insert("Tus-Resumable".into(), "1.0.0".into()); + let request_headers: Vec<(&str, &str)> = headers + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + let response_headers = self + .head( + Endpoint::Cdn(3), + resumable_url.path(), + request_headers.as_slice(), + HttpAuthOverride::Unidentified, // this is assuming knowledge of not adding any auth + ) + .await?; + + let upload_offset = response_headers + .get("upload-offset") + .ok_or(ServiceError::InvalidFrameError { + reason: "no Upload-Offset header in response".into(), + })? + .parse() + .map_err(|_| ServiceError::InvalidFrameError { + reason: "invalid integer value for Upload-Offset header".into(), + })?; + + Ok(ResumeInfo { + content_range: None, + content_start: upload_offset, + }) + } + + /// Upload attachment /// /// Returns attachment ID and the attachment digest - async fn upload_attachment<'s, C: std::io::Read + Send + 's>( + async fn upload_attachment_v4< + 's, + R: std::io::Read + std::io::Seek + Send + 's, + >( &mut self, - attrs: &AttachmentV2UploadAttributes, - content: &'s mut C, - ) -> Result<(u64, Vec), ServiceError> { - let values = [ - ("acl", &attrs.acl as &str), - ("key", &attrs.key), - ("policy", &attrs.policy), - ("Content-Type", "application/octet-stream"), - ("x-amz-algorithm", &attrs.algorithm), - ("x-amz-credential", &attrs.credential), - ("x-amz-date", &attrs.date), - ("x-amz-signature", &attrs.signature), - ]; - - let mut digester = crate::digeststream::DigestingReader::new(content); - - self.post_to_cdn0( - "attachments/", - &values, - Some(("file", &mut digester)), - ) - .await?; - Ok((attrs.attachment_id, digester.finalize())) + cdn_id: u32, + resumable_url: &Url, + content_type: &str, + length: u64, + mut headers: HashMap, + mut content: R, + ) -> Result { + if cdn_id == 2 { + unimplemented!() + // self.post_to_cdn2( + // attachment.getResumableUploadSpec().getResumeLocation(), + // attachment.getData(), + // "application/octet-stream", + // attachment.getDataSize(), + // attachment.getIncremental(), + // attachment.getOutputStreamFactory(), + // attachment.getListener(), + // attachment.getCancelationSignal(), + // ) + } else { + let resume_info = self + .get_attachment_resume_info_cdn3(resumable_url, headers.clone()) + .await?; + + if resume_info.content_start == length { + let mut digester = + crate::digeststream::DigestingReader::new(&mut content); + let mut buf = Vec::new(); + digester.read_to_end(&mut buf).unwrap(); + return Ok(AttachmentDigest { + digest: digester.finalize(), + incremental_digest: None, + incremental_mac_chunk_size: 0, + }); + } + + let mut digester = + crate::digeststream::DigestingReader::new(&mut content); + digester + .seek(SeekFrom::Start(resume_info.content_start)) + .unwrap(); + + let mut buf = Vec::new(); + digester.read_to_end(&mut buf).unwrap(); + + headers.insert("Tus-Resumable".into(), "1.0.0".into()); + headers.insert( + "Upload-Offset".into(), + resume_info.content_start.to_string(), + ); + headers.insert("Upload-Length".into(), buf.len().to_string()); + + let request_headers: Vec<(&str, &str)> = headers + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect(); + + self.patch_file( + Endpoint::Cdn(3), + resumable_url.path(), + &request_headers, + HttpAuthOverride::Unidentified, + buf, + content_type.to_owned(), + ) + .await?; + + Ok(AttachmentDigest { + digest: digester.finalize(), + incremental_digest: None, + incremental_mac_chunk_size: 0, + }) + } } async fn get_messages( diff --git a/libsignal-service/src/sender.rs b/libsignal-service/src/sender.rs index f0c8fcfd7..dee8d6121 100644 --- a/libsignal-service/src/sender.rs +++ b/libsignal-service/src/sender.rs @@ -218,23 +218,39 @@ where }); // Request upload attributes - let attrs = self - .identified_ws - .get_attachment_v2_upload_attributes() + // TODO: we can actually store the upload spec to be able to resume the upload later + // if it fails or stalls (= we should at least split the API calls so clients can decide what to do) + let attachment_upload_form = self + .service + .get_attachment_v4_upload_attributes() .instrument(tracing::trace_span!("requesting upload attributes")) .await?; - let (id, digest) = self + + let resumable_upload_url = self .service - .upload_attachment(&attrs, &mut std::io::Cursor::new(&contents)) - .instrument(tracing::trace_span!("Uploading attachment")) + .get_attachment_resumable_upload_url(&attachment_upload_form) .await?; + let attachment_digest = self + .service + .upload_attachment_v4( + attachment_upload_form.cdn, + &resumable_upload_url, + &spec.content_type, + contents.len() as u64, + attachment_upload_form.headers, + &mut std::io::Cursor::new(&contents), + ) + .instrument(tracing::trace_span!("Uploading attachment")) + .await + .expect("HELL NO"); + Ok(AttachmentPointer { content_type: Some(spec.content_type), key: Some(key.to_vec()), size: Some(len as u32), // thumbnail: Option>, - digest: Some(digest), + digest: Some(attachment_digest.digest), file_name: spec.file_name, flags: Some( if spec.voice_note == Some(true) { @@ -257,8 +273,10 @@ where .expect("unix epoch in the past") .as_millis() as u64, ), - cdn_number: Some(0), - attachment_identifier: Some(AttachmentIdentifier::CdnId(id)), + cdn_number: Some(attachment_upload_form.cdn), + attachment_identifier: Some(AttachmentIdentifier::CdnKey( + attachment_upload_form.key, + )), ..Default::default() }) } diff --git a/libsignal-service/src/websocket.rs b/libsignal-service/src/websocket.rs index a998cfbcd..11067bcfd 100644 --- a/libsignal-service/src/websocket.rs +++ b/libsignal-service/src/websocket.rs @@ -19,7 +19,6 @@ use crate::proto::{ }; use crate::push_service::{MismatchedDevices, ServiceError}; -mod attachment_service; mod sender; type RequestStreamItem = ( @@ -501,6 +500,27 @@ impl SignalWebSocket { self.request_json(request).await } + pub(crate) async fn get_json_with_headers( + &mut self, + path: &str, + extra_headers: impl IntoIterator, + ) -> Result + where + for<'de> T: Deserialize<'de>, + { + let headers = extra_headers + .into_iter() + .map(|(key, value)| format!("{key}:{value}")) + .collect(); + let request = WebSocketRequestMessage { + path: Some(path.into()), + verb: Some("GET".into()), + headers, + ..Default::default() + }; + self.request_json(request).await + } + pub(crate) async fn put_json( &mut self, path: &str, diff --git a/libsignal-service/src/websocket/attachment_service.rs b/libsignal-service/src/websocket/attachment_service.rs deleted file mode 100644 index 4c6e17493..000000000 --- a/libsignal-service/src/websocket/attachment_service.rs +++ /dev/null @@ -1,11 +0,0 @@ -use crate::push_service::AttachmentV2UploadAttributes; - -use super::*; - -impl SignalWebSocket { - pub async fn get_attachment_v2_upload_attributes( - &mut self, - ) -> Result { - self.get_json("/v2/attachments/form/upload").await - } -}