diff --git a/Cargo.toml b/Cargo.toml index d360479..8019adc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ categories.workspace = true [dependencies] futures-core.workspace = true http.workspace = true +itoa.workspace = true pin-project-lite.workspace = true slab.workspace = true wasi.workspace = true @@ -52,6 +53,7 @@ futures-core = "0.3.19" futures-lite = "1.12.0" heck = "0.5" http = "1.1" +itoa = "1" pin-project-lite = "0.2.8" quote = "1.0" serde_json = "1" @@ -60,7 +62,8 @@ syn = "2.0" test-log = { version = "0.2", features = ["trace"] } test-programs = { path = "test-programs" } test-programs-artifacts = { path = "test-programs/artifacts" } -wasi = "0.13.1" +ureq = { version = "2.12.1", default-features = false } +wasi = "0.14.0" wasmtime = "26" wasmtime-wasi = "26" wasmtime-wasi-http = "26" diff --git a/examples/http_server.rs b/examples/http_server.rs new file mode 100644 index 0000000..b21eda4 --- /dev/null +++ b/examples/http_server.rs @@ -0,0 +1,94 @@ +use wstd::http::body::{BodyForthcoming, IncomingBody, OutgoingBody}; +use wstd::http::server::{Finished, Responder}; +use wstd::http::{IntoBody, Request, Response, StatusCode}; +use wstd::io::{copy, empty, AsyncWrite}; +use wstd::time::{Duration, Instant}; + +#[wstd::http_server] +async fn main(request: Request, responder: Responder) -> Finished { + match request.uri().path_and_query().unwrap().as_str() { + "/wait" => http_wait(request, responder).await, + "/echo" => http_echo(request, responder).await, + "/echo-headers" => http_echo_headers(request, responder).await, + "/echo-trailers" => http_echo_trailers(request, responder).await, + "/fail" => http_fail(request, responder).await, + "/bigfail" => http_bigfail(request, responder).await, + "/" => http_home(request, responder).await, + _ => http_not_found(request, responder).await, + } +} + +async fn http_home(_request: Request, responder: Responder) -> Finished { + // To send a single string as the response body, use `Responder::respond`. + responder + .respond(Response::new("Hello, wasi:http/proxy world!\n".into_body())) + .await +} + +async fn http_wait(_request: Request, responder: Responder) -> Finished { + // Get the time now + let now = Instant::now(); + + // Sleep for one second. + wstd::task::sleep(Duration::from_secs(1)).await; + + // Compute how long we slept for. + let elapsed = Instant::now().duration_since(now).as_millis(); + + // To stream data to the response body, use `Responder::start_response`. + let mut body = responder.start_response(Response::new(BodyForthcoming)); + let result = body + .write_all(format!("slept for {elapsed} millis\n").as_bytes()) + .await; + Finished::finish(body, result, None) +} + +async fn http_echo(mut request: Request, responder: Responder) -> Finished { + // Stream data from the request body to the response body. + let mut body = responder.start_response(Response::new(BodyForthcoming)); + let result = copy(request.body_mut(), &mut body).await; + Finished::finish(body, result, None) +} + +async fn http_fail(_request: Request, responder: Responder) -> Finished { + let body = responder.start_response(Response::new(BodyForthcoming)); + Finished::fail(body) +} + +async fn http_bigfail(_request: Request, responder: Responder) -> Finished { + async fn write_body(body: &mut OutgoingBody) -> wstd::io::Result<()> { + for _ in 0..0x10 { + body.write_all("big big big big\n".as_bytes()).await?; + } + body.flush().await?; + Ok(()) + } + + let mut body = responder.start_response(Response::new(BodyForthcoming)); + let _ = write_body(&mut body).await; + Finished::fail(body) +} + +async fn http_echo_headers(request: Request, responder: Responder) -> Finished { + let mut response = Response::builder(); + *response.headers_mut().unwrap() = request.into_parts().0.headers; + let response = response.body(empty()).unwrap(); + responder.respond(response).await +} + +async fn http_echo_trailers(request: Request, responder: Responder) -> Finished { + let body = responder.start_response(Response::new(BodyForthcoming)); + let (trailers, result) = match request.into_body().finish().await { + Ok(trailers) => (trailers, Ok(())), + Err(err) => (Default::default(), Err(std::io::Error::other(err))), + }; + Finished::finish(body, result, trailers) +} + +async fn http_not_found(_request: Request, responder: Responder) -> Finished { + let response = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(empty()) + .unwrap(); + responder.respond(response).await +} diff --git a/macro/src/lib.rs b/macro/src/lib.rs index 2f8712c..fa78ee0 100644 --- a/macro/src/lib.rs +++ b/macro/src/lib.rs @@ -83,3 +83,103 @@ pub fn attr_macro_test(_attr: TokenStream, item: TokenStream) -> TokenStream { } .into() } + +/// Enables a HTTP server main function, for creating [HTTP servers]. +/// +/// [HTTP servers]: https://docs.rs/wstd/latest/wstd/http/server/index.html +/// +/// # Examples +/// +/// ```ignore +/// #[wstd::http_server] +/// async fn main(request: Request, responder: Responder) -> Finished { +/// responder +/// .respond(Response::new("Hello!\n".into_body())) +/// .await +/// } +/// ``` +#[proc_macro_attribute] +pub fn attr_macro_http_server(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as ItemFn); + + if input.sig.asyncness.is_none() { + return quote_spanned! { input.sig.fn_token.span()=> + compile_error!("fn must be `async fn`"); + } + .into(); + } + + let output = &input.sig.output; + let inputs = &input.sig.inputs; + let name = &input.sig.ident; + let body = &input.block; + let attrs = &input.attrs; + let vis = &input.vis; + + if name != "main" { + return quote_spanned! { input.sig.ident.span()=> + compile_error!("only `async fn main` can be used for #[wstd::http_server]"); + } + .into(); + } + + quote! { + struct TheServer; + + impl ::wstd::wasi::exports::http::incoming_handler::Guest for TheServer { + fn handle( + request: ::wstd::wasi::http::types::IncomingRequest, + response_out: ::wstd::wasi::http::types::ResponseOutparam + ) { + #(#attrs)* + #vis async fn __run(#inputs) #output { + #body + } + + let responder = ::wstd::http::server::Responder::new(response_out); + let _finished: ::wstd::http::server::Finished = + match ::wstd::http::try_from_incoming_request(request) + { + Ok(request) => ::wstd::runtime::block_on(async { __run(request, responder).await }), + Err(err) => responder.fail(err), + }; + } + } + + ::wstd::wasi::http::proxy::export!(TheServer with_types_in ::wstd::wasi); + + // Provide an actual function named `main`. + // + // WASI HTTP server components don't use a traditional `main` function. + // They export a function named `handle` which takes a `Request` + // argument, and which may be called multiple times on the same + // instance. To let users write a familiar `fn main` in a file + // named src/main.rs, we provide this `wstd::http_server` macro, which + // transforms the user's `fn main` into the appropriate `handle` + // function. + // + // However, when the top-level file is named src/main.rs, rustc + // requires there to be a function named `main` somewhere in it. This + // requirement can be disabled using `#![no_main]`, however we can't + // use that automatically because macros can't contain inner + // attributes, and we don't want to require users to add `#![no_main]` + // in their own code. + // + // So, we include a definition of a function named `main` here, which + // isn't intended to ever be called, and exists just to satify the + // requirement for a `main` function. + // + // Users could use `#![no_main]` if they want to. Or, they could name + // their top-level file src/lib.rs and add + // ```toml + // [lib] + // crate-type = ["cdylib"] + // ``` + // to their Cargo.toml. With either of these, this "main" function will + // be ignored as dead code. + fn main() { + unreachable!("HTTP server components should be run with `handle` rather than `run`") + } + } + .into() +} diff --git a/src/http/body.rs b/src/http/body.rs index 7352cb4..a301d13 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -1,6 +1,8 @@ //! HTTP body types -use crate::io::{AsyncInputStream, AsyncRead, Cursor, Empty}; +use crate::http::fields::header_map_from_wasi; +use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Cursor, Empty}; +use crate::runtime::AsyncPollable; use core::fmt; use http::header::{CONTENT_LENGTH, TRANSFER_ENCODING}; use wasi::http::types::IncomingBody as WasiIncomingBody; @@ -116,9 +118,9 @@ impl Body for Empty { pub struct IncomingBody { kind: BodyKind, // IMPORTANT: the order of these fields here matters. `body_stream` must - // be dropped before `_incoming_body`. + // be dropped before `incoming_body`. body_stream: AsyncInputStream, - _incoming_body: WasiIncomingBody, + incoming_body: WasiIncomingBody, } impl IncomingBody { @@ -130,9 +132,29 @@ impl IncomingBody { Self { kind, body_stream, - _incoming_body: incoming_body, + incoming_body, } } + + /// Consume this `IncomingBody` and return the trailers, if present. + pub async fn finish(self) -> Result, Error> { + // The stream is a child resource of the `IncomingBody`, so ensure that + // it's dropped first. + drop(self.body_stream); + + let trailers = WasiIncomingBody::finish(self.incoming_body); + + AsyncPollable::new(trailers.subscribe()).wait_for().await; + + let trailers = trailers.get().unwrap().unwrap()?; + + let trailers = match trailers { + None => None, + Some(trailers) => Some(header_map_from_wasi(trailers)?), + }; + + Ok(trailers) + } } impl AsyncRead for IncomingBody { @@ -177,3 +199,79 @@ impl From for Error { ErrorVariant::Other(e.to_string()).into() } } + +/// The output stream for the body, implementing [`AsyncWrite`]. Call +/// [`Responder::start_response`] to obtain one. Once the body is complete, +/// it must be declared finished, using [`OutgoingBody::finish`]. +#[must_use] +pub struct OutgoingBody { + // IMPORTANT: the order of these fields here matters. `stream` must + // be dropped before `body`. + stream: AsyncOutputStream, + body: wasi::http::types::OutgoingBody, + dontdrop: DontDropOutgoingBody, +} + +impl OutgoingBody { + pub(crate) fn new(stream: AsyncOutputStream, body: wasi::http::types::OutgoingBody) -> Self { + Self { + stream, + body, + dontdrop: DontDropOutgoingBody, + } + } + + pub(crate) fn consume(self) -> (AsyncOutputStream, wasi::http::types::OutgoingBody) { + let Self { + stream, + body, + dontdrop, + } = self; + + std::mem::forget(dontdrop); + + (stream, body) + } + + /// Return a reference to the underlying `AsyncOutputStream`. + /// + /// This usually isn't needed, as `OutgoingBody` implements `AsyncWrite` + /// too, however it is useful for code that expects to work with + /// `AsyncOutputStream` specifically. + pub fn stream(&mut self) -> &mut AsyncOutputStream { + &mut self.stream + } +} + +impl AsyncWrite for OutgoingBody { + async fn write(&mut self, buf: &[u8]) -> crate::io::Result { + self.stream.write(buf).await + } + + async fn flush(&mut self) -> crate::io::Result<()> { + self.stream.flush().await + } + + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + Some(&self.stream) + } +} + +/// A utility to ensure that `OutgoingBody` is either finished or failed, and +/// not implicitly dropped. +struct DontDropOutgoingBody; + +impl Drop for DontDropOutgoingBody { + fn drop(&mut self) { + unreachable!("`OutgoingBody::drop` called; `OutgoingBody`s should be consumed with `finish` or `fail`."); + } +} + +/// A placeholder for use as the type parameter to [`Response`] to indicate +/// that the body has not yet started. This is used with +/// [`Responder::start_response`], which has a `Response` +/// argument. +/// +/// To instead start the response and obtain the output stream for the body, +/// use [`Responder::respond`]. +pub struct BodyForthcoming; diff --git a/src/http/client.rs b/src/http/client.rs index ae6e948..f6d576e 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -20,6 +20,8 @@ impl Client { /// Send an HTTP request. pub async fn send(&self, req: Request) -> Result> { + // We don't use `body::OutputBody` here because we can report I/O + // errors from the `copy` directly. let (wasi_req, body) = try_into_outgoing(req)?; let wasi_body = wasi_req.body().unwrap(); let body_stream = wasi_body.write().unwrap(); diff --git a/src/http/error.rs b/src/http/error.rs index a32cf1c..bfa5c36 100644 --- a/src/http/error.rs +++ b/src/http/error.rs @@ -1,3 +1,4 @@ +use crate::http::fields::ToWasiHeaderError; use std::fmt; /// The `http` result type. @@ -78,9 +79,12 @@ impl From for Error { } } -impl From for Error { - fn from(e: WasiHttpHeaderError) -> Error { - ErrorVariant::WasiHeader(e).into() +impl From for Error { + fn from(error: ToWasiHeaderError) -> Error { + Error { + variant: ErrorVariant::WasiHeader(error.error), + context: vec![error.context], + } } } diff --git a/src/http/fields.rs b/src/http/fields.rs index 22f7093..cd41684 100644 --- a/src/http/fields.rs +++ b/src/http/fields.rs @@ -1,9 +1,9 @@ pub use http::header::{HeaderMap, HeaderName, HeaderValue}; -use super::{Error, Result}; -use wasi::http::types::Fields; +use super::Error; +use wasi::http::types::{Fields, HeaderError as WasiHttpHeaderError}; -pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { +pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { let mut output = HeaderMap::new(); for (key, value) in wasi_fields.entries() { let key = HeaderName::from_bytes(key.as_bytes()) @@ -15,12 +15,22 @@ pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { Ok(output) } -pub(crate) fn header_map_to_wasi(header_map: &HeaderMap) -> Result { +pub(crate) fn header_map_to_wasi(header_map: &HeaderMap) -> Result { let wasi_fields = Fields::new(); for (key, value) in header_map { + // Unwrap because `HeaderMap` has already validated the headers. wasi_fields - .append(&key.as_str().to_owned(), &value.as_bytes().to_owned()) - .map_err(|e| Error::from(e).context("header named {key}"))?; + .append(key.as_str(), value.as_bytes()) + .map_err(|error| ToWasiHeaderError { + error, + context: format!("header {key}: {value:?}"), + })?; } Ok(wasi_fields) } + +#[derive(Debug)] +pub(crate) struct ToWasiHeaderError { + pub(crate) error: WasiHttpHeaderError, + pub(crate) context: String, +} diff --git a/src/http/method.rs b/src/http/method.rs index bd7c210..1f06eff 100644 --- a/src/http/method.rs +++ b/src/http/method.rs @@ -1,6 +1,6 @@ use wasi::http::types::Method as WasiMethod; -use super::Result; +use http::method::InvalidMethod; pub use http::Method; pub(crate) fn to_wasi_method(value: Method) -> WasiMethod { @@ -18,9 +18,7 @@ pub(crate) fn to_wasi_method(value: Method) -> WasiMethod { } } -// This will become useful once we support IncomingRequest -#[allow(dead_code)] -pub(crate) fn from_wasi_method(value: WasiMethod) -> Result { +pub(crate) fn from_wasi_method(value: WasiMethod) -> Result { Ok(match value { WasiMethod::Get => Method::GET, WasiMethod::Head => Method::HEAD, diff --git a/src/http/mod.rs b/src/http/mod.rs index 1bc1aa3..2c73d93 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1,7 +1,7 @@ //! HTTP networking support //! pub use http::status::StatusCode; -pub use http::uri::Uri; +pub use http::uri::{Authority, PathAndQuery, Uri}; #[doc(inline)] pub use body::{Body, IntoBody}; @@ -9,8 +9,9 @@ pub use client::Client; pub use error::{Error, Result}; pub use fields::{HeaderMap, HeaderName, HeaderValue}; pub use method::Method; -pub use request::Request; +pub use request::{try_from_incoming_request, Request}; pub use response::Response; +pub use scheme::{InvalidUri, Scheme}; pub mod body; @@ -20,3 +21,5 @@ mod fields; mod method; mod request; mod response; +mod scheme; +pub mod server; diff --git a/src/http/request.rs b/src/http/request.rs index 21411ad..5b9aac5 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -1,10 +1,18 @@ -use super::{fields::header_map_to_wasi, method::to_wasi_method, Error, Result}; +use super::{ + body::{BodyKind, IncomingBody}, + error::WasiHttpErrorCode, + fields::{header_map_from_wasi, header_map_to_wasi}, + method::{from_wasi_method, to_wasi_method}, + scheme::{from_wasi_scheme, to_wasi_scheme}, + Authority, Error, HeaderMap, PathAndQuery, Uri, +}; +use crate::io::AsyncInputStream; use wasi::http::outgoing_handler::OutgoingRequest; -use wasi::http::types::Scheme; +use wasi::http::types::IncomingRequest; pub use http::Request; -pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingRequest, T)> { +pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingRequest, T), Error> { let wasi_req = OutgoingRequest::new(header_map_to_wasi(request.headers())?); let (parts, body) = request.into_parts(); @@ -16,11 +24,11 @@ pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingReque .map_err(|()| Error::other(format!("method rejected by wasi-http: {method:?}",)))?; // Set the url scheme - let scheme = match parts.uri.scheme().map(|s| s.as_str()) { - Some("http") => Scheme::Http, - Some("https") | None => Scheme::Https, - Some(other) => Scheme::Other(other.to_owned()), - }; + let scheme = parts + .uri + .scheme() + .map(to_wasi_scheme) + .unwrap_or(wasi::http::types::Scheme::Https); wasi_req .set_scheme(Some(&scheme)) .map_err(|()| Error::other(format!("scheme rejected by wasi-http: {scheme:?}")))?; @@ -34,7 +42,7 @@ pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingReque // Set the url path + query string if let Some(p_and_q) = parts.uri.path_and_query() { wasi_req - .set_path_with_query(Some(&p_and_q.to_string())) + .set_path_with_query(Some(p_and_q.as_str())) .map_err(|()| { Error::other(format!("path and query rejected by wasi-http {p_and_q:?}")) })?; @@ -43,3 +51,66 @@ pub(crate) fn try_into_outgoing(request: Request) -> Result<(OutgoingReque // All done; request is ready for send-off Ok((wasi_req, body)) } + +/// This is used by the `http_server` macro. +#[doc(hidden)] +pub fn try_from_incoming_request( + incoming: IncomingRequest, +) -> Result, WasiHttpErrorCode> { + // TODO: What's the right error code to use for invalid headers? + let headers: HeaderMap = header_map_from_wasi(incoming.headers()) + .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string())))?; + + let method = from_wasi_method(incoming.method()) + .map_err(|_| WasiHttpErrorCode::HttpRequestMethodInvalid)?; + let scheme = incoming.scheme().map(|scheme| { + from_wasi_scheme(scheme).expect("TODO: what shall we do with an invalid uri here?") + }); + let authority = incoming.authority().map(|authority| { + Authority::from_maybe_shared(authority) + .expect("TODO: what shall we do with an invalid uri authority here?") + }); + let path_and_query = incoming.path_with_query().map(|path_and_query| { + PathAndQuery::from_maybe_shared(path_and_query) + .expect("TODO: what shall we do with an invalid uri path-and-query here?") + }); + + // TODO: What's the right error code to use for invalid headers? + let kind = BodyKind::from_headers(&headers) + .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string())))?; + // `body_stream` is a child of `incoming_body` which means we cannot + // drop the parent before we drop the child + let incoming_body = incoming + .consume() + .expect("cannot call `consume` twice on incoming request"); + let body_stream = incoming_body + .stream() + .expect("cannot call `stream` twice on an incoming body"); + let body_stream = AsyncInputStream::new(body_stream); + + let body = IncomingBody::new(kind, body_stream, incoming_body); + + let mut uri = Uri::builder(); + if let Some(scheme) = scheme { + uri = uri.scheme(scheme); + } + if let Some(authority) = authority { + uri = uri.authority(authority); + } + if let Some(path_and_query) = path_and_query { + uri = uri.path_and_query(path_and_query); + } + // TODO: What's the right error code to use for an invalid uri? + let uri = uri + .build() + .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string())))?; + + let mut request = Request::builder().method(method).uri(uri); + if let Some(headers_mut) = request.headers_mut() { + *headers_mut = headers; + } + // TODO: What's the right error code to use for an invalid request? + request + .body(body) + .map_err(|e| WasiHttpErrorCode::InternalError(Some(e.to_string()))) +} diff --git a/src/http/response.rs b/src/http/response.rs index a2b243a..ed7577d 100644 --- a/src/http/response.rs +++ b/src/http/response.rs @@ -3,14 +3,16 @@ use wasi::http::types::IncomingResponse; use super::{ body::{BodyKind, IncomingBody}, fields::header_map_from_wasi, - Error, HeaderMap, Result, + Error, HeaderMap, }; use crate::io::AsyncInputStream; use http::StatusCode; pub use http::Response; -pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result> { +pub(crate) fn try_from_incoming( + incoming: IncomingResponse, +) -> Result, Error> { let headers: HeaderMap = header_map_from_wasi(incoming.headers())?; // TODO: Does WASI guarantee that the incoming status is valid? let status = diff --git a/src/http/scheme.rs b/src/http/scheme.rs new file mode 100644 index 0000000..860ce35 --- /dev/null +++ b/src/http/scheme.rs @@ -0,0 +1,20 @@ +use wasi::http::types::Scheme as WasiScheme; + +pub use http::uri::{InvalidUri, Scheme}; +use std::str::FromStr; + +pub(crate) fn to_wasi_scheme(value: &Scheme) -> WasiScheme { + match value.as_str() { + "http" => WasiScheme::Http, + "https" => WasiScheme::Https, + other => WasiScheme::Other(other.to_owned()), + } +} + +pub(crate) fn from_wasi_scheme(value: WasiScheme) -> Result { + Ok(match value { + WasiScheme::Http => Scheme::HTTP, + WasiScheme::Https => Scheme::HTTPS, + WasiScheme::Other(other) => Scheme::from_str(&other)?, + }) +} diff --git a/src/http/server.rs b/src/http/server.rs new file mode 100644 index 0000000..2d2400e --- /dev/null +++ b/src/http/server.rs @@ -0,0 +1,199 @@ +//! HTTP servers +//! +//! The WASI HTTP server API uses the [typed main] idiom, with a `main` function +//! that takes a [`Request`] and a [`Responder`], and responds with a [`Response`], +//! using the [`http_server`] macro: +//! +//! ```no_run +//! #[wstd::http_server] +//! async fn main(request: Request, responder: Responder) -> Finished { +//! responder +//! .respond(Response::new("Hello!\n".into_body())) +//! .await +//! } +//! ``` +//! +//! [typed main]: https://sunfishcode.github.io/typed-main-wasi-presentation/chapter_1.html +//! [`Request`]: crate::http::Request +//! [`Responder`]: crate::http::server::Responder +//! [`Response`]: crate::http::Response +//! [`http_server`]: crate::http_server + +use super::{ + body::{BodyForthcoming, OutgoingBody}, + error::WasiHttpErrorCode, + fields::header_map_to_wasi, + Body, HeaderMap, Response, +}; +use crate::io::{copy, AsyncOutputStream}; +use http::header::CONTENT_LENGTH; +use wasi::exports::http::incoming_handler::ResponseOutparam; +use wasi::http::types::OutgoingResponse; + +/// This is passed into the [`http_server`] `main` function and holds the state +/// needed for a handler to produce a response, or fail. There are two ways to +/// respond, with [`Responder::start_response`] to stream the body in, or +/// [`Responder::respond`] to give the body as a string, byte array, or input +/// stream. See those functions for examples. +/// +/// [`http_server`]: crate::http_server +#[must_use] +pub struct Responder { + outparam: ResponseOutparam, +} + +impl Responder { + /// Start responding with the given `Response` and return an `OutgoingBody` + /// stream to write the body to. + /// + /// # Example + /// + /// ``` + /// # use wstd::http::{body::IncomingBody, BodyForthcoming, Response, Request}; + /// # use wstd::http::server::{Finished, Responder}; + /// # use crate::wstd::io::AsyncWrite; + /// # async fn example(responder: Responder) -> Finished { + /// let mut body = responder.start_response(Response::new(BodyForthcoming)); + /// let result = body + /// .write_all("Hello!\n".as_bytes()) + /// .await; + /// Finished::finish(body, result, None) + /// # } + /// ``` + pub fn start_response(self, response: Response) -> OutgoingBody { + let wasi_headers = header_map_to_wasi(response.headers()).expect("header error"); + let wasi_response = OutgoingResponse::new(wasi_headers); + let wasi_status = response.status().as_u16(); + + // Unwrap because `StatusCode` has already validated the status. + wasi_response.set_status_code(wasi_status).unwrap(); + + // Unwrap because we can be sure we only call these once. + let wasi_body = wasi_response.body().unwrap(); + let wasi_stream = wasi_body.write().unwrap(); + + // Tell WASI to start the show. + ResponseOutparam::set(self.outparam, Ok(wasi_response)); + + OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body) + } + + /// Respond with the given `Response` which contains the body. + /// + /// If the body has a known length, a Content-Length header is automatically added. + /// + /// To respond with trailers, use [`Responder::start_response`] instead. + /// + /// # Example + /// + /// ``` + /// # use wstd::http::{body::IncomingBody, BodyForthcoming, IntoBody, Response, Request}; + /// # use wstd::http::server::{Finished, Responder}; + /// # + /// # async fn example(responder: Responder) -> Finished { + /// responder + /// .respond(Response::new("Hello!\n".into_body())) + /// .await + /// # } + /// ``` + pub async fn respond(self, response: Response) -> Finished { + let headers = response.headers(); + let status = response.status().as_u16(); + + let wasi_headers = header_map_to_wasi(headers).expect("header error"); + + // Consume the `response` and prepare to write the body. + let mut body = response.into_body(); + + // Automatically add a Content-Length header. + if let Some(len) = body.len() { + let mut buffer = itoa::Buffer::new(); + wasi_headers + .append(CONTENT_LENGTH.as_str(), buffer.format(len).as_bytes()) + .unwrap(); + } + + let wasi_response = OutgoingResponse::new(wasi_headers); + + // Unwrap because `StatusCode` has already validated the status. + wasi_response.set_status_code(status).unwrap(); + + // Unwrap because we can be sure we only call these once. + let wasi_body = wasi_response.body().unwrap(); + let wasi_stream = wasi_body.write().unwrap(); + + // Tell WASI to start the show. + ResponseOutparam::set(self.outparam, Ok(wasi_response)); + + let mut outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); + + let result = copy(&mut body, &mut outgoing_body).await; + let trailers = None; + Finished::finish(outgoing_body, result, trailers) + } + + /// This is used by the `http_server` macro. + #[doc(hidden)] + pub fn new(outparam: ResponseOutparam) -> Self { + Self { outparam } + } + + /// This is used by the `http_server` macro. + #[doc(hidden)] + pub fn fail(self, err: WasiHttpErrorCode) -> Finished { + ResponseOutparam::set(self.outparam, Err(err)); + Finished(()) + } +} + +/// An opaque value returned from a handler indicating that the body is +/// finished, either by [`OutgoingBody::finish`] or [`OutgoingBody::fail`]. +pub struct Finished(pub(crate) ()); + +impl Finished { + /// Finish the body, optionally with trailers, and return a `Finished` + /// token to be returned from the [`http_server`] `main` function to indicate + /// that the response is finished. + /// + /// `result` is a `std::io::Result` for reporting any I/O errors that + /// occur while writing to the body stream. + /// + /// [`http_server`]: crate::http_server + pub fn finish( + body: OutgoingBody, + result: std::io::Result<()>, + trailers: Option, + ) -> Self { + let (stream, body) = body.consume(); + + // The stream is a child resource of the `OutgoingBody`, so ensure that + // it's dropped first. + drop(stream); + + // If there was an I/O error, panic and don't call `OutgoingBody::finish`. + result.expect("I/O error while writing the body"); + + let wasi_trailers = + trailers.map(|trailers| header_map_to_wasi(&trailers).expect("header error")); + + wasi::http::types::OutgoingBody::finish(body, wasi_trailers) + .expect("body length did not match Content-Length header value"); + + Self(()) + } + + /// Return a `Finished` token that can be returned from a handler to + /// indicate that the body is not finished and should be considered + /// corrupted. + pub fn fail(body: OutgoingBody) -> Self { + let (stream, _body) = body.consume(); + + // The stream is a child resource of the `OutgoingBody`, so ensure that + // it's dropped first. + drop(stream); + + // No need to do anything else; omitting the call to `finish` achieves + // the desired effect. + Self(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index bf96675..fc7d3b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,12 @@ #![doc = include_str!("../tests/http_get.rs")] //! ``` //! +//! **HTTP Server** +//! +//! ```rust,no_run +#![doc = include_str!("../examples/http_server.rs")] +//! ``` +//! //! # Design Decisions //! //! This library is entirely self-contained. This means that it does not share @@ -49,6 +55,7 @@ //! is specific to that are exposed from here. pub mod future; +#[macro_use] pub mod http; pub mod io; pub mod iter; @@ -58,9 +65,14 @@ pub mod runtime; pub mod task; pub mod time; +pub use wstd_macro::attr_macro_http_server as http_server; pub use wstd_macro::attr_macro_main as main; pub use wstd_macro::attr_macro_test as test; +// Re-export the wasi crate for use by the `http_server` macro. +#[doc(hidden)] +pub use wasi; + pub mod prelude { pub use crate::future::FutureExt as _; pub use crate::http::Body as _; diff --git a/src/time/duration.rs b/src/time/duration.rs index 10d8103..700b244 100644 --- a/src/time/duration.rs +++ b/src/time/duration.rs @@ -41,6 +41,13 @@ impl Duration { std::time::Duration::from_micros(micros).into() } + /// Creates a new `Duration` from the specified number of nanoseconds. + #[must_use] + #[inline] + pub fn from_nanos(nanos: u64) -> Self { + std::time::Duration::from_nanos(nanos).into() + } + /// Creates a new `Duration` from the specified number of seconds represented /// as `f64`. /// @@ -70,6 +77,34 @@ impl Duration { pub fn from_secs_f32(secs: f32) -> Duration { std::time::Duration::from_secs_f32(secs).into() } + + /// Returns the number of whole seconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_secs(&self) -> u64 { + self.0 / 1_000_000_000 + } + + /// Returns the number of whole milliseconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_millis(&self) -> u128 { + (self.0 / 1_000_000) as u128 + } + + /// Returns the number of whole microseconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_micros(&self) -> u128 { + (self.0 / 1_000) as u128 + } + + /// Returns the total number of nanoseconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_nanos(&self) -> u128 { + self.0 as u128 + } } impl std::ops::Deref for Duration { @@ -140,3 +175,56 @@ impl IntoFuture for Duration { crate::task::sleep(self) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new_from_as() { + assert_eq!(Duration::new(456, 864209753).as_secs(), 456); + assert_eq!(Duration::new(456, 864209753).as_millis(), 456864); + assert_eq!(Duration::new(456, 864209753).as_micros(), 456864209); + assert_eq!(Duration::new(456, 864209753).as_nanos(), 456864209753); + + assert_eq!(Duration::from_secs(9876543210).as_secs(), 9876543210); + assert_eq!(Duration::from_secs(9876543210).as_millis(), 9876543210_000); + assert_eq!( + Duration::from_secs(9876543210).as_micros(), + 9876543210_000000 + ); + assert_eq!( + Duration::from_secs(9876543210).as_nanos(), + 9876543210_000000000 + ); + + assert_eq!(Duration::from_millis(9876543210).as_secs(), 9876543); + assert_eq!(Duration::from_millis(9876543210).as_millis(), 9876543210); + assert_eq!( + Duration::from_millis(9876543210).as_micros(), + 9876543210_000 + ); + assert_eq!( + Duration::from_millis(9876543210).as_nanos(), + 9876543210_000000 + ); + + assert_eq!(Duration::from_micros(9876543210).as_secs(), 9876); + assert_eq!(Duration::from_micros(9876543210).as_millis(), 9876543); + assert_eq!(Duration::from_micros(9876543210).as_micros(), 9876543210); + assert_eq!(Duration::from_micros(9876543210).as_nanos(), 9876543210_000); + + assert_eq!(Duration::from_nanos(9876543210).as_secs(), 9); + assert_eq!(Duration::from_nanos(9876543210).as_millis(), 9876); + assert_eq!(Duration::from_nanos(9876543210).as_micros(), 9876543); + assert_eq!(Duration::from_nanos(9876543210).as_nanos(), 9876543210); + } + + #[test] + fn test_from_secs_float() { + assert_eq!(Duration::from_secs_f64(158.9).as_secs(), 158); + assert_eq!(Duration::from_secs_f32(158.9).as_secs(), 158); + assert_eq!(Duration::from_secs_f64(159.1).as_secs(), 159); + assert_eq!(Duration::from_secs_f32(159.1).as_secs(), 159); + } +} diff --git a/src/time/instant.rs b/src/time/instant.rs index b9db4b9..02c87f7 100644 --- a/src/time/instant.rs +++ b/src/time/instant.rs @@ -30,7 +30,7 @@ impl Instant { /// Returns the amount of time elapsed from another instant to this one, or zero duration if /// that instant is later than this one. pub fn duration_since(&self, earlier: Instant) -> Duration { - Duration::from_micros(self.0.checked_sub(earlier.0).unwrap_or_default()) + Duration::from_nanos(self.0.checked_sub(earlier.0).unwrap_or_default()) } /// Returns the amount of time elapsed since this instant. @@ -90,3 +90,16 @@ impl IntoFuture for Instant { crate::task::sleep_until(self) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_duration_since() { + let x = Instant::now(); + let d = Duration::new(456, 789); + let y = x + d; + assert_eq!(y.duration_since(x), d); + } +} diff --git a/test-programs/artifacts/Cargo.toml b/test-programs/artifacts/Cargo.toml index 94438a2..d35044a 100644 --- a/test-programs/artifacts/Cargo.toml +++ b/test-programs/artifacts/Cargo.toml @@ -11,6 +11,7 @@ publish = false anyhow.workspace = true test-log.workspace = true test-programs-artifacts.workspace = true +ureq.workspace = true wasmtime.workspace = true wasmtime-wasi.workspace = true wasmtime-wasi-http.workspace = true diff --git a/test-programs/artifacts/tests/http_server.rs b/test-programs/artifacts/tests/http_server.rs new file mode 100644 index 0000000..995298b --- /dev/null +++ b/test-programs/artifacts/tests/http_server.rs @@ -0,0 +1,73 @@ +use anyhow::Result; +use std::process::Command; + +#[test_log::test] +fn http_server() -> Result<()> { + use std::net::TcpStream; + use std::thread::sleep; + use std::time::Duration; + + // Run wasmtime serve. + // Enable -Scli because we currently don't have a way to build with the + // proxy adapter, so we build with the default adapter. + let mut wasmtime_process = Command::new("wasmtime") + .arg("serve") + .arg("-Scli") + .arg("--addr=127.0.0.1:8081") + .arg(test_programs_artifacts::HTTP_SERVER) + .spawn()?; + + // Clumsily wait for the server to accept connections. + 'wait: loop { + sleep(Duration::from_millis(100)); + if TcpStream::connect("127.0.0.1:8081").is_ok() { + break 'wait; + } + } + + // Do some tests! + + let body: String = ureq::get("http://127.0.0.1:8081").call()?.into_string()?; + assert_eq!(body, "Hello, wasi:http/proxy world!\n"); + + match ureq::get("http://127.0.0.1:8081/fail").call() { + Ok(body) => { + unreachable!("unexpected success from /fail: {:?}", body); + } + Err(ureq::Error::Transport(_transport)) => {} + Err(other) => { + unreachable!("unexpected error: {:?}", other); + } + } + + const MESSAGE: &[u8] = b"hello, echoserver!\n"; + + let body: String = ureq::get("http://127.0.0.1:8081/echo") + .send(MESSAGE)? + .into_string()?; + assert_eq!(body.as_bytes(), MESSAGE); + + let test_headers = [ + ("Red", "Rhubarb"), + ("Orange", "Carrots"), + ("Yellow", "Bananas"), + ("Green", "Broccoli"), + ("Blue", "Blueberries"), + ("Purple", "Beets"), + ]; + + let mut response = ureq::get("http://127.0.0.1:8081/echo-headers"); + for (name, value) in test_headers { + response = response.set(name, value); + } + let response = response.call()?; + + assert!(response.headers_names().len() >= test_headers.len()); + for (name, value) in test_headers { + assert_eq!(response.header(name), Some(value)); + } + + wasmtime_process.kill()?; + + Ok(()) +} diff --git a/test-programs/src/bin/http_server.rs b/test-programs/src/bin/http_server.rs new file mode 100644 index 0000000..e9fea26 --- /dev/null +++ b/test-programs/src/bin/http_server.rs @@ -0,0 +1 @@ +include!("../../../examples/http_server.rs");