diff --git a/Cargo.toml b/Cargo.toml index 8019adc..534d287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,9 @@ wstd-macro.workspace = true [dev-dependencies] anyhow.workspace = true +clap.workspace = true futures-lite.workspace = true +humantime.workspace = true serde_json.workspace = true [workspace] @@ -44,13 +46,16 @@ categories = [] authors = [ "Yoshua Wuyts ", "Pat Hickey ", + "Dan Gohman ", ] [workspace.dependencies] anyhow = "1" cargo_metadata = "0.18.1" +clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" +humantime = "2.1.0" heck = "0.5" http = "1.1" itoa = "1" diff --git a/examples/complex_http_client.rs b/examples/complex_http_client.rs new file mode 100644 index 0000000..8a7b2b6 --- /dev/null +++ b/examples/complex_http_client.rs @@ -0,0 +1,139 @@ +use anyhow::{anyhow, Result}; +use clap::{ArgAction, Parser}; +use std::str::FromStr; +use wstd::http::{ + body::BodyForthcoming, Client, HeaderMap, HeaderName, HeaderValue, Method, Request, Uri, +}; + +/// Complex HTTP client +/// +/// A somewhat more complex command-line HTTP client, implemented using +/// `wstd`, using WASI. +#[derive(Parser, Debug)] +#[command(version, about)] +struct Args { + /// The URL to request + url: Uri, + + /// Forward stdin to the request body + #[arg(long)] + body: bool, + + /// Add a header to the request + #[arg(long = "header", action = ArgAction::Append, value_name = "HEADER")] + headers: Vec, + + /// Add a trailer to the request + #[arg(long = "trailer", action = ArgAction::Append, value_name = "TRAILER")] + trailers: Vec, + + /// Method of the request + #[arg(long, default_value = "GET")] + method: Method, + + /// Set the connect timeout + #[arg(long, value_name = "DURATION")] + connect_timeout: Option, + + /// Set the first-byte timeout + #[arg(long, value_name = "DURATION")] + first_byte_timeout: Option, + + /// Set the between-bytes timeout + #[arg(long, value_name = "DURATION")] + between_bytes_timeout: Option, +} + +#[wstd::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Create and configure the `Client` + + let mut client = Client::new(); + + if let Some(connect_timeout) = args.connect_timeout { + client.set_connect_timeout(*connect_timeout); + } + if let Some(first_byte_timeout) = args.first_byte_timeout { + client.set_first_byte_timeout(*first_byte_timeout); + } + if let Some(between_bytes_timeout) = args.between_bytes_timeout { + client.set_between_bytes_timeout(*between_bytes_timeout); + } + + // Create and configure the request. + + let mut request = Request::builder(); + + request = request.uri(args.url).method(args.method); + + for header in args.headers { + let mut parts = header.splitn(2, ": "); + let key = parts.next().unwrap(); + let value = parts + .next() + .ok_or_else(|| anyhow!("headers must be formatted like \"key: value\""))?; + request = request.header(key, value); + } + let mut trailers = HeaderMap::new(); + for trailer in args.trailers { + let mut parts = trailer.splitn(2, ": "); + let key = parts.next().unwrap(); + let value = parts + .next() + .ok_or_else(|| anyhow!("trailers must be formatted like \"key: value\""))?; + trailers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?); + } + + // Send the request. + + let request = request.body(BodyForthcoming)?; + + eprintln!("> {} / {:?}", request.method(), request.version()); + for (key, value) in request.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); + } + + let (mut outgoing_body, response) = client.start_request(request).await?; + + if args.body { + wstd::io::copy(wstd::io::stdin(), &mut outgoing_body).await?; + } else { + wstd::io::copy(wstd::io::empty(), &mut outgoing_body).await?; + } + + if !trailers.is_empty() { + eprintln!("..."); + } + for (key, value) in trailers.iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); + } + + Client::finish(outgoing_body, Some(trailers))?; + + let response = response.await?; + + // Print the response. + + eprintln!("< {:?} {}", response.version(), response.status()); + for (key, value) in response.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + + let mut body = response.into_body(); + wstd::io::copy(&mut body, wstd::io::stdout()).await?; + + let trailers = body.finish().await?; + if let Some(trailers) = trailers { + for (key, value) in trailers.iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + } + + Ok(()) +} diff --git a/examples/http_client.rs b/examples/http_client.rs new file mode 100644 index 0000000..12bc685 --- /dev/null +++ b/examples/http_client.rs @@ -0,0 +1,119 @@ +use anyhow::{anyhow, Result}; +use clap::{ArgAction, Parser}; +use wstd::http::{ + body::{IncomingBody, StreamedBody}, + request::Builder, + Body, Client, Method, Request, Response, Uri, +}; + +/// Simple HTTP client +/// +/// A simple command-line HTTP client, implemented using `wstd`, using WASI. +#[derive(Parser, Debug)] +#[command(version, about)] +struct Args { + /// The URL to request + url: Uri, + + /// Forward stdin to the request body + #[arg(long)] + body: bool, + + /// Add a header to the request + #[arg(long = "header", action = ArgAction::Append, value_name = "HEADER")] + headers: Vec, + + /// Method of the request + #[arg(long, default_value = "GET")] + method: Method, + + /// Set the connect timeout + #[arg(long, value_name = "DURATION")] + connect_timeout: Option, + + /// Set the first-byte timeout + #[arg(long, value_name = "DURATION")] + first_byte_timeout: Option, + + /// Set the between-bytes timeout + #[arg(long, value_name = "DURATION")] + between_bytes_timeout: Option, +} + +#[wstd::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Create and configure the `Client` + + let mut client = Client::new(); + + if let Some(connect_timeout) = args.connect_timeout { + client.set_connect_timeout(*connect_timeout); + } + if let Some(first_byte_timeout) = args.first_byte_timeout { + client.set_first_byte_timeout(*first_byte_timeout); + } + if let Some(between_bytes_timeout) = args.between_bytes_timeout { + client.set_between_bytes_timeout(*between_bytes_timeout); + } + + // Create and configure the request. + + let mut request = Request::builder(); + + request = request.uri(args.url).method(args.method); + + for header in args.headers { + let mut parts = header.splitn(2, ": "); + let key = parts.next().unwrap(); + let value = parts + .next() + .ok_or_else(|| anyhow!("headers must be formatted like \"key: value\""))?; + request = request.header(key, value); + } + + // Send the request. + + async fn send_request( + client: &Client, + request: Builder, + body: B, + ) -> Result> { + let request = request.body(body)?; + + eprintln!("> {} / {:?}", request.method(), request.version()); + for (key, value) in request.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("> {key}: {value}"); + } + + Ok(client.send(request).await?) + } + let response = if args.body { + send_request(&client, request, StreamedBody::new(wstd::io::stdin())).await + } else { + send_request(&client, request, wstd::io::empty()).await + }?; + + // Print the response. + + eprintln!("< {:?} {}", response.version(), response.status()); + for (key, value) in response.headers().iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + + let mut body = response.into_body(); + wstd::io::copy(&mut body, wstd::io::stdout()).await?; + + let trailers = body.finish().await?; + if let Some(trailers) = trailers { + for (key, value) in trailers.iter() { + let value = String::from_utf8_lossy(value.as_bytes()); + eprintln!("< {key}: {value}"); + } + } + + Ok(()) +} diff --git a/src/http/body.rs b/src/http/body.rs index a301d13..0373357 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -107,6 +107,27 @@ impl> Body for BoundedBody { } } +/// An HTTP body with an unknown length +#[derive(Debug)] +pub struct StreamedBody(S); + +impl StreamedBody { + /// Wrap an `AsyncRead` impl in a type that provides a [`Body`] implementation. + pub fn new(s: S) -> Self { + Self(s) + } +} +impl AsyncRead for StreamedBody { + async fn read(&mut self, buf: &mut [u8]) -> crate::io::Result { + self.0.read(buf).await + } +} +impl Body for StreamedBody { + fn len(&self) -> Option { + None + } +} + impl Body for Empty { fn len(&self) -> Option { Some(0) diff --git a/src/http/client.rs b/src/http/client.rs index b9107d9..1569d77 100644 --- a/src/http/client.rs +++ b/src/http/client.rs @@ -1,9 +1,21 @@ -use super::{body::IncomingBody, Body, Error, Request, Response, Result}; +use super::{ + body::{BodyForthcoming, IncomingBody, OutgoingBody}, + fields::header_map_to_wasi, + Body, Error, HeaderMap, Request, Response, Result, +}; use crate::http::request::try_into_outgoing; use crate::http::response::try_from_incoming; use crate::io::{self, AsyncOutputStream, AsyncPollable}; +use crate::runtime::WaitFor; use crate::time::Duration; -use wasi::http::types::{OutgoingBody, RequestOptions as WasiRequestOptions}; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use wasi::http::types::{ + FutureIncomingResponse as WasiFutureIncomingResponse, OutgoingBody as WasiOutgoingBody, + RequestOptions as WasiRequestOptions, +}; /// An HTTP client. // Empty for now, but permits adding support for RequestOptions soon: @@ -19,22 +31,27 @@ impl Client { } /// Send an HTTP request. + /// + /// TODO: Should this automatically add a "Content-Length" header if the + /// body size is known? + /// + /// To respond with trailers, use [`Client::start_request`] instead. pub async fn send(&self, req: Request) -> Result> { // We don't use `body::OutputBody` here because we can report I/O // errors from the `copy` directly. let (wasi_req, body) = try_into_outgoing(req)?; let wasi_body = wasi_req.body().unwrap(); - let body_stream = wasi_body.write().unwrap(); + let wasi_stream = wasi_body.write().unwrap(); // 1. Start sending the request head let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap(); // 2. Start sending the request body - io::copy(body, AsyncOutputStream::new(body_stream)).await?; + io::copy(body, AsyncOutputStream::new(wasi_stream)).await?; // 3. Finish sending the request body let trailers = None; - OutgoingBody::finish(wasi_body, trailers).unwrap(); + WasiOutgoingBody::finish(wasi_body, trailers).unwrap(); // 4. Receive the response AsyncPollable::new(res.subscribe()).wait_for().await; @@ -46,6 +63,91 @@ impl Client { try_from_incoming(res) } + /// Start sending an HTTP request, and return an `OutgoingBody` stream to + /// write the body to. + /// + /// The returned `OutgoingBody` must be consumed by [`Client::finish`] or + /// [`Client::fail`]. + pub async fn start_request( + &self, + req: Request, + ) -> Result<( + OutgoingBody, + impl Future>>, + )> { + let (wasi_req, _body_forthcoming) = try_into_outgoing(req)?; + let wasi_body = wasi_req.body().unwrap(); + let wasi_stream = wasi_body.write().unwrap(); + + // Start sending the request head. + let res = wasi::http::outgoing_handler::handle(wasi_req, self.wasi_options()?).unwrap(); + + let outgoing_body = OutgoingBody::new(AsyncOutputStream::new(wasi_stream), wasi_body); + + pin_project! { + #[must_use = "futures do nothing unless polled or .awaited"] + struct IncomingResponseFuture { + #[pin] + subscription: WaitFor, + wasi: WasiFutureIncomingResponse, + } + } + impl Future for IncomingResponseFuture { + type Output = Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.subscription.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready( + this.wasi + .get() + .unwrap() + .unwrap() + .map_err(Error::from) + .and_then(try_from_incoming), + ), + } + } + } + + let subscription = AsyncPollable::new(res.subscribe()).wait_for(); + let future = IncomingResponseFuture { + subscription, + wasi: res, + }; + + Ok((outgoing_body, future)) + } + + /// Finish the body, optionally with trailers. + /// + /// This is used with [`Client::start_request`]. + pub fn finish(body: OutgoingBody, trailers: Option) -> Result<()> { + let (stream, body) = body.consume(); + + // The stream is a child resource of the `OutgoingBody`, so ensure that + // it's dropped first. + drop(stream); + + let wasi_trailers = match trailers { + Some(trailers) => Some(header_map_to_wasi(&trailers)?), + None => None, + }; + + wasi::http::types::OutgoingBody::finish(body, wasi_trailers) + .expect("body length did not match Content-Length header value"); + Ok(()) + } + + /// Consume the `OutgoingBody` and indicate that the body was not + /// completed. + /// + /// This is used with [`Client::start_request`]. + pub fn fail(body: OutgoingBody) { + let (_stream, _body) = body.consume(); + } + /// Set timeout on connecting to HTTP server pub fn set_connect_timeout(&mut self, d: impl Into) { self.options_mut().connect_timeout = Some(d.into());