Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to attachments v4 #330

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,13 +579,14 @@ impl PushService for AwcPushService {

async fn post_to_cdn0<'s, C: std::io::Read + Send + 's>(
&mut self,
cdn_id: u32,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn post_to_cdn0<'s, C: std::io::Read + Send + 's>(
&mut self,
cdn_id: u32,
async fn post_to_cdn<'s, C: std::io::Read + Send + 's>(
&mut self,
cdn_id: u32,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(We probably want to retain upload capability to cdn0 for profile pics etc?)

path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<(), ServiceError> {
let request = self.request(
Method::POST,
Endpoint::Cdn(0),
Endpoint::Cdn(cdn_id),
path,
&[],
HttpAuthOverride::NoOverride,
Expand Down
139 changes: 70 additions & 69 deletions libsignal-service-hyper/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,75 +531,76 @@ impl PushService for HyperPushService {
))
}

#[tracing::instrument(skip(self, value, file), fields(file = file.as_ref().map(|_| "")))]
async fn post_to_cdn0<'s, C: io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<(), ServiceError> {
let mut form = mpart_async::client::MultipartRequest::default();

// mpart-async has a peculiar ordering of the form items,
// and Amazon S3 expects them in a very specific order (i.e., the file contents should
// go last.
//
// mpart-async uses a VecDeque internally for ordering the fields in the order given.
//
// https://github.com/cetra3/mpart-async/issues/16

for &(k, v) in value {
form.add_field(k, v);
}

if let Some((filename, file)) = file {
// XXX Actix doesn't cope with none-'static lifetimes
// https://docs.rs/actix-web/3.2.0/actix_web/body/enum.Body.html
let mut buf = Vec::new();
file.read_to_end(&mut buf)
.expect("infallible Read instance");
form.add_stream(
"file",
filename,
"application/octet-stream",
futures::future::ok::<_, ()>(Bytes::from(buf)).into_stream(),
);
}

let content_type =
format!("multipart/form-data; boundary={}", form.get_boundary());

// XXX Amazon S3 needs the Content-Length, but we don't know it without depleting the whole
// stream. Sadly, Content-Length != contents.len(), but should include the whole form.
let mut body_contents = vec![];
while let Some(b) = form.next().await {
// Unwrap, because no error type was used above
body_contents.extend(b.unwrap());
}
tracing::trace!(
"Sending PUT with Content-Type={} and length {}",
content_type,
body_contents.len()
);

let response = self
.request(
Method::POST,
Endpoint::Cdn(0),
path,
&[],
HttpAuthOverride::NoOverride,
Some(RequestBody {
contents: body_contents,
content_type,
}),
)
.await?;

debug!("HyperPushService::PUT response: {:?}", response);

Ok(())
}
// #[tracing::instrument(skip(self, value, file), fields(file = file.as_ref().map(|_| "")))]
// async fn post_to_cdn0<'s, C: io::Read + Send + 's>(
// &mut self,
// cdn_id: u32,
// path: &str,
// value: &[(&str, &str)],
// file: Option<(&str, &'s mut C)>,
// ) -> Result<(), ServiceError> {
// let mut form = mpart_async::client::MultipartRequest::default();

// // mpart-async has a peculiar ordering of the form items,
// // and Amazon S3 expects them in a very specific order (i.e., the file contents should
// // go last.
// //
// // mpart-async uses a VecDeque internally for ordering the fields in the order given.
// //
// // https://github.com/cetra3/mpart-async/issues/16

// for &(k, v) in value {
// form.add_field(k, v);
// }

// if let Some((filename, file)) = file {
// // XXX Actix doesn't cope with none-'static lifetimes
// // https://docs.rs/actix-web/3.2.0/actix_web/body/enum.Body.html
// let mut buf = Vec::new();
// file.read_to_end(&mut buf)
// .expect("infallible Read instance");
// form.add_stream(
// "file",
// filename,
// "application/octet-stream",
// futures::future::ok::<_, ()>(Bytes::from(buf)).into_stream(),
// );
// }

// let content_type =
// format!("multipart/form-data; boundary={}", form.get_boundary());

// // XXX Amazon S3 needs the Content-Length, but we don't know it without depleting the whole
// // stream. Sadly, Content-Length != contents.len(), but should include the whole form.
// let mut body_contents = vec![];
// while let Some(b) = form.next().await {
// // Unwrap, because no error type was used above
// body_contents.extend(b.unwrap());
// }
// tracing::trace!(
// "Sending PUT with Content-Type={} and length {}",
// content_type,
// body_contents.len()
// );

// let response = self
// .request(
// Method::POST,
// Endpoint::Cdn(cdn_id),
// path,
// &[],
// HttpAuthOverride::NoOverride,
// Some(RequestBody {
// contents: body_contents,
// content_type,
// }),
// )
// .await?;

// debug!("HyperPushService::PUT response: {:?}", response);

// Ok(())
// }

async fn ws(
&mut self,
Expand Down
10 changes: 10 additions & 0 deletions libsignal-service/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::all)]

use std::collections::HashMap;

use rand::{Rng, RngCore};
include!(concat!(env!("OUT_DIR"), "/signalservice.rs"));
include!(concat!(env!("OUT_DIR"), "/signal.rs"));
Expand Down Expand Up @@ -66,6 +68,14 @@ impl WebSocketResponseMessage {
}
}
}

pub fn headers(&self) -> HashMap<String, String> {
self.headers
.iter()
.filter_map(|kv| kv.split_once(":"))
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect()
}
}

impl SyncMessage {
Expand Down
164 changes: 107 additions & 57 deletions libsignal-service/src/push_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub const DIRECTORY_AUTH_PATH: &str = "/v1/directory/auth";
pub const DIRECTORY_FEEDBACK_PATH: &str = "/v1/directory/feedback-v3/%s";
pub const SENDER_ACK_MESSAGE_PATH: &str = "/v1/messages/%s/%d";
pub const UUID_ACK_MESSAGE_PATH: &str = "/v1/messages/uuid/%s";
pub const ATTACHMENT_PATH: &str = "/v2/attachments/form/upload";
pub const ATTACHMENT_PATH: &str = "/v4/attachments/form/upload";

pub const PROFILE_PATH: &str = "/v1/profile/";

Expand Down Expand Up @@ -513,20 +513,38 @@ impl SignalServiceProfile {
}
}

#[derive(Debug, serde::Deserialize, Default)]
#[derive(Debug, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub(crate) struct AttachmentUploadForm {
pub cdn: u64,
pub key: String,
pub headers: HashMap<String, String>,
pub signed_upload_location: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct AttachmentDigest {
pub digest: Vec<u8>,
pub incremental_digest: Option<Vec<u8>>,
pub incremental_mac_chunkSize: u64,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AttachmentV2UploadAttributes {
key: String,
credential: String,
acl: String,
algorithm: String,
date: String,
policy: String,
signature: String,
// This is different from Java's implementation,
// and I (Ruben) am unsure why they decide to force-parse at upload-time instead of at registration
// time.
attachment_id: u64,
pub(crate) struct ResumableUploadSpec {
attachment_key: Vec<u8>,
attachment_iv: Vec<u8>,
cdn_key: String,
cdn_number: u32,
resume_location: String,
expiration_timestamp: u64,
headers: HashMap<String, String>,
}

#[derive(Debug)]
pub(crate) struct ResumeInfo {
pub offset: usize,
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -607,6 +625,9 @@ pub enum ServiceError {

#[error("invalid device name")]
InvalidDeviceName,

#[error("Unknown CDN version {0}")]
UnknownCdnVersion(u64),
}

#[cfg_attr(feature = "unsend-futures", async_trait::async_trait(?Send))]
Expand Down Expand Up @@ -697,15 +718,25 @@ pub trait PushService: MaybeSend {
path: &str,
) -> Result<Self::ByteStream, ServiceError>;

/// Upload larger file to CDN0 in legacy fashion, e.g. for attachments.
/// Upload larger file to CDN2
///
/// Implementations are allowed to *panic* when the Read instance throws an IO-Error
async fn post_to_cdn2<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<AttachmentDigest, ServiceError>;

/// Upload larger file to CDN3
///
/// Implementations are allowed to *panic* when the Read instance throws an IO-Error
async fn post_to_cdn0<'s, C: std::io::Read + Send + 's>(
async fn post_to_cdn3<'s, C: std::io::Read + Send + 's>(
&mut self,
path: &str,
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<(), ServiceError>;
) -> Result<AttachmentDigest, ServiceError>;

async fn ws(
&mut self,
Expand Down Expand Up @@ -833,49 +864,68 @@ pub trait PushService: MaybeSend {
.await
}

/// Request AttachmentV2UploadAttributes
///
/// Equivalent with getAttachmentV2UploadAttributes
async fn get_attachment_v2_upload_attributes(
&mut self,
) -> Result<AttachmentV2UploadAttributes, ServiceError> {
self.get_json(
Endpoint::Service,
"/v2/attachments/form/upload",
&[],
HttpAuthOverride::NoOverride,
)
.await
}

/// Upload attachment to CDN
/// Upload attachment to CDN0
///
/// Returns attachment ID and the attachment digest
async fn upload_attachment<'s, C: std::io::Read + Send + 's>(
// async fn upload_attachment_v2<'s, C: std::io::Read + Send + 's>(
// &mut self,
// attrs: &AttachmentUploadForm,
// content: &'s mut C,
// ) -> Result<(u64, Vec<u8>), ServiceError> {
// let values = [
// ("acl", &attrs.acl as &str),
// ("key", &attrs.key),
// ("policy", &attrs.policy),
// ("Content-Type", "application/octet-stream"),
// ("x-amz-algorithm", &attrs.algorithm),
// ("x-amz-credential", &attrs.credential),
// ("x-amz-date", &attrs.date),
// ("x-amz-signature", &attrs.signature),
// ];

// let mut digester = crate::digeststream::DigestingReader::new(content);

// self.post_to_cdn0(
// "attachments/",
// &values,
// Some(("file", &mut digester)),
// )
// .await?;

// Ok((attrs.attachment_id, digester.finalize()))
// }
async fn upload_attachment_v4<'s, C: std::io::Read + Send + 's>(
&mut self,
attrs: &AttachmentV2UploadAttributes,
content: &'s mut C,
) -> Result<(u64, Vec<u8>), ServiceError> {
let values = [
("acl", &attrs.acl as &str),
("key", &attrs.key),
("policy", &attrs.policy),
("Content-Type", "application/octet-stream"),
("x-amz-algorithm", &attrs.algorithm),
("x-amz-credential", &attrs.credential),
("x-amz-date", &attrs.date),
("x-amz-signature", &attrs.signature),
];

let mut digester = crate::digeststream::DigestingReader::new(content);

self.post_to_cdn0(
"attachments/",
&values,
Some(("file", &mut digester)),
)
.await?;
Ok((attrs.attachment_id, digester.finalize()))
upload_form: &AttachmentUploadForm,
resumable_upload_spec: ResumableUploadSpec,
content: &mut C,
) -> Result<AttachmentDigest, ServiceError> {
if upload_form.cdn == 2 {
unimplemented!()
// self.post_to_cdn2(
// attachment.getResumableUploadSpec().getResumeLocation(),
// attachment.getData(),
// "application/octet-stream",
// attachment.getDataSize(),
// attachment.getIncremental(),
// attachment.getOutputStreamFactory(),
// attachment.getListener(),
// attachment.getCancelationSignal(),
// )
} else {
unimplemented!()
// self.post_to_cdn3(
// attachment.getResumableUploadSpec().getResumeLocation(),
// attachment.getData(),
// "application/offset+octet-stream",
// attachment.getDataSize(),
// attachment.getIncremental(),
// attachment.getOutputStreamFactory(),
// attachment.getListener(),
// attachment.getCancelationSignal(),
// attachment.getResumableUploadSpec().getHeaders(),
// )
}
}

async fn get_messages(
Expand Down
Loading