-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make streams owned by request/response that they are tied to. #6228
Changes from 1 commit
c7b3f25
a72d8e0
14b25d2
22a2dd5
bf2c3ba
b855a4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
use crate::r#struct::ActiveResponse; | ||
pub use crate::r#struct::WasiHttp; | ||
use crate::r#struct::{Stream, WasiHttp}; | ||
use crate::types::{RequestOptions, Scheme}; | ||
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] | ||
use anyhow::anyhow; | ||
|
@@ -183,7 +183,8 @@ impl WasiHttp { | |
let body = Full::<Bytes>::new( | ||
self.streams | ||
.get(&request.body) | ||
.unwrap_or(&Bytes::new()) | ||
.unwrap_or(&Stream::new()) | ||
.data | ||
.clone(), | ||
); | ||
let t = timeout(first_bytes_timeout, sender.send_request(call.body(body)?)).await?; | ||
|
@@ -222,7 +223,13 @@ impl WasiHttp { | |
} | ||
response.body = self.streams_id_base; | ||
self.streams_id_base = self.streams_id_base + 1; | ||
self.streams.insert(response.body, buf.freeze()); | ||
self.streams.insert( | ||
response.body, | ||
Stream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should probably also use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
closed: false, | ||
data: buf.freeze(), | ||
}, | ||
); | ||
self.responses.insert(response_id, response); | ||
Ok(response_id) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
use crate::poll::Pollable; | ||
use crate::r#struct::Stream; | ||
use crate::streams::{InputStream, OutputStream, StreamError}; | ||
use crate::WasiHttp; | ||
use anyhow::{anyhow, bail}; | ||
|
@@ -11,10 +12,14 @@ impl crate::streams::Host for WasiHttp { | |
stream: InputStream, | ||
len: u64, | ||
) -> wasmtime::Result<Result<(Vec<u8>, bool), StreamError>> { | ||
let s = self | ||
let st = self | ||
.streams | ||
.get_mut(&stream) | ||
.ok_or_else(|| anyhow!("stream not found: {stream}"))?; | ||
if st.closed { | ||
bail!("stream is dropped!"); | ||
} | ||
let s = &mut st.data; | ||
if len == 0 { | ||
Ok(Ok((bytes::Bytes::new().to_vec(), s.len() > 0))) | ||
} else if s.len() > len.try_into()? { | ||
|
@@ -31,10 +36,14 @@ impl crate::streams::Host for WasiHttp { | |
stream: InputStream, | ||
len: u64, | ||
) -> wasmtime::Result<Result<(u64, bool), StreamError>> { | ||
let s = self | ||
let st = self | ||
.streams | ||
.get_mut(&stream) | ||
.ok_or_else(|| anyhow!("stream not found: {stream}"))?; | ||
if st.closed { | ||
bail!("stream is dropped!"); | ||
} | ||
let s = &mut st.data; | ||
if len == 0 { | ||
Ok(Ok((0, s.len() > 0))) | ||
} else if s.len() > len.try_into()? { | ||
|
@@ -52,7 +61,11 @@ impl crate::streams::Host for WasiHttp { | |
} | ||
|
||
fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> { | ||
self.streams.remove(&stream); | ||
let st = self | ||
.streams | ||
.get_mut(&stream) | ||
.ok_or_else(|| anyhow!("stream not found: {stream}"))?; | ||
st.closed = true; | ||
Ok(()) | ||
} | ||
|
||
|
@@ -62,14 +75,30 @@ impl crate::streams::Host for WasiHttp { | |
buf: Vec<u8>, | ||
) -> wasmtime::Result<Result<u64, StreamError>> { | ||
match self.streams.get(&this) { | ||
Some(data) => { | ||
Some(st) => { | ||
if st.closed { | ||
bail!("stream is dropped!"); | ||
brendandburns marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
let data = &st.data; | ||
let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len()); | ||
new.put(data.clone()); | ||
new.put(bytes::Bytes::from(buf.clone())); | ||
self.streams.insert(this, new.freeze()); | ||
self.streams.insert( | ||
this, | ||
Stream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there was a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
closed: false, | ||
data: new.freeze(), | ||
}, | ||
); | ||
} | ||
None => { | ||
self.streams.insert(this, bytes::Bytes::from(buf.clone())); | ||
self.streams.insert( | ||
this, | ||
Stream { | ||
closed: false, | ||
data: bytes::Bytes::from(buf.clone()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You wouldn't need this clone if the call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
}, | ||
); | ||
} | ||
} | ||
Ok(Ok(buf.len().try_into()?)) | ||
|
@@ -111,7 +140,11 @@ impl crate::streams::Host for WasiHttp { | |
} | ||
|
||
fn drop_output_stream(&mut self, stream: OutputStream) -> wasmtime::Result<()> { | ||
self.streams.remove(&stream); | ||
let st = self | ||
.streams | ||
.get_mut(&stream) | ||
.ok_or_else(|| anyhow!("stream not found: {stream}"))?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I'm following your suggestion, can you clarify? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I'm referring to this method on the If it's in scope, you should be able to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rvolosatovs I made those ok_or_else(|| anyhow!(...)) suggestions in a prior PR and wasn't aware that Context could make that more succinct. Thank you. @brendandburns The suggested change here is |
||
st.closed = true; | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,12 @@ use crate::types::{Method, Scheme}; | |
use bytes::Bytes; | ||
use std::collections::HashMap; | ||
|
||
#[derive(Clone)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
pub struct Stream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at the usages of this struct, it seems that a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added. |
||
pub closed: bool, | ||
pub data: Bytes, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub struct WasiHttp { | ||
pub request_id_base: u32, | ||
|
@@ -11,7 +17,7 @@ pub struct WasiHttp { | |
pub requests: HashMap<u32, ActiveRequest>, | ||
pub responses: HashMap<u32, ActiveResponse>, | ||
pub fields: HashMap<u32, HashMap<String, Vec<String>>>, | ||
pub streams: HashMap<u32, Bytes>, | ||
pub streams: HashMap<u32, Stream>, | ||
} | ||
|
||
#[derive(Clone)] | ||
|
@@ -66,6 +72,15 @@ impl ActiveResponse { | |
} | ||
} | ||
|
||
impl Stream { | ||
pub fn new() -> Self { | ||
Self { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we had a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
closed: false, | ||
data: Bytes::new(), | ||
} | ||
} | ||
} | ||
|
||
impl WasiHttp { | ||
pub fn new() -> Self { | ||
Self { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,7 +123,13 @@ impl crate::types::Host for WasiHttp { | |
bail!("unimplemented: drop_incoming_request") | ||
} | ||
fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> { | ||
self.requests.remove(&request); | ||
match self.requests.get(&request) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit; since we only care about the Or even
https://doc.rust-lang.org/std/collections/hash_map/struct.OccupiedEntry.html#method.remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Thanks for the suggestion, I'm still learning idiomatic rust) |
||
Some(r) => { | ||
self.streams.remove(&r.body); | ||
self.requests.remove(&request); | ||
} | ||
None => { /* pass */ } | ||
} | ||
Ok(()) | ||
} | ||
fn incoming_request_method(&mut self, _request: IncomingRequest) -> wasmtime::Result<Method> { | ||
|
@@ -206,7 +212,13 @@ impl crate::types::Host for WasiHttp { | |
bail!("unimplemented: set_response_outparam") | ||
} | ||
fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> { | ||
self.responses.remove(&response); | ||
match self.responses.get(&response) { | ||
Some(r) => { | ||
self.streams.remove(&r.body); | ||
self.responses.remove(&response); | ||
} | ||
None => { /* pass */ } | ||
} | ||
Ok(()) | ||
} | ||
fn drop_outgoing_response(&mut self, _response: OutgoingResponse) -> wasmtime::Result<()> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we had
Default
implemented, this could have beenunwrap_or_default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately,
HashMap::get(...)
returns a reference rather than a value, and there's no default for a reference. I could implement theDefault
trait for&Stream
references, but I'm not sure if that's what is best. Let me know.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, I think this is fine.
I am making a few changes here to allow for the host to provide a custom connect implementation, so I'll take a deeper look at this as well next week
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#6272