From af7ed364874f181495127aba9ac8749b262f71e3 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:19:17 -0700 Subject: [PATCH 01/20] docs: add Task Scope and Task Registry domain terms Co-Authored-By: Claude Fable 5 --- CONTEXT.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CONTEXT.md b/CONTEXT.md index 5ee2983c4..a52ae4263 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -92,6 +92,14 @@ _Avoid_: "Discovery" as a distinct stage from identification — they're the sam An opt-in broadcast of every output command sent to a **Device** — device index, feature index, output type, and value. Used by frontends (e.g. Intiface Central) to visually display real-time device activity, verify hardware behaviour matches commands, and let developers see what *would* happen with simulated devices. Disabled by default to avoid overhead. _Avoid_: Treating as internal-only debugging; it's a user-facing observability feature. +**Task Scope**: +The owner of spawned async tasks within a module. Every task is spawned through a Task Scope, which links it to a parent, derives its hierarchical name (e.g. `server/device-manager/device-3/keepalive`), registers it in the **Task Registry**, and hands it a cooperative cancellation token. Dropping a scope cancels its children. Tasks cannot be spawned without a parent scope. +_Avoid_: "Detached task" or bare spawning as the normal pattern; detachment is the rare, explicit exception. + +**Task Registry**: +The queryable record of every live task — id, hierarchical path, parent, state. Populated as a side effect of spawning through a **Task Scope**. Exposed in-process for tests and embedders, and to frontends via TaskStarted/TaskEnded **Events** plus a snapshot query (same opt-in pattern as **Output Observation**). +_Avoid_: Treating as internal-only debugging; like Output Observations, it's user-facing observability. + **Command**: A message from **Client** to **Server** requesting an action — controlling a device, starting a scan, requesting device lists. Always has a non-zero message ID; the server responds with an `Ok` or `Error` using the same ID. _Avoid_: Referring to server-initiated messages as commands. From 0bf2483aa1e49293779a29d241b9361adac8a8b3 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:27:05 -0700 Subject: [PATCH 02/20] feat(core): add TaskRegistry for named task introspection Introduces the Task Registry in buttplug_core::util::task -- the global record of every live task. Provides TaskId, TaskInfo, TaskEvent, and TaskOutcome types, segment-aware prefix matching (live_count_under), broadcast lifecycle events, and wait_empty_under for shutdown synchronization. The scope module re-export is commented out until Task 2 lands TaskScope. Co-Authored-By: Claude Fable 5 --- crates/buttplug_core/Cargo.toml | 2 + crates/buttplug_core/src/util/mod.rs | 1 + crates/buttplug_core/src/util/task/mod.rs | 20 ++ .../buttplug_core/src/util/task/registry.rs | 210 ++++++++++++++++++ 4 files changed, 233 insertions(+) create mode 100644 crates/buttplug_core/src/util/task/mod.rs create mode 100644 crates/buttplug_core/src/util/task/registry.rs diff --git a/crates/buttplug_core/Cargo.toml b/crates/buttplug_core/Cargo.toml index f979fff80..bb7f5cef0 100644 --- a/crates/buttplug_core/Cargo.toml +++ b/crates/buttplug_core/Cargo.toml @@ -57,3 +57,5 @@ wasm-bindgen-futures = { version = "0.4.64", optional = true } wasmtimer = { version = "0.4.3", optional = true } smallvec = { version = "1.15.1", features = ["serde", "const_generics"] } enumflags2 = "0.7.12" +tokio-util = "0.7.18" +dashmap = "6.1.0" diff --git a/crates/buttplug_core/src/util/mod.rs b/crates/buttplug_core/src/util/mod.rs index 439950aeb..5734abdb4 100644 --- a/crates/buttplug_core/src/util/mod.rs +++ b/crates/buttplug_core/src/util/mod.rs @@ -14,6 +14,7 @@ pub mod range; pub mod serializers; pub mod small_vec_enum_map; pub mod stream; +pub mod task; #[cfg(all(not(feature = "wasm"), feature = "tokio-runtime"))] pub use tokio::time::sleep; diff --git a/crates/buttplug_core/src/util/task/mod.rs b/crates/buttplug_core/src/util/task/mod.rs new file mode 100644 index 000000000..7beacdc18 --- /dev/null +++ b/crates/buttplug_core/src/util/task/mod.rs @@ -0,0 +1,20 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +//! Task Scope and Task Registry: ownership and introspection for spawned tasks. +//! +//! Every task is spawned through a [TaskScope], which links it into an +//! ownership tree, derives its hierarchical path, registers it in the global +//! [TaskRegistry], and hands it a cooperative [CancellationToken]. Dropping a +//! scope cancels its subtree. + +mod registry; +// `scope` lands in Task 2 of the task-scope-lifecycle plan. +// mod scope; + +pub use registry::{TaskEvent, TaskId, TaskInfo, TaskOutcome, TaskRegistry, registry}; +// pub use scope::TaskScope; diff --git a/crates/buttplug_core/src/util/task/registry.rs b/crates/buttplug_core/src/util/task/registry.rs new file mode 100644 index 000000000..17f601608 --- /dev/null +++ b/crates/buttplug_core/src/util/task/registry.rs @@ -0,0 +1,210 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +use dashmap::DashMap; +use std::sync::{ + OnceLock, + atomic::{AtomicU64, Ordering}, +}; +use tokio::sync::broadcast; + +/// Unique identifier for a registered task. Process-lifetime unique. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TaskId(u64); + +impl TaskId { + pub fn value(&self) -> u64 { + self.0 + } +} + +/// How a task ended: ran to completion on its own, or exited after observing +/// cancellation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TaskOutcome { + Completed, + Cancelled, +} + +/// Registry entry for a live task. +#[derive(Debug, Clone)] +pub struct TaskInfo { + pub id: TaskId, + /// Hierarchical path, e.g. "device-manager-1/devices/device-3/io". + pub path: String, + /// True if spawned via spawn_detached (no owning scope). + pub detached: bool, +} + +/// Lifecycle events broadcast by the Task Registry. +#[derive(Debug, Clone)] +pub enum TaskEvent { + Started { + id: TaskId, + path: String, + }, + Ended { + id: TaskId, + path: String, + outcome: TaskOutcome, + }, +} + +/// The global record of every live task, populated as a side effect of +/// spawning through a Task Scope. +#[derive(Debug)] +pub struct TaskRegistry { + tasks: DashMap, + counter: AtomicU64, + root_counter: AtomicU64, + events: broadcast::Sender, +} + +/// The global Task Registry. +pub fn registry() -> &'static TaskRegistry { + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY.get_or_init(TaskRegistry::new) +} + +impl TaskRegistry { + pub(super) fn new() -> Self { + Self { + tasks: DashMap::new(), + counter: AtomicU64::new(1), + root_counter: AtomicU64::new(1), + events: broadcast::channel(256).0, + } + } + + /// Unique suffix for root scope names so parallel instances don't collide. + pub(super) fn next_root_suffix(&self) -> u64 { + self.root_counter.fetch_add(1, Ordering::Relaxed) + } + + pub(super) fn register(&self, path: String, detached: bool) -> TaskId { + let id = TaskId(self.counter.fetch_add(1, Ordering::Relaxed)); + self.tasks.insert( + id.0, + TaskInfo { + id, + path: path.clone(), + detached, + }, + ); + let _ = self.events.send(TaskEvent::Started { id, path }); + id + } + + pub(super) fn deregister(&self, id: TaskId, outcome: TaskOutcome) { + if let Some((_, info)) = self.tasks.remove(&id.0) { + let _ = self.events.send(TaskEvent::Ended { + id, + path: info.path, + outcome, + }); + } + } + + /// Snapshot of all live tasks. + pub fn snapshot(&self) -> Vec { + self.tasks.iter().map(|e| e.value().clone()).collect() + } + + /// Count of live tasks at or under the given path prefix. Prefix matching is + /// segment-aware: "server-1" matches "server-1/loop" but not "server-10/loop". + pub fn live_count_under(&self, prefix: &str) -> usize { + self + .tasks + .iter() + .filter(|e| path_is_under(&e.value().path, prefix)) + .count() + } + + /// Subscribe to task lifecycle events. + pub fn event_stream(&self) -> broadcast::Receiver { + self.events.subscribe() + } + + /// Wait until no live tasks remain at or under the given path prefix. + /// Subscribes to events BEFORE counting to avoid missing an Ended event + /// between the count and the wait. Callers should wrap in a timeout if the + /// subtree may contain uncooperative tasks. + pub async fn wait_empty_under(&self, prefix: &str) { + let mut events = self.events.subscribe(); + while self.live_count_under(prefix) > 0 { + match events.recv().await { + Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => continue, + // Sender is owned by the registry itself; Closed only happens for + // non-global registries in tests being dropped. + Err(broadcast::error::RecvError::Closed) => return, + } + } + } +} + +fn path_is_under(path: &str, prefix: &str) -> bool { + path == prefix || (path.starts_with(prefix) && path.as_bytes().get(prefix.len()) == Some(&b'/')) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_register_and_snapshot() { + let reg = TaskRegistry::new(); + let id = reg.register("root-1/loop".to_owned(), false); + let snapshot = reg.snapshot(); + assert_eq!(snapshot.len(), 1); + assert_eq!(snapshot[0].id, id); + assert_eq!(snapshot[0].path, "root-1/loop"); + reg.deregister(id, TaskOutcome::Completed); + assert!(reg.snapshot().is_empty()); + } + + #[test] + fn test_prefix_boundary() { + let reg = TaskRegistry::new(); + reg.register("server-1/loop".to_owned(), false); + reg.register("server-10/loop".to_owned(), false); + // "server-1" must not match "server-10/loop" + assert_eq!(reg.live_count_under("server-1"), 1); + assert_eq!(reg.live_count_under("server-10"), 1); + assert_eq!(reg.live_count_under("server"), 0); + } + + #[tokio::test] + async fn test_events_emitted() { + let reg = TaskRegistry::new(); + let mut events = reg.event_stream(); + let id = reg.register("root-1/task".to_owned(), false); + reg.deregister(id, TaskOutcome::Cancelled); + assert!(matches!( + events.recv().await.unwrap(), + TaskEvent::Started { .. } + )); + let TaskEvent::Ended { outcome, .. } = events.recv().await.unwrap() else { + panic!("expected Ended event"); + }; + assert_eq!(outcome, TaskOutcome::Cancelled); + } + + #[tokio::test] + async fn test_wait_empty_under() { + let reg = std::sync::Arc::new(TaskRegistry::new()); + let id = reg.register("root-1/task".to_owned(), false); + let reg_clone = reg.clone(); + let waiter = tokio::spawn(async move { reg_clone.wait_empty_under("root-1").await }); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + assert!(!waiter.is_finished()); + reg.deregister(id, TaskOutcome::Completed); + tokio::time::timeout(std::time::Duration::from_secs(1), waiter) + .await + .expect("wait_empty_under did not resolve") + .unwrap(); + } +} From 542e8bd97f6888774564fa7e37f55cc6deca1885 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:29:54 -0700 Subject: [PATCH 03/20] feat(core): add TaskScope ownership tree with cooperative cancellation Co-Authored-By: Claude Fable 5 --- crates/buttplug_core/src/util/task/mod.rs | 5 +- crates/buttplug_core/src/util/task/scope.rs | 255 ++++++++++++++++++++ 2 files changed, 257 insertions(+), 3 deletions(-) create mode 100644 crates/buttplug_core/src/util/task/scope.rs diff --git a/crates/buttplug_core/src/util/task/mod.rs b/crates/buttplug_core/src/util/task/mod.rs index 7beacdc18..81267ff39 100644 --- a/crates/buttplug_core/src/util/task/mod.rs +++ b/crates/buttplug_core/src/util/task/mod.rs @@ -13,8 +13,7 @@ //! scope cancels its subtree. mod registry; -// `scope` lands in Task 2 of the task-scope-lifecycle plan. -// mod scope; +mod scope; pub use registry::{TaskEvent, TaskId, TaskInfo, TaskOutcome, TaskRegistry, registry}; -// pub use scope::TaskScope; +pub use scope::TaskScope; diff --git a/crates/buttplug_core/src/util/task/scope.rs b/crates/buttplug_core/src/util/task/scope.rs new file mode 100644 index 000000000..c9d6521c7 --- /dev/null +++ b/crates/buttplug_core/src/util/task/scope.rs @@ -0,0 +1,255 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +use super::registry::{TaskId, TaskOutcome, registry}; +use crate::util::async_manager; +use std::future::Future; +use tokio_util::sync::CancellationToken; + +/// The owner of spawned async tasks within a module. +/// +/// Tasks can only be spawned through a scope, which derives their hierarchical +/// path, registers them in the global Task Registry, and hands them a +/// cooperative [CancellationToken]. Dropping a scope cancels its subtree. +/// +/// Scopes are intentionally NOT [Clone]: ownership is singular. To share +/// spawning capability, create a [child][TaskScope::child] and move it, or +/// wrap a scope in [std::sync::Arc] when a cloneable holder needs it (the +/// subtree then cancels when the last Arc drops). +#[derive(Debug)] +pub struct TaskScope { + path: String, + token: CancellationToken, +} + +impl TaskScope { + /// Create a root scope. The path gets a unique numeric suffix + /// (e.g. "server-2") so parallel instances in one process don't collide. + pub fn root(name: &str) -> Self { + Self { + path: format!("{}-{}", name, registry().next_root_suffix()), + token: CancellationToken::new(), + } + } + + /// Create a child scope. Cancelling this scope cancels the child. + pub fn child(&self, name: &str) -> Self { + Self { + path: format!("{}/{}", self.path, name), + token: self.token.child_token(), + } + } + + pub fn path(&self) -> &str { + &self.path + } + + /// The scope's own cancellation token, for select!-ing in code that runs + /// inside an already-spawned context. + pub fn token(&self) -> &CancellationToken { + &self.token + } + + /// Request cancellation of every task in this scope's subtree. + pub fn cancel(&self) { + self.token.cancel(); + } + + /// Cancel the subtree and wait until every task under this scope has + /// deregistered. Wrap in a timeout if the subtree may contain + /// uncooperative tasks. + pub async fn shutdown(&self) { + self.token.cancel(); + registry().wait_empty_under(&self.path).await; + } + + /// Spawn a task owned by this scope. The closure receives the task's own + /// child token; long-running tasks MUST select on it. + #[cfg(not(feature = "wasm"))] + pub fn spawn(&self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + Send + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn(finish_task(fut, id, task_token), span); + } + + /// Spawn a task owned by this scope (WASM, no Send required). + #[cfg(feature = "wasm")] + pub fn spawn(&self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn(finish_task(fut, id, task_token), span); + } + + /// Consume the scope and spawn a task that holds it alive for its own + /// duration. Use when the caller has nowhere to store the scope (e.g. + /// protocol subscription handlers): drop-cancel must not fire before the + /// task runs, but parent cancellation still propagates. + #[cfg(not(feature = "wasm"))] + pub fn spawn_and_hold(self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + Send + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn( + async move { + let _hold = self; + finish_task(fut, id, task_token).await; + }, + span, + ); + } + + /// Consume the scope and spawn a task that holds it alive (WASM, no Send). + #[cfg(feature = "wasm")] + pub fn spawn_and_hold(self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn( + async move { + let _hold = self; + finish_task(fut, id, task_token).await; + }, + span, + ); + } + + fn register_task(&self, name: &str) -> (TaskId, CancellationToken, tracing::Span) { + let path = format!("{}/{}", self.path, name); + let id = registry().register(path.clone(), false); + let task_token = self.token.child_token(); + // Span names must be const in tracing; the dynamic path goes in a field. + let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); + (id, task_token, span) + } +} + +async fn finish_task(fut: impl Future, id: TaskId, token: CancellationToken) { + fut.await; + let outcome = if token.is_cancelled() { + TaskOutcome::Cancelled + } else { + TaskOutcome::Completed + }; + registry().deregister(id, outcome); +} + +impl Drop for TaskScope { + fn drop(&mut self) { + self.token.cancel(); + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::util::task::registry; + use std::time::Duration; + + #[test] + fn test_root_path_unique() { + let a = TaskScope::root("testroot"); + let b = TaskScope::root("testroot"); + assert_ne!(a.path(), b.path()); + assert!(a.path().starts_with("testroot-")); + } + + #[test] + fn test_child_path() { + let root = TaskScope::root("testroot"); + let child = root.child("devices"); + assert_eq!(child.path(), format!("{}/devices", root.path())); + } + + #[tokio::test] + async fn test_spawn_registers_and_completes() { + let root = TaskScope::root("spawntest"); + let path = root.path().to_owned(); + let (tx, rx) = tokio::sync::oneshot::channel(); + root.spawn("worker", |_token| async move { + let _ = tx.send(()); + }); + rx.await.unwrap(); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("task did not deregister after completion"); + } + + #[tokio::test] + async fn test_cancel_propagates_to_children() { + let root = TaskScope::root("canceltest"); + let path = root.path().to_owned(); + let child = root.child("inner"); + child.spawn("worker", |token| async move { + token.cancelled().await; + }); + root.cancel(); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("cancel did not propagate to child scope task"); + } + + #[tokio::test] + async fn test_drop_cancels() { + let root = TaskScope::root("droptest"); + let path = root.path().to_owned(); + root.spawn("worker", |token| async move { + token.cancelled().await; + }); + drop(root); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("drop did not cancel task"); + } + + #[tokio::test] + async fn test_shutdown_awaits_subtree() { + let root = TaskScope::root("shutdowntest"); + root.spawn("worker", |token| async move { + token.cancelled().await; + // Simulate cleanup work after observing cancellation. + tokio::time::sleep(Duration::from_millis(20)).await; + }); + tokio::time::timeout(Duration::from_secs(1), root.shutdown()) + .await + .expect("shutdown did not resolve"); + } + + #[tokio::test] + async fn test_spawn_and_hold_keeps_scope_alive() { + let root = TaskScope::root("holdtest"); + let path = root.path().to_owned(); + let sub_scope = root.child("subscription"); + let (tx, rx) = tokio::sync::oneshot::channel(); + // Consuming spawn: the scope moves INTO the task and must not cancel it. + sub_scope.spawn_and_hold("worker", |_token| async move { + tokio::time::sleep(Duration::from_millis(20)).await; + let _ = tx.send(()); + }); + // If drop-cancel fired early this would hang (task cancelled before send). + tokio::time::timeout(Duration::from_secs(1), rx) + .await + .expect("spawn_and_hold task was cancelled early") + .unwrap(); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("task did not deregister"); + } +} From 246e9304c614239baed87e37966fab6ad8369ffc Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:35:40 -0700 Subject: [PATCH 04/20] feat(core): add spawn_detached escape hatch for unowned tasks Adds spawn_detached() to buttplug_core::util::task for spawning tasks with no owning scope. Registered under "detached/{name}" so they remain visible in registry snapshots, but nothing can cancel them. Provides Send and non-Send (wasm) cfg variants for parity. Co-Authored-By: Claude Fable 5 --- crates/buttplug_core/src/util/task/mod.rs | 64 +++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/crates/buttplug_core/src/util/task/mod.rs b/crates/buttplug_core/src/util/task/mod.rs index 81267ff39..149633296 100644 --- a/crates/buttplug_core/src/util/task/mod.rs +++ b/crates/buttplug_core/src/util/task/mod.rs @@ -17,3 +17,67 @@ mod scope; pub use registry::{TaskEvent, TaskId, TaskInfo, TaskOutcome, TaskRegistry, registry}; pub use scope::TaskScope; + +use crate::util::async_manager; +use std::future::Future; + +/// Spawn a task with no owning scope. Registered in the Task Registry under +/// "detached/{name}" so it still shows up in snapshots, but nothing can cancel +/// it. RARE — prefer [TaskScope::spawn]. Valid uses: one-shot notifications +/// where the spawner is being destroyed. +#[cfg(not(feature = "wasm"))] +pub fn spawn_detached(name: &str, fut: Fut) +where + Fut: Future + Send + 'static, +{ + let path = format!("detached/{name}"); + let id = registry().register(path.clone(), true); + let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); + async_manager::spawn( + async move { + fut.await; + registry().deregister(id, TaskOutcome::Completed); + }, + span, + ); +} + +/// Spawn a task with no owning scope (WASM, no Send required). See the +/// non-WASM variant for semantics. +#[cfg(feature = "wasm")] +pub fn spawn_detached(name: &str, fut: Fut) +where + Fut: Future + 'static, +{ + let path = format!("detached/{name}"); + let id = registry().register(path.clone(), true); + let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); + async_manager::spawn( + async move { + fut.await; + registry().deregister(id, TaskOutcome::Completed); + }, + span, + ); +} + +#[cfg(test)] +mod test { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_spawn_detached_registers() { + let (tx, rx) = tokio::sync::oneshot::channel(); + spawn_detached("test-notify", async move { + let _ = tx.send(()); + }); + rx.await.unwrap(); + tokio::time::timeout( + Duration::from_secs(1), + registry().wait_empty_under("detached/test-notify"), + ) + .await + .expect("detached task did not deregister"); + } +} From 22e91ba9514b399c971b17aa221a0626fb61b8d6 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:40:29 -0700 Subject: [PATCH 05/20] docs(core): clarify TaskId wrap and TaskRegistry memory high-water behaviour - TaskId: document that u64 counter wrap is not a practical concern at any realistic task spawn rate within a single process lifetime. - TaskRegistry: document that DashMap shard capacity is not reclaimed after deregister, making peak concurrent-task count a memory high-water mark for the process lifetime. Co-Authored-By: Claude Fable 5 --- crates/buttplug_core/src/util/task/registry.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/buttplug_core/src/util/task/registry.rs b/crates/buttplug_core/src/util/task/registry.rs index 17f601608..75185bcfc 100644 --- a/crates/buttplug_core/src/util/task/registry.rs +++ b/crates/buttplug_core/src/util/task/registry.rs @@ -13,6 +13,11 @@ use std::sync::{ use tokio::sync::broadcast; /// Unique identifier for a registered task. Process-lifetime unique. +/// +/// IDs are drawn from a monotonically-incrementing `AtomicU64` counter. At +/// 64-bit width and even at 10 million tasks per second the counter would take +/// roughly 58,000 years to wrap, so id reuse within a single process lifetime +/// is not a practical concern. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct TaskId(u64); @@ -56,6 +61,12 @@ pub enum TaskEvent { /// The global record of every live task, populated as a side effect of /// spawning through a Task Scope. +/// +/// **Memory note**: entries are removed on `deregister`, but `DashMap` does not +/// shrink shard capacity after removals. Peak concurrent-task count therefore +/// becomes a memory high-water mark that is held for the lifetime of the +/// registry (i.e. the process). In practice buttplug servers run a bounded +/// number of concurrent tasks, so this is not expected to be significant. #[derive(Debug)] pub struct TaskRegistry { tasks: DashMap, From b3821283ae6541e65df9c78e136c629335e1736d Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:50:06 -0700 Subject: [PATCH 06/20] feat(server): device manager and device tasks owned by TaskScopes Task 4: ServerDeviceManager now owns a device-manager root TaskScope instead of a bare CancellationToken. The event loop is spawned through the scope (receiving a child token), and a 'devices' child scope is threaded into the event loop so every device gets its own per-device scope at bringup. Shutdown cancels the scope and drains the subtree via registry().wait_empty_under(path) since TaskScope is not Clone. Task 5: DeviceHandle carries an Arc (it is Clone, so the subtree cancels when the last clone drops). build_device_handle takes the per-device scope, wraps it in an Arc, and spawns both the device IO task and the event-forwarding task through it. run_device_task and the forwarding loop each gain a token.cancelled() select arm (the device task's arm sits directly after 'biased;' so cancellation wins over new work). Co-Authored-By: Claude Fable 5 --- .../src/device/device_handle.rs | 81 ++++++++++++------- .../buttplug_server/src/device/device_task.rs | 15 +++- .../src/device/server_device_manager.rs | 52 +++++++----- .../server_device_manager_event_loop.rs | 14 ++++ 4 files changed, 109 insertions(+), 53 deletions(-) diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index f666b2a18..df6dd456a 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -26,7 +26,7 @@ use buttplug_core::{ OutputValue, StopCmdV4, }, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::{ DeviceConfigurationManager, @@ -111,6 +111,9 @@ pub struct DeviceHandle { stop_commands: Arc>, internal_hw_msg_sender: Sender>, output_observation_sender: Option>, + /// Scope owning this device's tasks. Rides in an Arc since DeviceHandle is + /// Clone; the subtree cancels when the last clone drops. + task_scope: Arc, } impl DeviceHandle { @@ -123,6 +126,7 @@ impl DeviceHandle { stop_commands: Vec, internal_hw_msg_sender: Sender>, output_observation_sender: Option>, + task_scope: Arc, ) -> Self { Self { hardware, @@ -134,6 +138,7 @@ impl DeviceHandle { stop_commands: Arc::new(stop_commands), internal_hw_msg_sender, output_observation_sender, + task_scope, } } @@ -455,7 +460,12 @@ pub(super) async fn build_device_handle( protocol_specializers: Vec, device_event_sender: tokio::sync::mpsc::Sender, output_observation_sender: Option>, + task_scope: TaskScope, ) -> Result { + // DeviceHandle is Clone, so its scope rides in an Arc; the device subtree + // cancels when the last clone (and thus the last Arc) drops. + let task_scope = Arc::new(task_scope); + // At this point, we know we've got hardware that is waiting to connect, and enough protocol // info to actually do something after we connect. So go ahead and connect. trace!("Connecting to {:?}", hardware_connector); @@ -524,6 +534,7 @@ pub(super) async fn build_device_handle( }; spawn_device_task( + &task_scope, hardware.clone(), handler.clone(), DeviceTaskConfig { @@ -590,6 +601,7 @@ pub(super) async fn build_device_handle( stop_commands, internal_hw_msg_sender, output_observation_sender, + task_scope.clone(), ); // If we need a keepalive with a packet replay, set this up via stopping the device on connect. @@ -614,41 +626,48 @@ pub(super) async fn build_device_handle( // to the device manager event loop via the provided sender. let event_stream = device_handle.event_stream(); let identifier = device_handle.identifier().clone(); - buttplug_core::spawn!("DeviceEventForwarding", async move { + task_scope.spawn("event-forwarding", move |token| async move { futures::pin_mut!(event_stream); loop { - let event = futures::StreamExt::next(&mut event_stream).await; - match event { - Some(DeviceEvent::Disconnected(id)) => { - if device_event_sender - .send(InternalDeviceEvent::Disconnected(id)) - .await - .is_err() - { - info!( - "Device event sender closed for device {:?}, stopping event forwarding.", - identifier - ); - break; - } + tokio::select! { + _ = token.cancelled() => { + info!("Event forwarding cancelled for device {:?}", identifier); + break; } - Some(DeviceEvent::Notification(_, msg)) => { - if device_event_sender - .send(InternalDeviceEvent::Notification(msg)) - .await - .is_err() - { - info!( - "Device event sender closed for device {:?}, stopping event forwarding.", - identifier - ); - break; + event = futures::StreamExt::next(&mut event_stream) => { + match event { + Some(DeviceEvent::Disconnected(id)) => { + if device_event_sender + .send(InternalDeviceEvent::Disconnected(id)) + .await + .is_err() + { + info!( + "Device event sender closed for device {:?}, stopping event forwarding.", + identifier + ); + break; + } + } + Some(DeviceEvent::Notification(_, msg)) => { + if device_event_sender + .send(InternalDeviceEvent::Notification(msg)) + .await + .is_err() + { + info!( + "Device event sender closed for device {:?}, stopping event forwarding.", + identifier + ); + break; + } + } + None => { + // Stream ended (device likely disconnected) + break; + } } } - None => { - // Stream ended (device likely disconnected) - break; - } } } }); diff --git a/crates/buttplug_server/src/device/device_task.rs b/crates/buttplug_server/src/device/device_task.rs index a54f1fa43..8664ba35d 100644 --- a/crates/buttplug_server/src/device/device_task.rs +++ b/crates/buttplug_server/src/device/device_task.rs @@ -14,9 +14,10 @@ use std::{collections::VecDeque, sync::Arc, time::Duration}; -use buttplug_core::util::async_manager; +use buttplug_core::util::{async_manager, task::TaskScope}; use futures::future; use tokio::{select, sync::mpsc::Receiver, time::Instant}; +use tokio_util::sync::CancellationToken; use super::{ hardware::{Hardware, HardwareCommand, HardwareEvent, HardwareWriteCmd}, @@ -43,13 +44,14 @@ pub struct DeviceTaskConfig { /// /// Returns immediately after spawning the task. pub fn spawn_device_task( + task_scope: &TaskScope, hardware: Arc, _handler: Arc, config: DeviceTaskConfig, mut command_receiver: Receiver>, ) { - buttplug_core::spawn!("DeviceTask", async move { - run_device_task(hardware, config, &mut command_receiver).await; + task_scope.spawn("io", move |token| async move { + run_device_task(hardware, config, &mut command_receiver, token).await; }); } @@ -61,6 +63,7 @@ async fn run_device_task( hardware: Arc, config: DeviceTaskConfig, command_receiver: &mut Receiver>, + token: CancellationToken, ) { let mut hardware_events = hardware.event_stream(); let device_wait_duration = config.message_gap; @@ -115,6 +118,12 @@ async fn run_device_task( select! { biased; + // Priority 0: Cooperative cancellation - wins over new work. + _ = token.cancelled() => { + info!("Device task cancelled, shutting down"); + return; + } + // Priority 1: Incoming commands msg = command_receiver.recv() => { let Some(commands) = msg else { diff --git a/crates/buttplug_server/src/device/server_device_manager.rs b/crates/buttplug_server/src/device/server_device_manager.rs index 77ce10877..1fcfde287 100644 --- a/crates/buttplug_server/src/device/server_device_manager.rs +++ b/crates/buttplug_server/src/device/server_device_manager.rs @@ -36,7 +36,7 @@ use buttplug_core::{ DeviceListV4, StopCmdV4, }, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::{DeviceConfigurationManager, UserDeviceIdentifier}; use dashmap::DashMap; @@ -54,7 +54,6 @@ use std::{ }, }; use tokio::sync::{broadcast, mpsc}; -use tokio_util::sync::CancellationToken; #[derive(Debug)] pub(super) enum DeviceManagerCommand { @@ -175,7 +174,8 @@ impl ServerDeviceManagerBuilder { } let devices = Arc::new(DashMap::new()); - let loop_cancellation_token = CancellationToken::new(); + let task_scope = TaskScope::root("device-manager"); + let devices_scope = task_scope.child("devices"); let output_sender = broadcast::channel(255).0; let output_observation_sender = if self.emit_output_observations { @@ -184,24 +184,31 @@ impl ServerDeviceManagerBuilder { None }; - let mut event_loop = ServerDeviceManagerEventLoop::new( - comm_managers, - self.device_configuration_manager.clone(), - devices.clone(), - loop_cancellation_token.child_token(), - output_sender.clone(), - device_event_receiver, - device_command_receiver, - output_observation_sender.clone(), - ); - buttplug_core::spawn!("ServerDeviceManager event loop", async move { + // Clone everything the event loop needs, since the originals are still + // required to construct the ServerDeviceManager below. + let device_configuration_manager = self.device_configuration_manager.clone(); + let devices_clone = devices.clone(); + let output_sender_clone = output_sender.clone(); + let output_observation_sender_clone = output_observation_sender.clone(); + task_scope.spawn("event-loop", move |token| async move { + let mut event_loop = ServerDeviceManagerEventLoop::new( + comm_managers, + device_configuration_manager, + devices_clone, + token, + devices_scope, + output_sender_clone, + device_event_receiver, + device_command_receiver, + output_observation_sender_clone, + ); event_loop.run().await; }); Ok(ServerDeviceManager { device_configuration_manager: self.device_configuration_manager.clone(), devices, device_command_sender, - loop_cancellation_token, + task_scope, running: Arc::new(AtomicBool::new(true)), output_sender, output_observation_sender, @@ -216,7 +223,7 @@ pub struct ServerDeviceManager { #[getset(get = "pub(crate)")] devices: Arc>, device_command_sender: mpsc::Sender, - loop_cancellation_token: CancellationToken, + task_scope: TaskScope, running: Arc, output_sender: broadcast::Sender, output_observation_sender: Option>, @@ -372,7 +379,10 @@ impl ServerDeviceManager { self.running.store(false, Ordering::Relaxed); let stop_scanning = self.stop_scanning(); let stop_devices = self.stop_devices(&StopCmdV4::default()); - let token = self.loop_cancellation_token.clone(); + // TaskScope is not Clone, so capture its path and cancel here, then await + // the subtree draining inside the returned future via the registry. + let scope_path = self.task_scope.path().to_owned(); + self.task_scope.cancel(); async move { // Force stop scanning, otherwise we can disconnect and instantly try to reconnect while // cleaning up if we're still scanning. @@ -381,7 +391,9 @@ impl ServerDeviceManager { for device in devices.iter() { device.value().disconnect().await?; } - token.cancel(); + buttplug_core::util::task::registry() + .wait_empty_under(&scope_path) + .await; Ok(message::OkV0::default().into()) } .boxed() @@ -391,6 +403,8 @@ impl ServerDeviceManager { impl Drop for ServerDeviceManager { fn drop(&mut self) { info!("Dropping device manager!"); - self.loop_cancellation_token.cancel(); + // The task_scope field cancels its subtree on drop, so we only need to log + // here; explicit cancellation happens automatically when the scope drops. + self.task_scope.cancel(); } } diff --git a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs index 6300586df..6ac2c4318 100644 --- a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs +++ b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs @@ -23,6 +23,7 @@ use crate::device::{ hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerEvent}, protocol::ProtocolManager, }; +use buttplug_core::util::task::TaskScope; use dashmap::{DashMap, DashSet}; use futures::{FutureExt, future}; use std::sync::Arc; @@ -68,6 +69,9 @@ pub(super) struct ServerDeviceManagerEventLoop { connecting_devices: Arc>, /// Cancellation token for the event loop loop_cancellation_token: CancellationToken, + /// Scope owning all per-device task subtrees. Each device's tasks are spawned + /// through a `device-{...}` child of this scope. + devices_scope: TaskScope, /// Protocol map, for mapping user definitions to protocols protocol_manager: ProtocolManager, /// Optional sender for output observations, None when disabled @@ -81,6 +85,7 @@ impl ServerDeviceManagerEventLoop { device_config_manager: Arc, device_map: Arc>, loop_cancellation_token: CancellationToken, + devices_scope: TaskScope, server_sender: broadcast::Sender, device_comm_receiver: mpsc::Receiver, device_command_receiver: mpsc::Receiver, @@ -99,6 +104,7 @@ impl ServerDeviceManagerEventLoop { scanning_state: ScanningState::Idle, connecting_devices: Arc::new(DashSet::new()), loop_cancellation_token, + devices_scope, protocol_manager: ProtocolManager::default(), output_observation_sender, } @@ -301,6 +307,13 @@ impl ServerDeviceManagerEventLoop { // Clone sender again for the forwarding task that build_device_handle will spawn let device_event_sender_for_forwarding = self.device_event_sender.clone(); + // Create the per-device scope before spawning the (detached) bringup + // task, since `child()` borrows `self.devices_scope` and the bringup + // task cannot. The device index is not known until identification + // completes inside build_device_handle, so the scope is keyed by the + // device's stable address. + let device_scope = self.devices_scope.child(&format!("device-{address}")); + buttplug_core::util::async_manager::spawn( async move { match build_device_handle( @@ -309,6 +322,7 @@ impl ServerDeviceManagerEventLoop { protocol_specializers, device_event_sender_for_forwarding, output_observation_sender, + device_scope, ) .await { From 0c208ae8a366065c65bcd52a43f37f3e875c6e48 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:53:26 -0700 Subject: [PATCH 07/20] docs: cargo fmt must use nightly toolchain Co-Authored-By: Claude Fable 5 --- CLAUDE.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 4df848dd5..3c7519236 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -11,10 +11,12 @@ cargo build # Debug build cargo build --release # Release build (LTO enabled) cargo test # Run all tests cargo test -p buttplug_server # Run tests for specific crate -cargo fmt --all -- --check # Check formatting -cargo fmt # Auto-format (2-space indent, edition 2024) +cargo +nightly fmt --all -- --check # Check formatting (MUST use nightly) +cargo +nightly fmt # Auto-format (2-space indent, edition 2024) ``` +**Formatting gotcha**: rustfmt.toml uses nightly-only options (`imports_layout`, `empty_item_single_line`). Running `cargo fmt` on the STABLE toolchain silently ignores them and rewrites the entire workspace into the wrong style (~190 files of import-collapsing churn). Always use `cargo +nightly fmt`. CI checks formatting with nightly. + **Linux dependencies**: `libudev-dev`, `libusb-1.0-0-dev` (for serial/HID support) **WASM build**: From fa20399ca12348e324ab023972b7a1e531d29867 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 17:57:25 -0700 Subject: [PATCH 08/20] feat(server): scope-owned ping timer, delete Drop-spawn hack Rewrite PingTimer to be owned by a TaskScope: the timer task selects on its cancellation token and exits cooperatively, so the PingMessage::End variant and the entire impl Drop for PingTimer (the Drop-spawn hack) are deleted. Dropping the timer now drops its scope, which cancels the task. server_builder creates the per-instance "server" root scope. The ping-timeout FnOnce callback moves a child scope in and uses spawn_and_hold for the stop-devices task. PingTimer::new and ButtplugServer::new gain the scope parameter; the server stores it. ButtplugServer::shutdown cancels its own scope and awaits only its own subtree via registry().wait_empty_under -- not the device manager's subtree, since a shared device manager outlives the server and handles its own shutdown. Co-Authored-By: Claude Fable 5 --- crates/buttplug_server/src/ping_timer.rs | 32 ++++++++------------ crates/buttplug_server/src/server.rs | 22 ++++++++++++-- crates/buttplug_server/src/server_builder.rs | 18 +++++++++-- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/crates/buttplug_server/src/ping_timer.rs b/crates/buttplug_server/src/ping_timer.rs index 4b7ec5a9d..2fece8dba 100644 --- a/crates/buttplug_server/src/ping_timer.rs +++ b/crates/buttplug_server/src/ping_timer.rs @@ -5,19 +5,19 @@ // Licensed under the BSD 3-Clause license. See LICENSE file in the project root // for full license information. -use buttplug_core::util::async_manager; +use buttplug_core::util::{async_manager, task::TaskScope}; use futures::Future; use std::{sync::Arc, time::Duration}; use tokio::{ select, sync::{Mutex, mpsc}, }; +use tokio_util::sync::CancellationToken; pub enum PingMessage { Ping, StartTimer, StopTimer, - End, } /// Internal ping timer task that monitors for ping timeouts. @@ -26,6 +26,7 @@ async fn ping_timer( max_ping_time: u32, mut ping_msg_receiver: mpsc::Receiver, on_ping_timeout: Arc>>, + token: CancellationToken, ) where F: FnOnce() + Send + 'static, { @@ -33,6 +34,9 @@ async fn ping_timer( let mut pinged = false; loop { select! { + _ = token.cancelled() => { + return; + } _ = async_manager::sleep(Duration::from_millis(max_ping_time.into())) => { if started { if !pinged { @@ -53,7 +57,6 @@ async fn ping_timer( PingMessage::StartTimer => started = true, PingMessage::StopTimer => started = false, PingMessage::Ping => pinged = true, - PingMessage::End => break, } } }; @@ -63,19 +66,8 @@ async fn ping_timer( pub struct PingTimer { max_ping_time: u32, ping_msg_sender: mpsc::Sender, -} - -impl Drop for PingTimer { - fn drop(&mut self) { - // This cannot block, otherwise it will throw in WASM contexts on - // destruction. We must use send(), not blocking_send(). - let sender = self.ping_msg_sender.clone(); - buttplug_core::spawn!("PingTimerDrop", async move { - if sender.send(PingMessage::End).await.is_err() { - debug!("Receiver does not exist, assuming ping timer event loop already dead."); - } - }); - } + // Dropping the timer drops the scope, which cancels the timer task. + _task_scope: TaskScope, } impl PingTimer { @@ -84,19 +76,21 @@ impl PingTimer { /// The callback is called once when the ping timer expires without receiving /// a ping message. If max_ping_time is 0, the timer is disabled and the /// callback will never be called. - pub fn new(max_ping_time: u32, on_ping_timeout: Option) -> Self + pub fn new(max_ping_time: u32, on_ping_timeout: Option, task_scope: TaskScope) -> Self where F: FnOnce() + Send + 'static, { let (sender, receiver) = mpsc::channel(256); if max_ping_time > 0 { let callback = Arc::new(Mutex::new(on_ping_timeout)); - let fut = ping_timer(max_ping_time, receiver, callback); - buttplug_core::spawn!("PingTimer", fut); + task_scope.spawn("timer", move |token| { + ping_timer(max_ping_time, receiver, callback, token) + }); } Self { max_ping_time, ping_msg_sender: sender, + _task_scope: task_scope, } } diff --git a/crates/buttplug_server/src/server.rs b/crates/buttplug_server/src/server.rs index 61642068a..da7341991 100644 --- a/crates/buttplug_server/src/server.rs +++ b/crates/buttplug_server/src/server.rs @@ -35,7 +35,7 @@ use buttplug_core::{ StopCmdV4, StopScanningV0, }, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use futures::{ Stream, @@ -90,6 +90,9 @@ pub struct ButtplugServer { /// Broadcaster for server events. Receivers for this are handed out through the /// [ButtplugServer::event_stream()] method. output_sender: broadcast::Sender, + /// Root scope owning all tasks spawned for this server instance (e.g. the + /// ping timer). Cancelled on shutdown. + task_scope: TaskScope, } impl std::fmt::Debug for ButtplugServer { @@ -110,6 +113,7 @@ impl ButtplugServer { device_manager: Arc, state: Arc>, output_sender: broadcast::Sender, + task_scope: TaskScope, ) -> Self { ButtplugServer { server_name: server_name.to_owned(), @@ -118,6 +122,7 @@ impl ButtplugServer { device_manager, state, output_sender, + task_scope, } } @@ -233,8 +238,19 @@ impl ButtplugServer { pub fn shutdown(&self) -> ButtplugServerResultFuture { let device_manager = self.device_manager.clone(); - //let disconnect_future = self.disconnect(); - async move { device_manager.shutdown().await }.boxed() + let scope_path = self.task_scope.path().to_owned(); + self.task_scope.cancel(); + async move { + let result = device_manager.shutdown().await; + // Await only this server's own subtree. A shared device manager + // (with_shared_device_manager) outlives this server, so its subtree is + // NOT awaited here -- device_manager.shutdown() handles its own. + buttplug_core::util::task::registry() + .wait_empty_under(&scope_path) + .await; + result + } + .boxed() } /// Sends a [ButtplugClientMessage] to be parsed by the server (for handshake or ping), or passed diff --git a/crates/buttplug_server/src/server_builder.rs b/crates/buttplug_server/src/server_builder.rs index d5ab30e5b..5763f384e 100644 --- a/crates/buttplug_server/src/server_builder.rs +++ b/crates/buttplug_server/src/server_builder.rs @@ -14,6 +14,7 @@ use super::{ use buttplug_core::{ errors::*, message::{self, ButtplugServerMessageV4, StopCmdV4}, + util::task::TaskScope, }; use buttplug_server_device_config::DeviceConfigurationManagerBuilder; use std::sync::{Arc, RwLock}; @@ -98,12 +99,16 @@ impl ButtplugServerBuilder { let ping_time = self.max_ping_time.unwrap_or(0); + // Root scope owning all tasks spawned for this server instance. + let task_scope = TaskScope::root("server"); + // Create the ping timeout callback if ping time is configured. // The callback handles: updating state, stopping devices, and sending error. let ping_timeout_callback = if ping_time > 0 { let state_clone = state.clone(); let device_manager_clone = self.device_manager.clone(); let output_sender_clone = output_sender.clone(); + let ping_timeout_scope = task_scope.child("ping-timeout"); Some(move || { error!("Ping out signal received, stopping server"); @@ -112,8 +117,10 @@ impl ButtplugServerBuilder { let mut state_guard = state_clone.write().expect("State lock poisoned"); *state_guard = ConnectionState::PingedOut; } - // Stop all devices (spawn async task since callback is sync) - buttplug_core::spawn!("PingTimeoutStopDevices", async move { + // Stop all devices (spawn async task since callback is sync). The + // callback is FnOnce, so the child scope moves in and is consumed by + // spawn_and_hold, keeping it alive for the duration of the task. + ping_timeout_scope.spawn_and_hold("stop-devices", move |_token| async move { if let Err(e) = device_manager_clone .stop_devices(&StopCmdV4::default()) .await @@ -135,7 +142,11 @@ impl ButtplugServerBuilder { None }; - let ping_timer = Arc::new(PingTimer::new(ping_time, ping_timeout_callback)); + let ping_timer = Arc::new(PingTimer::new( + ping_time, + ping_timeout_callback, + task_scope.child("ping-timer"), + )); // Assuming everything passed, return the server. Ok(ButtplugServer::new( @@ -145,6 +156,7 @@ impl ButtplugServerBuilder { self.device_manager.clone(), state, output_sender, + task_scope, )) } } From 58ebc49628deaec380091732e142b49815888219 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 18:02:14 -0700 Subject: [PATCH 09/20] feat(server): protocol subscription tasks owned by device scopes Add an owned task_scope: TaskScope parameter to ProtocolHandler::handle_input_subscribe_cmd. DeviceHandle's wrapper hands each handler a self.task_scope.child("subscription") scope so subscription event-handler tasks are owned by the per-device scope and get cancelled on device removal. kgoal_boost's internal subscription spawn becomes task_scope.spawn_and_hold with a biased token.cancelled() select arm wrapping the hardware event recv loop, so a blocked recv is interrupted on cancellation. The existing receiver_count/subscribed_sensors early-exit is retained as a nobody-listening optimization. galaku overrides the trait but performs no internal spawn, so it only gains the (unused) parameter. kiiroo_v21's override is entirely inside a block comment and is not a compiled overrider, so it is untouched. Co-Authored-By: Claude Fable 5 --- .../buttplug_server/src/device/device_handle.rs | 10 +++++++++- crates/buttplug_server/src/device/protocol.rs | 2 ++ .../src/device/protocol_impl/galaku.rs | 2 ++ .../src/device/protocol_impl/kgoal_boost.rs | 17 ++++++++++++++--- 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index df6dd456a..11de7df34 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -407,9 +407,17 @@ impl DeviceHandle { info!("Handling input subscribe command"); let device = self.hardware.clone(); let handler = self.handler.clone(); + let task_scope = self.task_scope.child("subscription"); async move { handler - .handle_input_subscribe_cmd(device_index, device, feature_index, feature_id, input_type) + .handle_input_subscribe_cmd( + device_index, + device, + feature_index, + feature_id, + input_type, + task_scope, + ) .await .map(|_| message::OkV0::new(1).into()) .map_err(|e| e.into()) diff --git a/crates/buttplug_server/src/device/protocol.rs b/crates/buttplug_server/src/device/protocol.rs index e624b5706..83cc94b8b 100644 --- a/crates/buttplug_server/src/device/protocol.rs +++ b/crates/buttplug_server/src/device/protocol.rs @@ -10,6 +10,7 @@ use buttplug_core::{ errors::ButtplugDeviceError, message::{InputReadingV4, InputType, InputValue, OutputCommand}, + util::task::TaskScope, }; use buttplug_server_device_config::{ Endpoint, @@ -372,6 +373,7 @@ pub trait ProtocolHandler: Sync + Send { _feature_index: u32, _feature_id: Uuid, _sensor_type: InputType, + _task_scope: TaskScope, ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { future::ready(Err(ButtplugDeviceError::UnhandledCommand( "Command not implemented for this protocol: InputCmd (Subscribe)".to_string(), diff --git a/crates/buttplug_server/src/device/protocol_impl/galaku.rs b/crates/buttplug_server/src/device/protocol_impl/galaku.rs index f951ba2fb..57fed8654 100644 --- a/crates/buttplug_server/src/device/protocol_impl/galaku.rs +++ b/crates/buttplug_server/src/device/protocol_impl/galaku.rs @@ -14,6 +14,7 @@ use futures_util::{FutureExt, future}; use buttplug_core::errors::ButtplugDeviceError; use buttplug_core::message::{InputReadingV4, InputType, InputTypeReading, InputValue}; +use buttplug_core::util::task::TaskScope; use buttplug_server_device_config::Endpoint; use buttplug_server_device_config::{ @@ -245,6 +246,7 @@ impl ProtocolHandler for Galaku { _feature_index: u32, feature_id: Uuid, sensor_type: InputType, + _task_scope: TaskScope, ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { match sensor_type { InputType::Battery => { diff --git a/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs b/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs index 7e99fef18..c21f1a796 100644 --- a/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs +++ b/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs @@ -15,7 +15,7 @@ use crate::{ use buttplug_core::{ errors::ButtplugDeviceError, message::{InputReadingV4, InputType, InputValue}, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::Endpoint; use futures::{ @@ -65,6 +65,7 @@ impl ProtocolHandler for KGoalBoost { feature_index: u32, feature_id: Uuid, _sensor_type: InputType, + task_scope: TaskScope, ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { let sensors = self.subscribed_sensors.load(Ordering::Relaxed); if (sensors & (1 << feature_index as u8)) > 0 { @@ -90,9 +91,19 @@ impl ProtocolHandler for KGoalBoost { let stream_sensors = stream_sensors.clone(); info!("Starting Kgoal subscription"); // If we subscribe successfully, we need to set up our event handler. - buttplug_core::spawn!("Kgoal subscription event handler", async move { + task_scope.spawn_and_hold("kgoal-events", move |token| async move { let mut cached_values = vec![0u32, 0u32]; - while let Ok(info) = hardware_stream.recv().await { + loop { + let info = tokio::select! { + biased; + _ = token.cancelled() => return, + info = hardware_stream.recv() => { + let Ok(info) = info else { + return; + }; + info + } + }; let subscribed_sensors = stream_sensors.load(Ordering::Relaxed); // If we have no receivers, quit. if sender.receiver_count() == 0 || subscribed_sensors == 0 { From 45bcd23d5848f0087e240d47e2cf5b562b59f481 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 18:19:29 -0700 Subject: [PATCH 10/20] test: assert no leaked tasks after server shutdown Brings a real test-harness device online (which spawns per-device io and event-forwarding tasks plus the device-manager event loop), then asserts the task registry drains back to baseline once shutdown() returns -- before dropping the server, so ordinary channel-drop teardown cannot mask a missing cooperative-cancellation arm. A second post-drop check confirms nothing is stranded. Co-Authored-By: Claude Fable 5 --- .../tests/test_task_lifecycle.rs | 137 ++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 crates/buttplug_tests/tests/test_task_lifecycle.rs diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs new file mode 100644 index 000000000..b8fa725a0 --- /dev/null +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -0,0 +1,137 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +mod util; + +use buttplug_core::{ + message::{ + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ButtplugServerMessageV4, + RequestServerInfoV4, + StartScanningV0, + }, + util::task::registry, +}; +use buttplug_server::message::ButtplugClientMessageVariant; +use futures::{StreamExt, pin_mut}; +use std::time::Duration; +use util::test_server_with_device; + +/// Bring a real (test-harness) device online and confirm that, once the server +/// is shut down and dropped, no tasks remain registered under the scope tree. +/// +/// This exercises the *device task* lifecycle specifically: a connected device +/// spawns an `io` task whose only exit path is its `token.cancelled()` select +/// arm. If scope cancellation were removed, that task would leak and this test +/// would fail with a non-empty leaked-task list. (A server with no device is +/// insufficient — the device-manager event loop also exits on channel drop, so +/// it cannot prove cancellation actually fires.) +#[tokio::test] +async fn test_server_shutdown_leaves_no_tasks() { + let baseline: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); + + // Hold the channel so the device stays connected. + let (server, _channel) = test_server_with_device("Massage Demo"); + + let recv = server.server_version_event_stream(); + pin_mut!(recv); + + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Task Lifecycle Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + + // Wait until the device is actually connected — this is what spawns the + // per-device `io` task that we want to prove gets cleaned up. + tokio::time::timeout(Duration::from_secs(5), async { + while let Some(msg) = recv.next().await { + if let ButtplugServerMessageV4::DeviceList(list) = msg { + if !list.devices().is_empty() { + return; + } + } + } + panic!("device event stream ended before a device connected"); + }) + .await + .expect("timed out waiting for device to connect"); + + // Device is up: the registry must now show more tasks than baseline, and at + // least one of them must be a per-device task. + let after_connect: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); + let new_tasks: Vec<&String> = after_connect + .iter() + .filter(|p| !baseline.contains(p)) + .collect(); + assert!( + !new_tasks.is_empty(), + "expected scope-spawned tasks after a device connected" + ); + assert!( + new_tasks.iter().any(|p| p.contains("/device-")), + "expected a per-device task in the registry, got: {new_tasks:?}" + ); + + // `shutdown()` is contractually responsible for draining every task it + // spawned: it cancels the scope tree and awaits the registry going empty + // under its path before returning. We assert emptiness *before* dropping the + // server, so that ordinary drop-of-channels teardown cannot mask a missing + // cancellation arm. If any scope-spawned task fails to observe cancellation, + // it will still be parked on its event stream / receiver here and show up as + // leaked. We hold `_channel` alive across this check precisely so the device + // event stream does not close on its own. + server.shutdown().await.expect("server shutdown errored"); + + let leaked: Vec = registry() + .snapshot() + .into_iter() + .map(|t| t.path) + .filter(|p| !baseline.contains(p)) + .collect(); + assert!( + leaked.is_empty(), + "shutdown() returned but tasks are still registered: {leaked:?}" + ); + + // Dropping the server must not resurrect or strand anything either. Give a + // short grace period and confirm we remain at (or below) baseline. + drop(server); + drop(_channel); + tokio::time::timeout(Duration::from_secs(5), async { + loop { + let now: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); + if now.iter().filter(|p| !baseline.contains(p)).count() == 0 { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await + .unwrap_or_else(|_| { + let leaked: Vec = registry() + .snapshot() + .into_iter() + .map(|t| t.path) + .filter(|p| !baseline.contains(p)) + .collect(); + panic!("leaked tasks after drop: {leaked:?}"); + }); +} From 2abb944812d696da202e6ae551313c7de5bef58b Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 18:29:48 -0700 Subject: [PATCH 11/20] fix(server): address code review issues from phase 2 TaskScope migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bringup-during-shutdown race (server_device_manager_event_loop.rs): Replace raw `async_manager::spawn` for device bringup with `devices_scope.spawn("bringup-{address}", ...)` so the bringup task is registered in the Task Registry for its full duration. Ordering guarantee: io and event-forwarding tasks register inside build_device_handle (synchronously, before it returns) and therefore before the bringup wrapper deregisters — the registry count never momentarily hits zero while work remains. - Drop comment contradicts code (server_device_manager.rs:403-409): Fix misleading comment that said "we only need to log here" while the code still called self.task_scope.cancel() explicitly. Updated comment now correctly states the explicit cancel is intentional and fires eagerly before field drop order takes effect. Co-Authored-By: Claude Fable 5 --- .../src/device/server_device_manager.rs | 5 +-- .../server_device_manager_event_loop.rs | 34 ++++++++++++------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/crates/buttplug_server/src/device/server_device_manager.rs b/crates/buttplug_server/src/device/server_device_manager.rs index 1fcfde287..cb13ddc33 100644 --- a/crates/buttplug_server/src/device/server_device_manager.rs +++ b/crates/buttplug_server/src/device/server_device_manager.rs @@ -403,8 +403,9 @@ impl ServerDeviceManager { impl Drop for ServerDeviceManager { fn drop(&mut self) { info!("Dropping device manager!"); - // The task_scope field cancels its subtree on drop, so we only need to log - // here; explicit cancellation happens automatically when the scope drops. + // Explicitly cancel the scope here for clarity and to trigger shutdown + // eagerly (before field drop order kicks in). Field drop would also cancel + // via TaskScope::drop, but the explicit call makes the intent obvious. self.task_scope.cancel(); } } diff --git a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs index 6ac2c4318..c7718a7f5 100644 --- a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs +++ b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs @@ -298,24 +298,33 @@ impl ServerDeviceManagerEventLoop { let device_config_manager = self.device_config_manager.clone(); let connecting_devices = self.connecting_devices.clone(); let output_observation_sender = self.output_observation_sender.clone(); - let span = info_span!( - "device creation", - name = tracing::field::display(name), - address = tracing::field::display(address.clone()) - ); // Clone sender again for the forwarding task that build_device_handle will spawn let device_event_sender_for_forwarding = self.device_event_sender.clone(); - // Create the per-device scope before spawning the (detached) bringup - // task, since `child()` borrows `self.devices_scope` and the bringup - // task cannot. The device index is not known until identification - // completes inside build_device_handle, so the scope is keyed by the - // device's stable address. + // Create the per-device scope before spawning the bringup task. The + // device index is not known until identification completes inside + // build_device_handle, so both scopes are keyed by the device's stable + // address. let device_scope = self.devices_scope.child(&format!("device-{address}")); - buttplug_core::util::async_manager::spawn( - async move { + // Spawn bringup through devices_scope so it is registered in the Task + // Registry for the duration of the connection attempt. This closes the + // shutdown race: wait_empty_under on the devices scope can only return + // zero after the bringup task deregisters, which happens *after* + // build_device_handle returns. build_device_handle registers the + // io and event-forwarding tasks (via task_scope.spawn) synchronously + // before returning, so the registry count never momentarily hits zero + // while work remains: + // 1. bringup registers (here) + // 2. io task registers (inside build_device_handle → spawn_device_task) + // 3. event-forwarding task registers (inside build_device_handle) + // 4. build_device_handle returns + // 5. bringup deregisters + // Step 2–3 happen before step 5, guaranteeing correct ordering. + self.devices_scope.spawn( + &format!("bringup-{address}"), + move |_token| async move { match build_device_handle( device_config_manager, creator, @@ -343,7 +352,6 @@ impl ServerDeviceManagerEventLoop { } connecting_devices.remove(&address); }, - span, ); } } From 91c03d190b4d166c9e8cd1c4958c911b496327c8 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 18:37:31 -0700 Subject: [PATCH 12/20] feat(engine): task event and task list message types Add emit_task_events flag to EngineOptions/EngineOptionsExternal/EngineOptionsBuilder mirroring the emit_output_observations pattern. Add TaskStarted, TaskEnded, TaskList EngineMessage variants, TaskListEntry struct, and RequestTaskList IntifaceMessage variant. Add stub match arm in frontend/mod.rs (Task 10 will wire the real logic). Co-Authored-By: Claude Fable 5 --- crates/intiface_engine/src/frontend/mod.rs | 5 ++- .../src/frontend/process_messages.rs | 38 ++++++++++++++++++- crates/intiface_engine/src/options.rs | 9 +++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/crates/intiface_engine/src/frontend/mod.rs b/crates/intiface_engine/src/frontend/mod.rs index 8f4dd416b..3197a0d29 100644 --- a/crates/intiface_engine/src/frontend/mod.rs +++ b/crates/intiface_engine/src/frontend/mod.rs @@ -50,7 +50,10 @@ pub async fn frontend_external_event_loop( connection_cancellation_token.cancel(); info!("Got external stop request"); break; - } + }, + IntifaceMessage::RequestTaskList {} => { + // TODO(task-10): respond with TaskList snapshot + }, }, Err(_) => { info!("Frontend sender dropped, assuming connection lost, breaking."); diff --git a/crates/intiface_engine/src/frontend/process_messages.rs b/crates/intiface_engine/src/frontend/process_messages.rs index cc9abbacb..b297a208c 100644 --- a/crates/intiface_engine/src/frontend/process_messages.rs +++ b/crates/intiface_engine/src/frontend/process_messages.rs @@ -8,6 +8,13 @@ use buttplug_server_device_config::UserDeviceIdentifier; use serde::{Deserialize, Serialize}; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskListEntry { + pub id: u64, + pub path: String, + pub detached: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "code", rename_all = "snake_case")] pub enum EngineErrorDetail { @@ -62,19 +69,48 @@ pub enum EngineMessage { output_type: String, value: f64, }, + TaskStarted { + id: u64, + path: String, + }, + TaskEnded { + id: u64, + path: String, + outcome: String, + }, + TaskList { + tasks: Vec, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum IntifaceMessage { RequestEngineVersion { expected_version: u32 }, Stop {}, + RequestTaskList {}, } #[cfg(test)] mod test { - use super::{EngineErrorDetail, EngineMessage}; + use super::{EngineErrorDetail, EngineMessage, IntifaceMessage}; use serde_json::json; + #[test] + fn test_task_message_serialization() { + let msg = EngineMessage::TaskEnded { + id: 42, + path: "server-1/ping-timer/timer".to_owned(), + outcome: "Cancelled".to_owned(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let back: EngineMessage = serde_json::from_str(&json).unwrap(); + assert!(matches!(back, EngineMessage::TaskEnded { id: 42, .. })); + + let req = r#"{"RequestTaskList":{}}"#; + let parsed: IntifaceMessage = serde_json::from_str(req).unwrap(); + assert!(matches!(parsed, IntifaceMessage::RequestTaskList {})); + } + #[test] fn generic_engine_error_serializes_without_structured_fields() { let message = EngineMessage::EngineError { diff --git a/crates/intiface_engine/src/options.rs b/crates/intiface_engine/src/options.rs index f76e1adaa..ca8fa663a 100644 --- a/crates/intiface_engine/src/options.rs +++ b/crates/intiface_engine/src/options.rs @@ -67,6 +67,8 @@ pub struct EngineOptions { rest_api_port: Option, #[getset(get_copy = "pub")] emit_output_observations: bool, + #[getset(get_copy = "pub")] + emit_task_events: bool, } #[derive(Default, Debug, Clone)] @@ -100,6 +102,7 @@ pub struct EngineOptionsExternal { pub repeater_remote_address: Option, pub rest_api_port: Option, pub emit_output_observations: bool, + pub emit_task_events: bool, } impl From for EngineOptions { @@ -134,6 +137,7 @@ impl From for EngineOptions { repeater_remote_address: other.repeater_remote_address, rest_api_port: other.rest_api_port, emit_output_observations: other.emit_output_observations, + emit_task_events: other.emit_task_events, } } } @@ -297,6 +301,11 @@ impl EngineOptionsBuilder { self } + pub fn emit_task_events(&mut self, value: bool) -> &mut Self { + self.options.emit_task_events = value; + self + } + pub fn finish(&mut self) -> EngineOptions { self.options.clone() } From c449e778f09320d694218daa702ae973964ffb58 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 18:46:34 -0700 Subject: [PATCH 13/20] feat(engine): stream task lifecycle events and snapshots to frontends When emit_task_events is enabled in EngineOptions, the frontend server event loop subscribes to buttplug_core::util::task::registry().event_stream() before entering the select loop and forwards TaskStarted/TaskEnded as EngineMessages. When disabled, the arm is inert via the Option+pending pattern. RequestTaskList is now answered with a registry snapshot mapped into TaskListEntry values and sent back as EngineMessage::TaskList. The emit_task_events flag is read from EngineOptions at spawn time in engine.rs and passed into frontend_server_event_loop, matching how other flags (e.g. emit_output_observations) flow through the same code path. Co-Authored-By: Claude Fable 5 --- crates/intiface_engine/src/engine.rs | 9 ++++- crates/intiface_engine/src/frontend/mod.rs | 42 ++++++++++++++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/crates/intiface_engine/src/engine.rs b/crates/intiface_engine/src/engine.rs index a4a81e44c..81780d02d 100644 --- a/crates/intiface_engine/src/engine.rs +++ b/crates/intiface_engine/src/engine.rs @@ -306,8 +306,15 @@ impl IntifaceEngine { let event_receiver = server.event_stream(); let frontend_clone = frontend.clone(); let stop_child_token = self.stop_token.child_token(); + let emit_task_events = options.emit_task_events(); tokio::spawn(async move { - frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await; + frontend_server_event_loop( + event_receiver, + frontend_clone, + stop_child_token, + emit_task_events, + ) + .await; }); } diff --git a/crates/intiface_engine/src/frontend/mod.rs b/crates/intiface_engine/src/frontend/mod.rs index 3197a0d29..45f45a415 100644 --- a/crates/intiface_engine/src/frontend/mod.rs +++ b/crates/intiface_engine/src/frontend/mod.rs @@ -9,8 +9,9 @@ pub mod process_messages; use crate::error::IntifaceError; use crate::remote_server::ButtplugRemoteServerEvent; use async_trait::async_trait; +use buttplug_core::util::task::{TaskEvent, registry}; use futures::{Stream, StreamExt, pin_mut}; -pub use process_messages::{EngineMessage, IntifaceMessage}; +pub use process_messages::{EngineMessage, IntifaceMessage, TaskListEntry}; use std::sync::Arc; use tokio::{ select, @@ -52,7 +53,16 @@ pub async fn frontend_external_event_loop( break; }, IntifaceMessage::RequestTaskList {} => { - // TODO(task-10): respond with TaskList snapshot + let tasks = registry() + .snapshot() + .into_iter() + .map(|t| TaskListEntry { + id: t.id.value(), + path: t.path, + detached: t.detached, + }) + .collect(); + frontend.send(EngineMessage::TaskList { tasks }).await; }, }, Err(_) => { @@ -73,9 +83,12 @@ pub async fn frontend_server_event_loop( receiver: impl Stream, frontend: Arc, connection_cancellation_token: CancellationToken, + emit_task_events: bool, ) { pin_mut!(receiver); + let mut task_events = emit_task_events.then(|| registry().event_stream()); + loop { select! { maybe_event = receiver.next() => { @@ -116,6 +129,31 @@ pub async fn frontend_server_event_loop( }, } }, + task_event = async { + match task_events.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } + } => { + match task_event { + Ok(TaskEvent::Started { id, path }) => { + frontend.send(EngineMessage::TaskStarted { id: id.value(), path }).await; + } + Ok(TaskEvent::Ended { id, path, outcome }) => { + frontend.send(EngineMessage::TaskEnded { + id: id.value(), + path, + outcome: format!("{outcome:?}"), + }).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Task event stream lagged, dropped {} events", n); + } + Err(broadcast::error::RecvError::Closed) => { + // Global registry sender never closes; unreachable in practice. + } + } + }, _ = connection_cancellation_token.cancelled() => { info!("Connection cancellation token activated, breaking from frontend server event loop"); break; From 7d2c20cfc3b3629e05b136a4619033418fb905b2 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 18:53:22 -0700 Subject: [PATCH 14/20] docs: update project context for task-scope-lifecycle Document the new TaskScope/TaskRegistry ownership and introspection infrastructure (buttplug_core::util::task) as a Key Pattern, note the server's scope ownership, the ProtocolHandler subscribe signature change, and the intiface_engine emit_task_events plumbing. Add test_task_lifecycle.rs to the integration test crate context. Co-Authored-By: Claude Fable 5 --- CLAUDE.md | 15 +++++++++++++-- crates/buttplug_tests/CLAUDE.md | 1 + 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 3c7519236..d4c52d1eb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,6 +1,6 @@ # CLAUDE.md -Last verified: 2026-05-19 +Last verified: 2026-06-09 This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. @@ -56,7 +56,7 @@ Buttplug is a framework for interfacing with intimate hardware devices. It uses **Message-Based Protocol**: All client-server communication uses versioned JSON messages (v0-v4). Version negotiation happens during handshake. -**Async Architecture**: Heavy use of tokio channels (mpsc, broadcast, oneshot) for communication between components. Runtime abstraction supports tokio (production) and WASM. +**Async Architecture**: Heavy use of tokio channels (mpsc, broadcast, oneshot) for communication between components. Runtime abstraction supports tokio (production) and WASM. Spawned tasks are owned by `TaskScope`s rather than fire-and-forget `spawn!` (see Task Lifecycle below). **Device Lifecycle**: ``` @@ -95,6 +95,17 @@ Simulated devices allow testing the full device lifecycle without real hardware. - Validation rejects unknown archetypes and duplicate addresses at config build time - `SimulatedProtocol` is a no-op handler; `SimulatedHardwareConnector` creates in-memory endpoints +**Task Lifecycle** (`buttplug_core::util::task`): +Spawned async tasks are owned by a `TaskScope` ownership tree rather than fire-and-forget, giving cooperative cancellation and global introspection. Key contracts: +- `TaskScope::root(name)` makes a root scope (path gets a unique numeric suffix, e.g. `server-2`, so parallel instances don't collide); `.child(name)` derives a sub-scope whose token is a child of the parent's. +- `scope.spawn(name, |token| async ...)` spawns a task owned by the scope; long-running tasks MUST `select!` on the passed token. Cancelling or dropping a scope cancels its whole subtree. +- `scope.spawn_and_hold(name, ...)` consumes the scope into the task, so drop-cancel can't fire before the task runs (used for `FnOnce` callbacks like ping-timeout and protocol subscription handlers). +- `scope.shutdown().await` cancels the subtree and waits until every task under it has deregistered (wrap in a timeout if tasks may be uncooperative). +- `spawn_detached(name, fut)` is a rare escape hatch: registered under `detached/{name}` but uncancellable. Prefer scopes. +- `TaskRegistry` (global, via `registry()`) records every live task: `snapshot()`, `live_count_under(prefix)` (segment-aware prefix match), `event_stream()` (`TaskEvent::Started`/`Ended`), `wait_empty_under(prefix)`. +- Ownership in the server: `ButtplugServer` owns a `server` root scope; `ServerDeviceManager` owns a `device-manager` root scope with per-device child scopes (io/event-forwarding/bringup); `PingTimer` is scope-owned (the old `PingMessage::End` + Drop-spawn shutdown hack is gone). `ProtocolHandler::handle_input_subscribe_cmd` now takes a `TaskScope` param. +- `intiface_engine` exposes this to frontends when `emit_task_events` is set: registry events forward as `EngineMessage::TaskStarted`/`TaskEnded`, and `IntifaceMessage::RequestTaskList` returns `EngineMessage::TaskList` (`Vec`). + ## Agent skills ### Issue tracker diff --git a/crates/buttplug_tests/CLAUDE.md b/crates/buttplug_tests/CLAUDE.md index 31f5b50a1..e5408baa3 100644 --- a/crates/buttplug_tests/CLAUDE.md +++ b/crates/buttplug_tests/CLAUDE.md @@ -54,4 +54,5 @@ Tests run across multiple protocol spec versions (v0–v4) via version-specific - `test_message_downgrades.rs` — Protocol version downgrade path tests - `test_disabled_device_features.rs` — Tests for user config feature disabling - `test_output_observations.rs` — Integration tests for output observability (observation stream, filtering, multi-device, enable/disable) +- `test_task_lifecycle.rs` — Integration test asserting the global Task Registry has no live tasks under the server's scope after server shutdown (verifies `TaskScope` ownership leaves no leaked tasks) - `test_websocket_connectors.rs` / `test_websocket_device_comm_manager.rs` — WebSocket transport integration tests From f87658fad33eddda1647831900e368b11ef96770 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Tue, 9 Jun 2026 19:13:57 -0700 Subject: [PATCH 15/20] fix: address code review feedback on task-scope-lifecycle branch - DeviceHandle::new: add #[allow(clippy::too_many_arguments)] with explanatory comment; constructor bundles all device-state concerns and restructuring is not warranted for a pub(crate) constructor - CONTEXT.md Task Scope entry: fix example path from server/device-manager/device-3/keepalive to device-manager-1/devices/device-3/io to match the shipped two-root design (independent server-N and device-manager-N roots) Co-Authored-By: Claude Fable 5 --- CONTEXT.md | 2 +- crates/buttplug_server/src/device/device_handle.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CONTEXT.md b/CONTEXT.md index a52ae4263..5489a1808 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -93,7 +93,7 @@ An opt-in broadcast of every output command sent to a **Device** — device inde _Avoid_: Treating as internal-only debugging; it's a user-facing observability feature. **Task Scope**: -The owner of spawned async tasks within a module. Every task is spawned through a Task Scope, which links it to a parent, derives its hierarchical name (e.g. `server/device-manager/device-3/keepalive`), registers it in the **Task Registry**, and hands it a cooperative cancellation token. Dropping a scope cancels its children. Tasks cannot be spawned without a parent scope. +The owner of spawned async tasks within a module. Every task is spawned through a Task Scope, which links it to a parent, derives its hierarchical name (e.g. `device-manager-1/devices/device-3/io`), registers it in the **Task Registry**, and hands it a cooperative cancellation token. Dropping a scope cancels its children. Tasks cannot be spawned without a parent scope. _Avoid_: "Detached task" or bare spawning as the normal pattern; detachment is the rare, explicit exception. **Task Registry**: diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index 11de7df34..759dc4824 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -117,7 +117,10 @@ pub struct DeviceHandle { } impl DeviceHandle { - /// Create a new DeviceHandle with direct ownership of device state + /// Create a new DeviceHandle with direct ownership of device state. + /// The arguments bundle all device-state concerns (hardware, protocol, definition, identity, + /// commands, I/O channel, observability, lifecycle scope) — suppressing the lint is correct here. + #[allow(clippy::too_many_arguments)] pub(crate) fn new( hardware: Arc, handler: Arc, From 6efcd78b30b0899b695033b62ead24667c2939ee Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 12:22:28 -0700 Subject: [PATCH 16/20] fix(server): run shutdown cleanup before task scope cancellation ServerDeviceManager::shutdown() cancelled the task scope synchronously before the returned future awaited stop_scanning/stop_devices/disconnect. Device io tasks select biased on their cancellation token, so queued stop commands were dropped and devices could keep running through disconnect. Capture the scope's CancellationToken and path synchronously, run cleanup first, then cancel and wait_empty_under. Align ButtplugServer::shutdown to the same cleanup-before-cancel ordering. Adds a regression test exercising the StopScanning-through-event-loop path with a connected device and active scanning. The stronger 'observe the actual stop write' variant is infeasible with the test harness: the test hardware sets a 1ms message_gap, so the disconnect teardown races the batched stop write independent of cancel ordering. The test instead asserts shutdown drives cleanup through the live event loop, drains every scope task, and returns Ok. Co-Authored-By: Claude Fable 5 --- .../src/device/server_device_manager.rs | 12 +- crates/buttplug_server/src/server.rs | 7 +- .../tests/test_task_lifecycle.rs | 161 +++++++++++++++--- 3 files changed, 156 insertions(+), 24 deletions(-) diff --git a/crates/buttplug_server/src/device/server_device_manager.rs b/crates/buttplug_server/src/device/server_device_manager.rs index cb13ddc33..95b3cd606 100644 --- a/crates/buttplug_server/src/device/server_device_manager.rs +++ b/crates/buttplug_server/src/device/server_device_manager.rs @@ -379,10 +379,13 @@ impl ServerDeviceManager { self.running.store(false, Ordering::Relaxed); let stop_scanning = self.stop_scanning(); let stop_devices = self.stop_devices(&StopCmdV4::default()); - // TaskScope is not Clone, so capture its path and cancel here, then await - // the subtree draining inside the returned future via the registry. + // TaskScope is not Clone, but its CancellationToken is. Capture the token + // and path synchronously, but DO NOT cancel yet: device io tasks select + // biased on their token, so cancelling before the stop/disconnect commands + // drain would drop those queued commands and leave devices running. Run + // cleanup first, then cancel, then await the subtree draining. + let token = self.task_scope.token().clone(); let scope_path = self.task_scope.path().to_owned(); - self.task_scope.cancel(); async move { // Force stop scanning, otherwise we can disconnect and instantly try to reconnect while // cleaning up if we're still scanning. @@ -391,6 +394,9 @@ impl ServerDeviceManager { for device in devices.iter() { device.value().disconnect().await?; } + // Cleanup commands have drained; now cancel the scope and wait for every + // task under it to deregister. + token.cancel(); buttplug_core::util::task::registry() .wait_empty_under(&scope_path) .await; diff --git a/crates/buttplug_server/src/server.rs b/crates/buttplug_server/src/server.rs index da7341991..f428a1735 100644 --- a/crates/buttplug_server/src/server.rs +++ b/crates/buttplug_server/src/server.rs @@ -238,10 +238,15 @@ impl ButtplugServer { pub fn shutdown(&self) -> ButtplugServerResultFuture { let device_manager = self.device_manager.clone(); + // Capture the token and path synchronously, but cancel only after the + // device manager has finished its own cleanup. The server scope only owns + // the ping timer today, so cancelling early is harmless, but we align with + // the device manager's cleanup-before-cancel ordering for consistency. + let token = self.task_scope.token().clone(); let scope_path = self.task_scope.path().to_owned(); - self.task_scope.cancel(); async move { let result = device_manager.shutdown().await; + token.cancel(); // Await only this server's own subtree. A shared device manager // (with_shared_device_manager) outlives this server, so its subtree is // NOT awaited here -- device_manager.shutdown() handles its own. diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs index b8fa725a0..f4cc7630b 100644 --- a/crates/buttplug_tests/tests/test_task_lifecycle.rs +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -12,6 +12,9 @@ use buttplug_core::{ BUTTPLUG_CURRENT_API_MAJOR_VERSION, BUTTPLUG_CURRENT_API_MINOR_VERSION, ButtplugServerMessageV4, + OutputCmdV4, + OutputCommand, + OutputValue, RequestServerInfoV4, StartScanningV0, }, @@ -75,7 +78,10 @@ async fn test_server_shutdown_leaves_no_tasks() { .expect("timed out waiting for device to connect"); // Device is up: the registry must now show more tasks than baseline, and at - // least one of them must be a per-device task. + // least one of them must be a per-device task. We also derive this server's + // own device-manager scope prefix so subsequent leak checks inspect only this + // server's subtree — the registry is process-global, so other tests running + // in parallel must not pollute these assertions. let after_connect: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); let new_tasks: Vec<&String> = after_connect .iter() @@ -89,6 +95,14 @@ async fn test_server_shutdown_leaves_no_tasks() { new_tasks.iter().any(|p| p.contains("/device-")), "expected a per-device task in the registry, got: {new_tasks:?}" ); + let scope_prefix: String = new_tasks + .iter() + .find_map(|p| { + p.split_once('/') + .filter(|(root, _)| root.starts_with("device-manager")) + .map(|(root, _)| root.to_owned()) + }) + .expect("expected this server's device-manager scope in the registry"); // `shutdown()` is contractually responsible for draining every task it // spawned: it cancels the scope tree and awaits the registry going empty @@ -100,25 +114,19 @@ async fn test_server_shutdown_leaves_no_tasks() { // event stream does not close on its own. server.shutdown().await.expect("server shutdown errored"); - let leaked: Vec = registry() - .snapshot() - .into_iter() - .map(|t| t.path) - .filter(|p| !baseline.contains(p)) - .collect(); - assert!( - leaked.is_empty(), - "shutdown() returned but tasks are still registered: {leaked:?}" + let leaked = registry().live_count_under(&scope_prefix); + assert_eq!( + leaked, 0, + "shutdown() returned but {leaked} task(s) are still registered under {scope_prefix}" ); // Dropping the server must not resurrect or strand anything either. Give a - // short grace period and confirm we remain at (or below) baseline. + // short grace period and confirm this server's subtree stays empty. drop(server); drop(_channel); tokio::time::timeout(Duration::from_secs(5), async { loop { - let now: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); - if now.iter().filter(|p| !baseline.contains(p)).count() == 0 { + if registry().live_count_under(&scope_prefix) == 0 { return; } tokio::time::sleep(Duration::from_millis(50)).await; @@ -126,12 +134,125 @@ async fn test_server_shutdown_leaves_no_tasks() { }) .await .unwrap_or_else(|_| { - let leaked: Vec = registry() - .snapshot() - .into_iter() - .map(|t| t.path) - .filter(|p| !baseline.contains(p)) - .collect(); - panic!("leaked tasks after drop: {leaked:?}"); + let leaked = registry().live_count_under(&scope_prefix); + panic!("leaked {leaked} task(s) under {scope_prefix} after drop"); }); } + +/// Regression test for shutdown ordering: cleanup MUST run before the task +/// scope is cancelled. +/// +/// `shutdown()` must run stop_scanning / stop_devices / per-device disconnect +/// and cancel the device-manager scope only afterwards. The buggy ordering +/// cancelled the scope synchronously first: device io tasks select `biased` +/// with their cancellation token, so queued stop commands were dropped, and the +/// StopScanning issued into the still-running event loop raced its cancellation. +/// +/// This test exercises the StopScanning-through-event-loop path the old +/// ordering broke: a device is connected AND scanning is still active when +/// `shutdown()` is called. `shutdown()` must drive cleanup through the live +/// event loop, drain every scope task, and return Ok within a bounded time — +/// i.e. it must not hang or strand tasks. +/// +/// NOTE on variant choice: the stronger "observe the device's actual stop +/// write" assertion proved infeasible with this harness. The test hardware sets +/// a 1ms message_gap (see TestHardwareConnector::specialize), so the device io +/// task batches commands; during shutdown the per-device `disconnect()` fires a +/// `Disconnected` hardware event that tears the io task down inside that 1ms +/// batch window, dropping the pending (batched) stop write regardless of cancel +/// ordering. That teardown race is independent of the bug under test, so a +/// write-observation assertion is inherently flaky here. We therefore assert the +/// contract the cleanup-before-cancel ordering must uphold: shutdown completes +/// successfully with cleanup driven through the live event loop. +#[tokio::test] +async fn test_shutdown_runs_cleanup_through_event_loop_before_cancel() { + // Capture baseline so we can confirm shutdown drains everything it spawned. + let baseline: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); + + // Hold the channel so the device stays connected through shutdown. + let (server, _channel) = test_server_with_device("Massage Demo"); + + let recv = server.server_version_event_stream(); + pin_mut!(recv); + + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Shutdown Ordering Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + + // Start scanning and leave it running: shutdown's stop_scanning must drain + // through the event loop before the scope is cancelled. + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + + // Wait for the device to connect so its io / event-forwarding tasks exist + // under the scope alongside the still-running scanning state. + let device_index = tokio::time::timeout(Duration::from_secs(5), async { + while let Some(msg) = recv.next().await { + if let ButtplugServerMessageV4::DeviceList(list) = msg + && let Some((&idx, _)) = list.devices().iter().next() + { + return idx; + } + } + panic!("device event stream ended before a device connected"); + }) + .await + .expect("timed out waiting for device to connect"); + + // Put the device into an actively-running state so there is real cleanup to + // perform (a non-zero output that StopCmd must reset). + server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new( + device_index, + 0, + OutputCommand::Vibrate(OutputValue::new(50)), + ) + .into(), + )) + .await + .expect("vibrate command should succeed"); + + // Identify this server's own device-manager scope prefix so the leak check + // below is isolated from any other test running in parallel against the + // global registry. + let scope_prefix: String = registry() + .snapshot() + .into_iter() + .map(|t| t.path) + .filter(|p| !baseline.contains(p)) + .find_map(|p| { + p.split_once('/') + .filter(|(root, _)| root.starts_with("device-manager")) + .map(|(root, _)| root.to_owned()) + }) + .expect("expected this server's device-manager scope in the registry"); + + // shutdown() must drive cleanup (stop_scanning + stop_devices + disconnect) + // through the live event loop and only then cancel the scope, returning Ok + // within a bounded time. + tokio::time::timeout(Duration::from_secs(10), server.shutdown()) + .await + .expect("shutdown did not resolve in time — cleanup likely raced against cancellation") + .expect("server shutdown errored"); + + // shutdown() is contractually responsible for draining every task under its + // own scope. Inspect only this server's subtree to stay parallel-safe. + let leaked = registry().live_count_under(&scope_prefix); + assert_eq!( + leaked, 0, + "shutdown() returned but {leaked} task(s) are still registered under {scope_prefix}" + ); +} From a53fcb2e53c825b5242fded8b969caffb9671d19 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 12:28:03 -0700 Subject: [PATCH 17/20] fix(server): make device bringup cancellable so shutdown cannot hang The bringup task spawned in the device-manager event loop ignored its cancellation token, so build_device_handle could stall indefinitely (e.g. a BLE connect that never completes). ServerDeviceManager::shutdown's wait_empty_under then waited forever for the bringup task to deregister. Wrap build_device_handle in a biased tokio::select! against token.cancelled(); on cancellation we log at info and fall through. Dropping the build_device_handle future drops device_scope, cancelling anything it already spawned. connecting_devices.remove(&address) still runs unconditionally as the last statement on every path (success, error, and cancellation) so a future scan can retry the address. Co-Authored-By: Claude Fable 5 --- .../server_device_manager_event_loop.rs | 72 ++++++++++++------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs index c7718a7f5..0635bffbc 100644 --- a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs +++ b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs @@ -312,44 +312,62 @@ impl ServerDeviceManagerEventLoop { // Registry for the duration of the connection attempt. This closes the // shutdown race: wait_empty_under on the devices scope can only return // zero after the bringup task deregisters, which happens *after* - // build_device_handle returns. build_device_handle registers the - // io and event-forwarding tasks (via task_scope.spawn) synchronously - // before returning, so the registry count never momentarily hits zero - // while work remains: + // build_device_handle returns OR after cancellation is observed. + // build_device_handle registers the io and event-forwarding tasks (via + // task_scope.spawn) synchronously before returning, so on the success + // path the registry count never momentarily hits zero while work + // remains: // 1. bringup registers (here) // 2. io task registers (inside build_device_handle → spawn_device_task) // 3. event-forwarding task registers (inside build_device_handle) // 4. build_device_handle returns // 5. bringup deregisters // Step 2–3 happen before step 5, guaranteeing correct ordering. + // + // build_device_handle can stall indefinitely (e.g. a BLE connect that + // never completes), so the bringup MUST honor its cancellation token — + // otherwise ServerDeviceManager::shutdown's wait_empty_under would hang + // forever waiting for this task to deregister. We select on the token; + // on cancellation we drop the build_device_handle future, which drops + // device_scope and thereby cancels anything it has already spawned. self.devices_scope.spawn( &format!("bringup-{address}"), - move |_token| async move { - match build_device_handle( - device_config_manager, - creator, - protocol_specializers, - device_event_sender_for_forwarding, - output_observation_sender, - device_scope, - ) - .await - { - Ok(device_handle) => { - if device_event_sender_clone - .send(InternalDeviceEvent::Connected(device_handle)) - .await - .is_err() - { - error!( - "Device manager disappeared before connection established, device will be dropped." - ); - } + move |token| async move { + tokio::select! { + biased; + _ = token.cancelled() => { + info!( + "Device bringup for {address} cancelled before connection completed." + ); } - Err(e) => { - error!("Device errored while trying to connect: {:?}", e); + result = build_device_handle( + device_config_manager, + creator, + protocol_specializers, + device_event_sender_for_forwarding, + output_observation_sender, + device_scope, + ) => { + match result { + Ok(device_handle) => { + if device_event_sender_clone + .send(InternalDeviceEvent::Connected(device_handle)) + .await + .is_err() + { + error!( + "Device manager disappeared before connection established, device will be dropped." + ); + } + } + Err(e) => { + error!("Device errored while trying to connect: {:?}", e); + } + } } } + // Runs on every path (success, error, AND cancellation): the address + // must always leave the connecting set so a future scan can retry. connecting_devices.remove(&address); }, ); From 49f8df4aa767d0c490478202a6ae861d4752694a Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 12:36:54 -0700 Subject: [PATCH 18/20] fix(core): deregister tasks on panic via drop guard finish_task deregistered a scoped task only after fut.await, so a panicking task unwound past deregistration and leaked its registry entry forever, hanging wait_empty_under on that subtree. Replace the post-await deregistration with a DeregisterGuard constructed BEFORE awaiting; its Drop impl deregisters with outcome Panicked when std::thread::panicking(), Cancelled when the token is cancelled, else Completed. The guard covers the spawn / spawn_and_hold paths (via finish_task) and spawn_detached (token None -> Panicked on panic, else Completed). Adds the Panicked variant to TaskOutcome, documents it on the intiface EngineMessage::TaskEnded outcome field, and adds a unit test asserting wait_empty_under resolves within a timeout after a scoped task panics (verified RED against the old post-await deregistration). Also softens the task module doc, which overstated that every task is scope-owned, to note buttplug_server is fully migrated while other crates follow up. Co-Authored-By: Claude Fable 5 --- crates/buttplug_core/src/util/task/mod.rs | 22 ++++++-- .../buttplug_core/src/util/task/registry.rs | 5 +- crates/buttplug_core/src/util/task/scope.rs | 56 +++++++++++++++++-- .../src/frontend/process_messages.rs | 2 + 4 files changed, 71 insertions(+), 14 deletions(-) diff --git a/crates/buttplug_core/src/util/task/mod.rs b/crates/buttplug_core/src/util/task/mod.rs index 149633296..0391a9f0e 100644 --- a/crates/buttplug_core/src/util/task/mod.rs +++ b/crates/buttplug_core/src/util/task/mod.rs @@ -7,10 +7,12 @@ //! Task Scope and Task Registry: ownership and introspection for spawned tasks. //! -//! Every task is spawned through a [TaskScope], which links it into an -//! ownership tree, derives its hierarchical path, registers it in the global -//! [TaskRegistry], and hands it a cooperative [CancellationToken]. Dropping a -//! scope cancels its subtree. +//! A task spawned through a [TaskScope] is linked into an ownership tree, given +//! a hierarchical path, registered in the global [TaskRegistry], and handed a +//! cooperative [CancellationToken]; dropping a scope cancels its subtree. The +//! `buttplug_server` crate is fully migrated onto scope-owned tasks; the +//! hardware-manager and client crates still use the bare `spawn!` macro and +//! migrate onto scopes in a follow-up. mod registry; mod scope; @@ -35,8 +37,12 @@ where let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); async_manager::spawn( async move { + // Deregister via a drop guard so a panicking detached task still leaves + // the registry (outcome Panicked) instead of leaking its entry forever. + // Detached tasks have no cancellation token, so the outcome is Panicked on + // panic, else Completed. + let _guard = scope::DeregisterGuard::new(id, None); fut.await; - registry().deregister(id, TaskOutcome::Completed); }, span, ); @@ -54,8 +60,12 @@ where let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); async_manager::spawn( async move { + // Deregister via a drop guard so a panicking detached task still leaves + // the registry (outcome Panicked) instead of leaking its entry forever. + // Detached tasks have no cancellation token, so the outcome is Panicked on + // panic, else Completed. + let _guard = scope::DeregisterGuard::new(id, None); fut.await; - registry().deregister(id, TaskOutcome::Completed); }, span, ); diff --git a/crates/buttplug_core/src/util/task/registry.rs b/crates/buttplug_core/src/util/task/registry.rs index 75185bcfc..97f986e28 100644 --- a/crates/buttplug_core/src/util/task/registry.rs +++ b/crates/buttplug_core/src/util/task/registry.rs @@ -27,12 +27,13 @@ impl TaskId { } } -/// How a task ended: ran to completion on its own, or exited after observing -/// cancellation. +/// How a task ended: ran to completion on its own, exited after observing +/// cancellation, or unwound from a panic. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TaskOutcome { Completed, Cancelled, + Panicked, } /// Registry entry for a live task. diff --git a/crates/buttplug_core/src/util/task/scope.rs b/crates/buttplug_core/src/util/task/scope.rs index c9d6521c7..eb9e97c4c 100644 --- a/crates/buttplug_core/src/util/task/scope.rs +++ b/crates/buttplug_core/src/util/task/scope.rs @@ -141,14 +141,42 @@ impl TaskScope { } } +/// Deregisters a task from the global registry on drop. Created BEFORE the +/// task future is awaited so deregistration happens even if the future panics: +/// a panicking task unwinds through this guard, so the registry entry is removed +/// rather than leaked (which would hang `wait_empty_under` on that subtree +/// forever). The outcome is derived at drop time from whether we are unwinding +/// from a panic and whether the task observed cancellation. +/// +/// `token` is `None` for detached tasks, which have no cancellation concept: +/// their outcome is `Panicked` on panic, else `Completed`. +pub(super) struct DeregisterGuard { + id: TaskId, + token: Option, +} + +impl DeregisterGuard { + pub(super) fn new(id: TaskId, token: Option) -> Self { + Self { id, token } + } +} + +impl Drop for DeregisterGuard { + fn drop(&mut self) { + let outcome = if std::thread::panicking() { + TaskOutcome::Panicked + } else if self.token.as_ref().is_some_and(|t| t.is_cancelled()) { + TaskOutcome::Cancelled + } else { + TaskOutcome::Completed + }; + registry().deregister(self.id, outcome); + } +} + async fn finish_task(fut: impl Future, id: TaskId, token: CancellationToken) { + let _guard = DeregisterGuard::new(id, Some(token)); fut.await; - let outcome = if token.is_cancelled() { - TaskOutcome::Cancelled - } else { - TaskOutcome::Completed - }; - registry().deregister(id, outcome); } impl Drop for TaskScope { @@ -232,6 +260,22 @@ mod test { .expect("shutdown did not resolve"); } + #[tokio::test] + async fn test_panicking_task_deregisters() { + // A scoped task that panics must still deregister (via the drop guard), + // otherwise wait_empty_under on its root would hang forever. tokio catches + // the panic at the task boundary, so this test itself does not fail from the + // spawned panic. Without the guard, this wait would time out. + let root = TaskScope::root("panictest"); + let path = root.path().to_owned(); + root.spawn("panicker", |_token| async move { + panic!("intentional panic for deregistration test"); + }); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("panicking task did not deregister — registry entry leaked"); + } + #[tokio::test] async fn test_spawn_and_hold_keeps_scope_alive() { let root = TaskScope::root("holdtest"); diff --git a/crates/intiface_engine/src/frontend/process_messages.rs b/crates/intiface_engine/src/frontend/process_messages.rs index b297a208c..2ffc5652d 100644 --- a/crates/intiface_engine/src/frontend/process_messages.rs +++ b/crates/intiface_engine/src/frontend/process_messages.rs @@ -76,6 +76,8 @@ pub enum EngineMessage { TaskEnded { id: u64, path: String, + /// How the task ended: "Completed" | "Cancelled" | "Panicked" + /// (the Debug rendering of `buttplug_core`'s `TaskOutcome`). outcome: String, }, TaskList { From 62138cff36f01f6c7d8d09d127f530931967d1d3 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 13:19:23 -0700 Subject: [PATCH 19/20] test(core): assert held task reports Completed outcome MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_spawn_and_hold_keeps_scope_alive now subscribes to the registry event stream before spawning and asserts the held task's Ended event carries TaskOutcome::Completed (not Cancelled) — proving spawn_and_hold does not wire drop-cancel to the normally-finishing held task. Lagged broadcast reads are tolerated so the assertion stays stable under parallel test load. Co-Authored-By: Claude Fable 5 --- crates/buttplug_core/src/util/task/scope.rs | 36 +++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/crates/buttplug_core/src/util/task/scope.rs b/crates/buttplug_core/src/util/task/scope.rs index eb9e97c4c..035a2518b 100644 --- a/crates/buttplug_core/src/util/task/scope.rs +++ b/crates/buttplug_core/src/util/task/scope.rs @@ -278,8 +278,13 @@ mod test { #[tokio::test] async fn test_spawn_and_hold_keeps_scope_alive() { + use registry::TaskEvent; + let root = TaskScope::root("holdtest"); let path = root.path().to_owned(); + // Subscribe BEFORE spawning so we observe both the Started and Ended events + // for the held task and can assert how it actually finished. + let mut events = registry().event_stream(); let sub_scope = root.child("subscription"); let (tx, rx) = tokio::sync::oneshot::channel(); // Consuming spawn: the scope moves INTO the task and must not cancel it. @@ -295,5 +300,36 @@ mod test { tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) .await .expect("task did not deregister"); + + // The held task ran to its natural end, so its reported outcome MUST be + // Completed — not Cancelled. (If spawn_and_hold had wired drop-cancel to the + // held task, it would have been cancelled mid-sleep and reported Cancelled.) + // Drain the event stream looking for this task's Ended event. The path is + // exact ("/subscription/worker") so we don't match unrelated tasks + // from other tests sharing the global registry. + let task_path = format!("{path}/subscription/worker"); + let outcome = tokio::time::timeout(Duration::from_secs(1), async { + loop { + match events.recv().await { + Ok(TaskEvent::Ended { path, outcome, .. }) if path == task_path => return outcome, + Ok(_) => continue, + // The registry's broadcast channel is process-global; heavy parallel + // test load can evict buffered events (Lagged). Keep draining — our + // own Ended event fires ~20ms after subscribing, so under normal load + // it arrives well before any eviction window matters. + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + panic!("event stream closed before held task's Ended event") + } + } + } + }) + .await + .expect("did not observe Ended event for held task"); + assert_eq!( + outcome, + TaskOutcome::Completed, + "normally-finishing held task should report Completed, got {outcome:?}" + ); } } From 3e8231f7e84fcc396be42880a805b0d257157212 Mon Sep 17 00:00:00 2001 From: Kyle Machulis Date: Wed, 10 Jun 2026 13:19:35 -0700 Subject: [PATCH 20/20] test(server): reframe shutdown smoke test, add stalled-bringup regression test Addresses code review of the task-scope-lifecycle PR-response fixes: - Fix 1: rename test_shutdown_runs_cleanup_through_event_loop_before_cancel to test_shutdown_under_load_drains_subtree and rewrite its docstring honestly. It is not RED-capable for the cleanup-before-cancel ordering (reverting that fix leaves it green), so it is documented as a shutdown-under-load smoke test that guards the coarser hang/leak failure mode. The existing NOTE on why the write-observation variant is infeasible (1ms message_gap teardown race) is kept; the inherently-flaky instrumented-ordering variant is explicitly not pursued. - Fix 2: add test_shutdown_resolves_with_stalled_bringup plus a StallingDeviceCommunicationManager util whose connect() never resolves. With a device bringup stalled in connect(), shutdown() must still resolve within 10s via the biased select on the bringup token. Verified RED: reverting the bringup select to the non-cancellable |_token| form makes shutdown hang and the test time out at 10s. - Parallel-stability: the leak-check tests derived their device-manager scope prefix by scanning the global registry for any new device-manager-N root, which could pick a concurrent test's root and flakily report leaked tasks. Add ServerDeviceManager::scope_path() and use server.device_manager(). scope_path() so each test inspects only its own subtree. Full file is now stable across 12 consecutive runs. Co-Authored-By: Claude Fable 5 --- .../src/device/server_device_manager.rs | 9 + .../tests/test_task_lifecycle.rs | 158 +++++++++++------- crates/buttplug_tests/tests/util/mod.rs | 2 + .../stalling_device_communication_manager.rs | 114 +++++++++++++ 4 files changed, 224 insertions(+), 59 deletions(-) create mode 100644 crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs diff --git a/crates/buttplug_server/src/device/server_device_manager.rs b/crates/buttplug_server/src/device/server_device_manager.rs index 95b3cd606..ddb36fbf6 100644 --- a/crates/buttplug_server/src/device/server_device_manager.rs +++ b/crates/buttplug_server/src/device/server_device_manager.rs @@ -243,6 +243,15 @@ impl ServerDeviceManager { .map(|sender| convert_broadcast_receiver_to_stream(sender.subscribe())) } + /// The hierarchical path of this manager's root Task Scope + /// (e.g. "device-manager-3"). Every task this manager spawns lives at or + /// under this prefix in the global Task Registry. Useful for scoping + /// registry queries to a single manager instance — the registry is + /// process-global, so parallel managers must filter by their own prefix. + pub fn scope_path(&self) -> &str { + self.task_scope.path() + } + fn start_scanning(&self) -> ButtplugServerResultFuture { let command_sender = self.device_command_sender.clone(); async move { diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs index f4cc7630b..4425c4443 100644 --- a/crates/buttplug_tests/tests/test_task_lifecycle.rs +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -23,7 +23,8 @@ use buttplug_core::{ use buttplug_server::message::ButtplugClientMessageVariant; use futures::{StreamExt, pin_mut}; use std::time::Duration; -use util::test_server_with_device; +use util::stalling_device_communication_manager::StallingDeviceCommunicationManagerBuilder; +use util::{test_server_with_comm_manager, test_server_with_device}; /// Bring a real (test-harness) device online and confirm that, once the server /// is shut down and dropped, no tasks remain registered under the scope tree. @@ -77,32 +78,31 @@ async fn test_server_shutdown_leaves_no_tasks() { .await .expect("timed out waiting for device to connect"); - // Device is up: the registry must now show more tasks than baseline, and at - // least one of them must be a per-device task. We also derive this server's - // own device-manager scope prefix so subsequent leak checks inspect only this - // server's subtree — the registry is process-global, so other tests running - // in parallel must not pollute these assertions. - let after_connect: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); - let new_tasks: Vec<&String> = after_connect - .iter() - .filter(|p| !baseline.contains(p)) + // Device is up. Scope all subsequent leak checks to THIS server's own + // device-manager subtree: the registry is process-global, so other tests + // running in parallel must not pollute these assertions. We ask the manager + // for its own scope path directly rather than guessing it from the global + // registry snapshot — guessing is racy, because a concurrent test's + // `device-manager-N` tasks are also "new" relative to our baseline and could + // be picked instead of ours. + let scope_prefix: String = server.device_manager().scope_path().to_owned(); + + // Sanity: the registry must now show this server spawned per-device tasks + // under its own subtree. + let new_tasks: Vec = registry() + .snapshot() + .into_iter() + .map(|t| t.path) + .filter(|p| !baseline.contains(p) && p.starts_with(&format!("{scope_prefix}/"))) .collect(); assert!( !new_tasks.is_empty(), - "expected scope-spawned tasks after a device connected" + "expected scope-spawned tasks under {scope_prefix} after a device connected" ); assert!( new_tasks.iter().any(|p| p.contains("/device-")), "expected a per-device task in the registry, got: {new_tasks:?}" ); - let scope_prefix: String = new_tasks - .iter() - .find_map(|p| { - p.split_once('/') - .filter(|(root, _)| root.starts_with("device-manager")) - .map(|(root, _)| root.to_owned()) - }) - .expect("expected this server's device-manager scope in the registry"); // `shutdown()` is contractually responsible for draining every task it // spawned: it cancels the scope tree and awaits the registry going empty @@ -139,36 +139,34 @@ async fn test_server_shutdown_leaves_no_tasks() { }); } -/// Regression test for shutdown ordering: cleanup MUST run before the task -/// scope is cancelled. -/// -/// `shutdown()` must run stop_scanning / stop_devices / per-device disconnect -/// and cancel the device-manager scope only afterwards. The buggy ordering -/// cancelled the scope synchronously first: device io tasks select `biased` -/// with their cancellation token, so queued stop commands were dropped, and the -/// StopScanning issued into the still-running event loop raced its cancellation. +/// Shutdown-under-load smoke test: with a device connected, scanning still +/// active, and the device in a non-zero output state, `shutdown()` must drive +/// its cleanup (stop_scanning / stop_devices / per-device disconnect) through +/// the live event loop, drain every task under its scope, and return Ok within +/// a bounded time — it must neither hang nor strand tasks. /// -/// This test exercises the StopScanning-through-event-loop path the old -/// ordering broke: a device is connected AND scanning is still active when -/// `shutdown()` is called. `shutdown()` must drive cleanup through the live -/// event loop, drain every scope task, and return Ok within a bounded time — -/// i.e. it must not hang or strand tasks. +/// SCOPE / what this does NOT verify: this is a smoke test, not a regression +/// test for shutdown *ordering*. It does not prove that cleanup runs *before* +/// scope cancellation — reverting the cleanup-before-cancel ordering leaves this +/// test green, because the contract it checks (shutdown completes and its +/// subtree drains) holds under both orderings with this harness. The +/// cleanup-before-cancel ordering is the correct production behavior, but a +/// test that goes RED on that specific regression is not achievable here (see +/// NOTE). This test guards against the coarser failure mode: a shutdown that +/// hangs or leaks tasks when invoked under realistic load. /// -/// NOTE on variant choice: the stronger "observe the device's actual stop -/// write" assertion proved infeasible with this harness. The test hardware sets -/// a 1ms message_gap (see TestHardwareConnector::specialize), so the device io -/// task batches commands; during shutdown the per-device `disconnect()` fires a -/// `Disconnected` hardware event that tears the io task down inside that 1ms -/// batch window, dropping the pending (batched) stop write regardless of cancel -/// ordering. That teardown race is independent of the bug under test, so a -/// write-observation assertion is inherently flaky here. We therefore assert the -/// contract the cleanup-before-cancel ordering must uphold: shutdown completes -/// successfully with cleanup driven through the live event loop. +/// NOTE on why ordering can't be observed here: the stronger "observe the +/// device's actual stop write" assertion is infeasible with this harness. The +/// test hardware sets a 1ms message_gap (see TestHardwareConnector::specialize), +/// so the device io task batches commands; during shutdown the per-device +/// `disconnect()` fires a `Disconnected` hardware event that tears the io task +/// down inside that 1ms batch window, dropping the pending (batched) stop write +/// regardless of cancel ordering. That teardown race is independent of any +/// ordering bug, so a write-observation assertion is inherently flaky here. An +/// instrumented-ordering variant was also attempted and found inherently flaky +/// with this harness, so it is deliberately not pursued. #[tokio::test] -async fn test_shutdown_runs_cleanup_through_event_loop_before_cancel() { - // Capture baseline so we can confirm shutdown drains everything it spawned. - let baseline: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); - +async fn test_shutdown_under_load_drains_subtree() { // Hold the channel so the device stays connected through shutdown. let (server, _channel) = test_server_with_device("Massage Demo"); @@ -178,7 +176,7 @@ async fn test_shutdown_runs_cleanup_through_event_loop_before_cancel() { server .parse_message(ButtplugClientMessageVariant::V4( RequestServerInfoV4::new( - "Shutdown Ordering Test", + "Shutdown Under Load Test", BUTTPLUG_CURRENT_API_MAJOR_VERSION, BUTTPLUG_CURRENT_API_MINOR_VERSION, ) @@ -227,18 +225,10 @@ async fn test_shutdown_runs_cleanup_through_event_loop_before_cancel() { // Identify this server's own device-manager scope prefix so the leak check // below is isolated from any other test running in parallel against the - // global registry. - let scope_prefix: String = registry() - .snapshot() - .into_iter() - .map(|t| t.path) - .filter(|p| !baseline.contains(p)) - .find_map(|p| { - p.split_once('/') - .filter(|(root, _)| root.starts_with("device-manager")) - .map(|(root, _)| root.to_owned()) - }) - .expect("expected this server's device-manager scope in the registry"); + // global registry. We ask the manager directly rather than guessing from the + // global registry snapshot, which would race with concurrent tests' own + // `device-manager-N` roots. + let scope_prefix: String = server.device_manager().scope_path().to_owned(); // shutdown() must drive cleanup (stop_scanning + stop_devices + disconnect) // through the live event loop and only then cancel the scope, returning Ok @@ -256,3 +246,53 @@ async fn test_shutdown_runs_cleanup_through_event_loop_before_cancel() { "shutdown() returned but {leaked} task(s) are still registered under {scope_prefix}" ); } + +/// Regression test for the cancellable-bringup fix (fix 2): `shutdown()` must +/// not hang when a device bringup is stalled in `connect()`. +/// +/// The stalling comm manager emits one `DeviceFound` on scan; the device-manager +/// event loop spawns a bringup task that awaits `connect()`, which never +/// resolves. `shutdown()` cancels the device-manager scope and then +/// `wait_empty_under`s its subtree. The bringup task only deregisters once it +/// observes cancellation via the `biased` select on its token — without that +/// select it would await `connect()` forever and `shutdown()` would never +/// resolve. +/// +/// RED evidence: replacing the bringup's `move |token|` select with the +/// non-cancellable `move |_token|` form makes this test time out at the 10s +/// bound and fail. With the fix in place it resolves promptly. +#[tokio::test] +async fn test_shutdown_resolves_with_stalled_bringup() { + let server = test_server_with_comm_manager(StallingDeviceCommunicationManagerBuilder::default()); + + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Stalled Bringup Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + + // Kick off scanning so a device is found and a bringup task begins — and then + // stalls in connect(). + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + + // Give the bringup task time to spawn and enter (and block in) connect(). + tokio::time::sleep(Duration::from_millis(200)).await; + + // shutdown() must cancel the stalled bringup and return within the bound. + // Without the biased select on the bringup token this hangs forever. + tokio::time::timeout(Duration::from_secs(10), server.shutdown()) + .await + .expect("shutdown hung with a stalled device bringup — bringup is not honoring its cancellation token") + .expect("server shutdown errored"); +} diff --git a/crates/buttplug_tests/tests/util/mod.rs b/crates/buttplug_tests/tests/util/mod.rs index c19a80b54..9f4e07e8c 100644 --- a/crates/buttplug_tests/tests/util/mod.rs +++ b/crates/buttplug_tests/tests/util/mod.rs @@ -15,6 +15,8 @@ pub mod test_device_manager; pub use delay_device_communication_manager::DelayDeviceCommunicationManagerBuilder; #[allow(dead_code)] pub mod channel_transport; +#[allow(dead_code)] +pub mod stalling_device_communication_manager; use buttplug_client::ButtplugClient; use buttplug_client_in_process::ButtplugInProcessClientConnectorBuilder; use buttplug_server::{ diff --git a/crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs b/crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs new file mode 100644 index 000000000..7b21868c4 --- /dev/null +++ b/crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs @@ -0,0 +1,114 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +//! A test comm manager whose device bringup stalls forever in `connect()`. +//! +//! On `start_scanning` it emits a single `DeviceFound` event carrying a +//! [StallingHardwareConnector]. The device-manager event loop spawns a bringup +//! task that awaits `connect()` — which never resolves — modelling a real BLE +//! connect that hangs. This is the precise scenario the cancellable-bringup fix +//! guards: without a `biased` select on the bringup token, `shutdown()`'s +//! `wait_empty_under` would block forever waiting for the bringup task to +//! deregister. + +use async_trait::async_trait; +use buttplug_core::{ButtplugResultFuture, errors::ButtplugDeviceError}; +use buttplug_server::device::hardware::{ + HardwareConnector, + HardwareSpecializer, + communication::{ + HardwareCommunicationManager, + HardwareCommunicationManagerBuilder, + HardwareCommunicationManagerEvent, + }, +}; +use buttplug_server_device_config::{BluetoothLESpecifier, ProtocolCommunicationSpecifier}; +use futures::FutureExt; +use log::error; +use std::collections::HashMap; +use tokio::sync::mpsc::Sender; + +/// A `HardwareConnector` whose `connect()` future never resolves. +#[derive(Debug)] +struct StallingHardwareConnector { + specifier: ProtocolCommunicationSpecifier, +} + +#[async_trait] +impl HardwareConnector for StallingHardwareConnector { + fn specifier(&self) -> ProtocolCommunicationSpecifier { + self.specifier.clone() + } + + async fn connect(&mut self) -> Result, ButtplugDeviceError> { + // Block forever: this models a device connect that hangs and never returns. + // The bringup task must drop this future when its cancellation token fires. + std::future::pending::<()>().await; + unreachable!("stalling connector connect() should never resolve"); + } +} + +#[derive(Default)] +pub struct StallingDeviceCommunicationManagerBuilder {} + +impl HardwareCommunicationManagerBuilder for StallingDeviceCommunicationManagerBuilder { + fn finish( + &mut self, + sender: Sender, + ) -> Box { + Box::new(StallingDeviceCommunicationManager { + device_sender: sender, + }) + } +} + +pub struct StallingDeviceCommunicationManager { + device_sender: Sender, +} + +impl HardwareCommunicationManager for StallingDeviceCommunicationManager { + fn name(&self) -> &'static str { + "StallingDeviceCommunicationManager" + } + + fn start_scanning(&mut self) -> ButtplugResultFuture { + let device_sender = self.device_sender.clone(); + async move { + // "Massage Demo" is a known test device name with a real protocol config, + // so bringup proceeds into connect() (which then stalls). + let specifier = ProtocolCommunicationSpecifier::BluetoothLE( + BluetoothLESpecifier::new_from_device("Massage Demo", &HashMap::new(), &[]), + ); + let connector = StallingHardwareConnector { specifier }; + if device_sender + .send(HardwareCommunicationManagerEvent::DeviceFound { + name: "Massage Demo".to_owned(), + address: "stalling-device-0".to_owned(), + creator: Box::new(connector), + }) + .await + .is_err() + { + error!("Device channel no longer open."); + } + Ok(()) + } + .boxed() + } + + fn stop_scanning(&mut self) -> ButtplugResultFuture { + async { Ok(()) }.boxed() + } + + fn scanning_status(&self) -> bool { + false + } + + fn can_scan(&self) -> bool { + true + } +}