Skip to content

Commit

Permalink
Address comments, fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Apr 19, 2023
1 parent c7b3f25 commit 7dd25d2
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 37 deletions.
2 changes: 1 addition & 1 deletion crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl WasiHttp {
let body = Full::<Bytes>::new(
self.streams
.get(&request.body)
.unwrap_or(&Stream::new())
.unwrap_or(&Stream::default())
.data
.clone(),
);
Expand Down
28 changes: 8 additions & 20 deletions crates/wasi-http/src/streams_impl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::poll::Pollable;
use crate::r#struct::Stream;
use crate::streams::{InputStream, OutputStream, StreamError};
use crate::WasiHttp;
use anyhow::{anyhow, bail};
Expand Down Expand Up @@ -74,34 +73,23 @@ impl crate::streams::Host for WasiHttp {
this: OutputStream,
buf: Vec<u8>,
) -> wasmtime::Result<Result<u64, StreamError>> {
let len = buf.len();
match self.streams.get(&this) {
Some(st) => {
if st.closed {
bail!("stream is dropped!");
}
let data = &st.data;
let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len());
new.put(data.clone());
new.put(bytes::Bytes::from(buf.clone()));
self.streams.insert(
this,
Stream {
closed: false,
data: new.freeze(),
},
);
let new_len = st.data.len() + len;
let mut new = bytes::BytesMut::with_capacity(new_len);
new.put(st.data.clone());
new.put(bytes::Bytes::from(buf));
self.streams.insert(this, new.freeze().into());
}
None => {
self.streams.insert(
this,
Stream {
closed: false,
data: bytes::Bytes::from(buf.clone()),
},
);
self.streams.insert(this, bytes::Bytes::from(buf).into());
}
}
Ok(Ok(buf.len().try_into()?))
Ok(Ok(len.try_into()?))
}

fn write_zeroes(
Expand Down
10 changes: 8 additions & 2 deletions crates/wasi-http/src/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::types::{Method, Scheme};
use bytes::Bytes;
use std::collections::HashMap;

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Stream {
pub closed: bool,
pub data: Bytes,
Expand Down Expand Up @@ -74,9 +74,15 @@ impl ActiveResponse {

impl Stream {
pub fn new() -> Self {
Self::default()
}
}

impl From<Bytes> for Stream {
fn from(bytes: Bytes) -> Self {
Self {
closed: false,
data: Bytes::new(),
data: bytes,
}
}
}
Expand Down
23 changes: 9 additions & 14 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::poll::Pollable;
use crate::r#struct::ActiveRequest;
use crate::r#struct::{ActiveRequest, Stream};
use crate::types::{
Error, Fields, FutureIncomingResponse, Headers, IncomingRequest, IncomingResponse,
IncomingStream, Method, OutgoingRequest, OutgoingResponse, OutgoingStream, ResponseOutparam,
Scheme, StatusCode, Trailers,
};
use crate::WasiHttp;
use anyhow::{anyhow, bail};
use std::collections::HashMap;
use std::collections::{HashMap, hash_map::Entry};

impl crate::types::Host for WasiHttp {
fn drop_fields(&mut self, fields: Fields) -> wasmtime::Result<()> {
Expand Down Expand Up @@ -123,12 +123,9 @@ impl crate::types::Host for WasiHttp {
bail!("unimplemented: drop_incoming_request")
}
fn drop_outgoing_request(&mut self, request: OutgoingRequest) -> wasmtime::Result<()> {
match self.requests.get(&request) {
Some(r) => {
self.streams.remove(&r.body);
self.requests.remove(&request);
}
None => { /* pass */ }
if let Entry::Occupied(e) = self.requests.entry(request) {
let r = e.remove();
self.streams.remove(&r.body);
}
Ok(())
}
Expand Down Expand Up @@ -198,6 +195,7 @@ impl crate::types::Host for WasiHttp {
if req.body == 0 {
req.body = self.streams_id_base;
self.streams_id_base = self.streams_id_base + 1;
self.streams.insert(req.body, Stream::default());
}
Ok(Ok(req.body))
}
Expand All @@ -212,12 +210,9 @@ impl crate::types::Host for WasiHttp {
bail!("unimplemented: set_response_outparam")
}
fn drop_incoming_response(&mut self, response: IncomingResponse) -> wasmtime::Result<()> {
match self.responses.get(&response) {
Some(r) => {
self.streams.remove(&r.body);
self.responses.remove(&response);
}
None => { /* pass */ }
if let Entry::Occupied(e) = self.responses.entry(response) {
let r = e.remove();
self.streams.remove(&r.body);
}
Ok(())
}
Expand Down

0 comments on commit 7dd25d2

Please sign in to comment.