Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ categories.workspace = true
[dependencies]
futures-core.workspace = true
http.workspace = true
itoa.workspace = true
pin-project-lite.workspace = true
slab.workspace = true
wasi.workspace = true
Expand Down Expand Up @@ -52,6 +53,7 @@ futures-core = "0.3.19"
futures-lite = "1.12.0"
heck = "0.5"
http = "1.1"
itoa = "1"
pin-project-lite = "0.2.8"
quote = "1.0"
serde_json = "1"
Expand All @@ -60,7 +62,8 @@ syn = "2.0"
test-log = { version = "0.2", features = ["trace"] }
test-programs = { path = "test-programs" }
test-programs-artifacts = { path = "test-programs/artifacts" }
wasi = "0.13.1"
ureq = { version = "2.12.1", default-features = false }
wasi = "0.14.0"
wasmtime = "26"
wasmtime-wasi = "26"
wasmtime-wasi-http = "26"
Expand Down
94 changes: 94 additions & 0 deletions examples/http_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use wstd::http::body::{BodyForthcoming, IncomingBody, OutgoingBody};
use wstd::http::server::{Finished, Responder};
use wstd::http::{IntoBody, Request, Response, StatusCode};
use wstd::io::{copy, empty, AsyncWrite};
use wstd::time::{Duration, Instant};

#[wstd::http_server]
async fn main(request: Request<IncomingBody>, responder: Responder) -> Finished {
match request.uri().path_and_query().unwrap().as_str() {
"/wait" => http_wait(request, responder).await,
"/echo" => http_echo(request, responder).await,
"/echo-headers" => http_echo_headers(request, responder).await,
"/echo-trailers" => http_echo_trailers(request, responder).await,
"/fail" => http_fail(request, responder).await,
"/bigfail" => http_bigfail(request, responder).await,
"/" => http_home(request, responder).await,
_ => http_not_found(request, responder).await,
}
}

async fn http_home(_request: Request<IncomingBody>, responder: Responder) -> Finished {
// To send a single string as the response body, use `Responder::respond`.
responder
.respond(Response::new("Hello, wasi:http/proxy world!\n".into_body()))
.await
}

async fn http_wait(_request: Request<IncomingBody>, responder: Responder) -> Finished {
// Get the time now
let now = Instant::now();

// Sleep for one second.
wstd::task::sleep(Duration::from_secs(1)).await;

// Compute how long we slept for.
let elapsed = Instant::now().duration_since(now).as_millis();

// To stream data to the response body, use `Responder::start_response`.
let mut body = responder.start_response(Response::new(BodyForthcoming));
let result = body
.write_all(format!("slept for {elapsed} millis\n").as_bytes())
.await;
Finished::finish(body, result, None)
}

async fn http_echo(mut request: Request<IncomingBody>, responder: Responder) -> Finished {
// Stream data from the request body to the response body.
let mut body = responder.start_response(Response::new(BodyForthcoming));
let result = copy(request.body_mut(), &mut body).await;
Finished::finish(body, result, None)
}

async fn http_fail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
let body = responder.start_response(Response::new(BodyForthcoming));
Finished::fail(body)
}

async fn http_bigfail(_request: Request<IncomingBody>, responder: Responder) -> Finished {
async fn write_body(body: &mut OutgoingBody) -> wstd::io::Result<()> {
for _ in 0..0x10 {
body.write_all("big big big big\n".as_bytes()).await?;
}
body.flush().await?;
Ok(())
}

let mut body = responder.start_response(Response::new(BodyForthcoming));
let _ = write_body(&mut body).await;
Finished::fail(body)
}

async fn http_echo_headers(request: Request<IncomingBody>, responder: Responder) -> Finished {
let mut response = Response::builder();
*response.headers_mut().unwrap() = request.into_parts().0.headers;
let response = response.body(empty()).unwrap();
responder.respond(response).await
}

async fn http_echo_trailers(request: Request<IncomingBody>, responder: Responder) -> Finished {
let body = responder.start_response(Response::new(BodyForthcoming));
let (trailers, result) = match request.into_body().finish().await {
Ok(trailers) => (trailers, Ok(())),
Err(err) => (Default::default(), Err(std::io::Error::other(err))),
};
Finished::finish(body, result, trailers)
}

async fn http_not_found(_request: Request<IncomingBody>, responder: Responder) -> Finished {
let response = Response::builder()
.status(StatusCode::NOT_FOUND)
.body(empty())
.unwrap();
responder.respond(response).await
}
100 changes: 100 additions & 0 deletions macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,103 @@ pub fn attr_macro_test(_attr: TokenStream, item: TokenStream) -> TokenStream {
}
.into()
}

/// Enables a HTTP server main function, for creating [HTTP servers].
///
/// [HTTP servers]: https://docs.rs/wstd/latest/wstd/http/server/index.html
///
/// # Examples
///
/// ```ignore
/// #[wstd::http_server]
/// async fn main(request: Request<IncomingBody>, responder: Responder) -> Finished {
/// responder
/// .respond(Response::new("Hello!\n".into_body()))
/// .await
/// }
/// ```
#[proc_macro_attribute]
pub fn attr_macro_http_server(_attr: TokenStream, item: TokenStream) -> TokenStream {
let input = parse_macro_input!(item as ItemFn);

if input.sig.asyncness.is_none() {
return quote_spanned! { input.sig.fn_token.span()=>
compile_error!("fn must be `async fn`");
}
.into();
}

let output = &input.sig.output;
let inputs = &input.sig.inputs;
let name = &input.sig.ident;
let body = &input.block;
let attrs = &input.attrs;
let vis = &input.vis;

if name != "main" {
return quote_spanned! { input.sig.ident.span()=>
compile_error!("only `async fn main` can be used for #[wstd::http_server]");
}
.into();
}

quote! {
struct TheServer;

impl ::wstd::wasi::exports::http::incoming_handler::Guest for TheServer {
fn handle(
request: ::wstd::wasi::http::types::IncomingRequest,
response_out: ::wstd::wasi::http::types::ResponseOutparam
) {
#(#attrs)*
#vis async fn __run(#inputs) #output {
#body
}

let responder = ::wstd::http::server::Responder::new(response_out);
let _finished: ::wstd::http::server::Finished =
match ::wstd::http::try_from_incoming_request(request)
{
Ok(request) => ::wstd::runtime::block_on(async { __run(request, responder).await }),
Err(err) => responder.fail(err),
};
}
}

::wstd::wasi::http::proxy::export!(TheServer with_types_in ::wstd::wasi);

// Provide an actual function named `main`.
//
// WASI HTTP server components don't use a traditional `main` function.
// They export a function named `handle` which takes a `Request`
// argument, and which may be called multiple times on the same
// instance. To let users write a familiar `fn main` in a file
// named src/main.rs, we provide this `wstd::http_server` macro, which
// transforms the user's `fn main` into the appropriate `handle`
// function.
//
// However, when the top-level file is named src/main.rs, rustc
// requires there to be a function named `main` somewhere in it. This
// requirement can be disabled using `#![no_main]`, however we can't
// use that automatically because macros can't contain inner
// attributes, and we don't want to require users to add `#![no_main]`
// in their own code.
//
// So, we include a definition of a function named `main` here, which
// isn't intended to ever be called, and exists just to satify the
// requirement for a `main` function.
//
// Users could use `#![no_main]` if they want to. Or, they could name
// their top-level file src/lib.rs and add
// ```toml
// [lib]
// crate-type = ["cdylib"]
// ```
// to their Cargo.toml. With either of these, this "main" function will
// be ignored as dead code.
fn main() {
unreachable!("HTTP server components should be run with `handle` rather than `run`")
}
}
.into()
}
106 changes: 102 additions & 4 deletions src/http/body.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! HTTP body types

use crate::io::{AsyncInputStream, AsyncRead, Cursor, Empty};
use crate::http::fields::header_map_from_wasi;
use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Cursor, Empty};
use crate::runtime::AsyncPollable;
use core::fmt;
use http::header::{CONTENT_LENGTH, TRANSFER_ENCODING};
use wasi::http::types::IncomingBody as WasiIncomingBody;
Expand Down Expand Up @@ -116,9 +118,9 @@ impl Body for Empty {
pub struct IncomingBody {
kind: BodyKind,
// IMPORTANT: the order of these fields here matters. `body_stream` must
// be dropped before `_incoming_body`.
// be dropped before `incoming_body`.
body_stream: AsyncInputStream,
_incoming_body: WasiIncomingBody,
incoming_body: WasiIncomingBody,
}

impl IncomingBody {
Expand All @@ -130,9 +132,29 @@ impl IncomingBody {
Self {
kind,
body_stream,
_incoming_body: incoming_body,
incoming_body,
}
}

/// Consume this `IncomingBody` and return the trailers, if present.
pub async fn finish(self) -> Result<Option<HeaderMap>, Error> {
// The stream is a child resource of the `IncomingBody`, so ensure that
// it's dropped first.
drop(self.body_stream);

let trailers = WasiIncomingBody::finish(self.incoming_body);

AsyncPollable::new(trailers.subscribe()).wait_for().await;

let trailers = trailers.get().unwrap().unwrap()?;

let trailers = match trailers {
None => None,
Some(trailers) => Some(header_map_from_wasi(trailers)?),
};

Ok(trailers)
}
}

impl AsyncRead for IncomingBody {
Expand Down Expand Up @@ -177,3 +199,79 @@ impl From<InvalidContentLength> for Error {
ErrorVariant::Other(e.to_string()).into()
}
}

/// The output stream for the body, implementing [`AsyncWrite`]. Call
/// [`Responder::start_response`] to obtain one. Once the body is complete,
/// it must be declared finished, using [`OutgoingBody::finish`].
#[must_use]
pub struct OutgoingBody {
// IMPORTANT: the order of these fields here matters. `stream` must
// be dropped before `body`.
stream: AsyncOutputStream,
body: wasi::http::types::OutgoingBody,
dontdrop: DontDropOutgoingBody,
}

impl OutgoingBody {
pub(crate) fn new(stream: AsyncOutputStream, body: wasi::http::types::OutgoingBody) -> Self {
Self {
stream,
body,
dontdrop: DontDropOutgoingBody,
}
}

pub(crate) fn consume(self) -> (AsyncOutputStream, wasi::http::types::OutgoingBody) {
let Self {
stream,
body,
dontdrop,
} = self;

std::mem::forget(dontdrop);

(stream, body)
}

/// Return a reference to the underlying `AsyncOutputStream`.
///
/// This usually isn't needed, as `OutgoingBody` implements `AsyncWrite`
/// too, however it is useful for code that expects to work with
/// `AsyncOutputStream` specifically.
pub fn stream(&mut self) -> &mut AsyncOutputStream {
&mut self.stream
}
}

impl AsyncWrite for OutgoingBody {
async fn write(&mut self, buf: &[u8]) -> crate::io::Result<usize> {
self.stream.write(buf).await
}

async fn flush(&mut self) -> crate::io::Result<()> {
self.stream.flush().await
}

fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> {
Some(&self.stream)
}
}

/// A utility to ensure that `OutgoingBody` is either finished or failed, and
/// not implicitly dropped.
struct DontDropOutgoingBody;

impl Drop for DontDropOutgoingBody {
fn drop(&mut self) {
unreachable!("`OutgoingBody::drop` called; `OutgoingBody`s should be consumed with `finish` or `fail`.");
}
}

/// A placeholder for use as the type parameter to [`Response`] to indicate
/// that the body has not yet started. This is used with
/// [`Responder::start_response`], which has a `Response<BodyForthcoming>`
/// argument.
///
/// To instead start the response and obtain the output stream for the body,
/// use [`Responder::respond`].
pub struct BodyForthcoming;
2 changes: 2 additions & 0 deletions src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ impl Client {

/// Send an HTTP request.
pub async fn send<B: Body>(&self, req: Request<B>) -> Result<Response<IncomingBody>> {
// We don't use `body::OutputBody` here because we can report I/O
// errors from the `copy` directly.
let (wasi_req, body) = try_into_outgoing(req)?;
let wasi_body = wasi_req.body().unwrap();
let body_stream = wasi_body.write().unwrap();
Expand Down
10 changes: 7 additions & 3 deletions src/http/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::http::fields::ToWasiHeaderError;
use std::fmt;

/// The `http` result type.
Expand Down Expand Up @@ -78,9 +79,12 @@ impl From<WasiHttpErrorCode> for Error {
}
}

impl From<WasiHttpHeaderError> for Error {
fn from(e: WasiHttpHeaderError) -> Error {
ErrorVariant::WasiHeader(e).into()
impl From<ToWasiHeaderError> for Error {
fn from(error: ToWasiHeaderError) -> Error {
Error {
variant: ErrorVariant::WasiHeader(error.error),
context: vec![error.context],
}
}
}

Expand Down
Loading
Loading