11#![ deny( missing_docs) ]
2- // TODO: Switch to interior mutability (e.g. use Mutexes or thread-local
3- // RefCells) and remove this, since even in single-threaded mode `static mut`
4- // references can be a hazard due to recursive access.
5- #![ allow( static_mut_refs) ]
62
73extern crate std;
84use core:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -15,11 +11,6 @@ use std::pin::Pin;
1511use std:: ptr;
1612use std:: sync:: Arc ;
1713use std:: task:: { Context , Poll , Wake , Waker } ;
18- use std:: vec:: Vec ;
19-
20- use futures:: channel:: oneshot;
21- use futures:: future:: FutureExt ;
22- use futures:: stream:: { FuturesUnordered , StreamExt } ;
2314
2415macro_rules! rtdebug {
2516 ( $( $f: tt) * ) => {
@@ -50,18 +41,25 @@ pub use stream_support::*;
5041#[ doc( hidden) ]
5142pub use subtask:: Subtask ;
5243
53- pub use futures ;
44+ type BoxFuture < ' a > = Pin < Box < dyn Future < Output = ( ) > + ' a > > ;
5445
55- type BoxFuture = Pin < Box < dyn Future < Output = ( ) > + ' static > > ;
46+ #[ cfg( feature = "async-spawn" ) ]
47+ mod spawn;
48+ #[ cfg( feature = "async-spawn" ) ]
49+ pub use spawn:: spawn;
50+ #[ cfg( not( feature = "async-spawn" ) ) ]
51+ mod spawn_disabled;
52+ #[ cfg( not( feature = "async-spawn" ) ) ]
53+ use spawn_disabled as spawn;
5654
5755/// Represents a task created by either a call to an async-lifted export or a
5856/// future run using `block_on` or `start_task`.
59- struct FutureState {
57+ struct FutureState < ' a > {
6058 /// Remaining work to do (if any) before this task can be considered "done".
6159 ///
6260 /// Note that we won't tell the host the task is done until this is drained
6361 /// and `waitables` is empty.
64- tasks : FuturesUnordered < BoxFuture > ,
62+ tasks : spawn :: Tasks < ' a > ,
6563
6664 /// The waitable set containing waitables created by this task, if any.
6765 waitable_set : Option < WaitableSet > ,
@@ -88,13 +86,13 @@ struct FutureState {
8886 waker_clone : Waker ,
8987}
9088
91- impl FutureState {
92- fn new ( future : BoxFuture ) -> FutureState {
89+ impl FutureState < ' _ > {
90+ fn new ( future : BoxFuture < ' _ > ) -> FutureState < ' _ > {
9391 let waker = Arc :: new ( FutureWaker :: default ( ) ) ;
9492 FutureState {
9593 waker_clone : waker. clone ( ) . into ( ) ,
9694 waker,
97- tasks : [ future ] . into_iter ( ) . collect ( ) ,
95+ tasks : spawn :: Tasks :: new ( future ) ,
9896 waitable_set : None ,
9997 waitables : BTreeMap :: new ( ) ,
10098 wasip3_task : cabi:: wasip3_task {
@@ -176,31 +174,32 @@ impl FutureState {
176174 // notification, if any.
177175 me. waker . 0 . store ( false , Ordering :: Relaxed ) ;
178176
179- // Poll our future, handling `SPAWNED` around this.
180- let poll;
181- unsafe {
182- poll = me. tasks . poll_next_unpin ( & mut context) ;
183- if !SPAWNED . is_empty ( ) {
184- me. tasks . extend ( SPAWNED . drain ( ..) ) ;
185- }
186- }
177+ // Poll our future, seeing if it was able to make progress.
178+ let poll = me. tasks . poll_next ( & mut context) ;
187179
188180 match poll {
189181 // A future completed, yay! Keep going to see if more have
190182 // completed.
191183 Poll :: Ready ( Some ( ( ) ) ) => ( ) ,
192184
193- // The `FuturesUnordered` list is empty meaning that there's no
194- // more work left to do, so we're done.
185+ // The task list is empty, but there might be remaining work
186+ // in terms of waitables through the cabi interface. In this
187+ // situation wait for all waitables to be resolved before
188+ // signaling that our own task is done.
195189 Poll :: Ready ( None ) => {
196- assert ! ( !me. remaining_work( ) ) ;
197190 assert ! ( me. tasks. is_empty( ) ) ;
198- break ( CALLBACK_CODE_EXIT , true ) ;
191+ if me. remaining_work ( ) {
192+ let waitable = me. waitable_set . as_ref ( ) . unwrap ( ) . as_raw ( ) ;
193+ break ( CALLBACK_CODE_WAIT | ( waitable << 4 ) , false ) ;
194+ } else {
195+ break ( CALLBACK_CODE_EXIT , true ) ;
196+ }
199197 }
200198
201- // Some future within `FuturesUnordered` is not ready yet. If
202- // our `waker` was signaled then that means this is a yield
203- // operation, otherwise it means we're blocking on something.
199+ // Some future within `self.tasks` is not ready yet. If our
200+ // `waker` was signaled then that means this is a yield
201+ // operation, otherwise it means we're blocking on
202+ // something.
204203 Poll :: Pending => {
205204 assert ! ( !me. tasks. is_empty( ) ) ;
206205 if me. waker . 0 . load ( Ordering :: Relaxed ) {
@@ -229,7 +228,7 @@ impl FutureState {
229228 }
230229 }
231230 }
232- let self_raw = self as * mut FutureState ;
231+ let self_raw = self as * mut FutureState < ' _ > ;
233232 self . wasip3_task . ptr = self_raw. cast ( ) ;
234233 let prev = unsafe { cabi:: wasip3_task_set ( & mut self . wasip3_task ) } ;
235234 let _reset = ResetTask ( prev) ;
@@ -238,7 +237,7 @@ impl FutureState {
238237 }
239238}
240239
241- impl Drop for FutureState {
240+ impl Drop for FutureState < ' _ > {
242241 fn drop ( & mut self ) {
243242 // If this state has active tasks then they need to be dropped which may
244243 // execute arbitrary code. This arbitrary code might require the p3 APIs
@@ -259,7 +258,7 @@ unsafe extern "C" fn waitable_register(
259258 callback : unsafe extern "C" fn ( * mut c_void , u32 ) ,
260259 callback_ptr : * mut c_void ,
261260) -> * mut c_void {
262- let ptr = ptr. cast :: < FutureState > ( ) ;
261+ let ptr = ptr. cast :: < FutureState < ' static > > ( ) ;
263262 assert ! ( !ptr. is_null( ) ) ;
264263 ( * ptr) . add_waitable ( waitable) ;
265264 match ( * ptr) . waitables . insert ( waitable, ( callback_ptr, callback) ) {
@@ -269,7 +268,7 @@ unsafe extern "C" fn waitable_register(
269268}
270269
271270unsafe extern "C" fn waitable_unregister ( ptr : * mut c_void , waitable : u32 ) -> * mut c_void {
272- let ptr = ptr. cast :: < FutureState > ( ) ;
271+ let ptr = ptr. cast :: < FutureState < ' static > > ( ) ;
273272 assert ! ( !ptr. is_null( ) ) ;
274273 ( * ptr) . remove_waitable ( waitable) ;
275274 match ( * ptr) . waitables . remove ( & waitable) {
@@ -291,10 +290,6 @@ impl Wake for FutureWaker {
291290 }
292291}
293292
294- /// Any newly-deferred work queued by calls to the `spawn` function while
295- /// polling the current task.
296- static mut SPAWNED : Vec < BoxFuture > = Vec :: new ( ) ;
297-
298293const EVENT_NONE : u32 = 0 ;
299294const EVENT_SUBTASK : u32 = 1 ;
300295const EVENT_STREAM_READ : u32 = 2 ;
@@ -386,7 +381,7 @@ pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
386381 // Acquire our context-local state, assert it's not-null, and then reset
387382 // the state to null while we're running to help prevent any unintended
388383 // usage.
389- let state = context_get ( ) . cast :: < FutureState > ( ) ;
384+ let state = context_get ( ) . cast :: < FutureState < ' static > > ( ) ;
390385 assert ! ( !state. is_null( ) ) ;
391386 unsafe {
392387 context_set ( ptr:: null_mut ( ) ) ;
@@ -408,27 +403,23 @@ pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
408403 }
409404}
410405
411- /// Defer the specified future to be run after the current async-lifted export
412- /// task has returned a value.
413- ///
414- /// The task will remain in a running state until all spawned futures have
415- /// completed.
416- pub fn spawn ( future : impl Future < Output = ( ) > + ' static ) {
417- unsafe { SPAWNED . push ( Box :: pin ( future) ) }
418- }
419-
420406/// Run the specified future to completion, returning the result.
421407///
422408/// This uses `waitable-set.wait` to poll for progress on any in-progress calls
423409/// to async-lowered imports as necessary.
424410// TODO: refactor so `'static` bounds aren't necessary
425- pub fn block_on < T : ' static > ( future : impl Future < Output = T > + ' static ) -> T {
426- let ( tx, mut rx) = oneshot:: channel ( ) ;
427- let state = & mut FutureState :: new ( Box :: pin ( future. map ( move |v| drop ( tx. send ( v) ) ) ) as BoxFuture ) ;
411+ pub fn block_on < T : ' static > ( future : impl Future < Output = T > ) -> T {
412+ let mut result = None ;
413+ let mut state = FutureState :: new ( Box :: pin ( async {
414+ result = Some ( future. await ) ;
415+ } ) ) ;
428416 let mut event = ( EVENT_NONE , 0 , 0 ) ;
429417 loop {
430418 match state. callback ( event. 0 , event. 1 , event. 2 ) {
431- ( _, true ) => break rx. try_recv ( ) . unwrap ( ) . unwrap ( ) ,
419+ ( _, true ) => {
420+ drop ( state) ;
421+ break result. unwrap ( ) ;
422+ }
432423 ( CALLBACK_CODE_YIELD , false ) => event = state. waitable_set . as_ref ( ) . unwrap ( ) . poll ( ) ,
433424 _ => event = state. waitable_set . as_ref ( ) . unwrap ( ) . wait ( ) ,
434425 }
0 commit comments