diff --git a/CLAUDE.md b/CLAUDE.md index 4df848dd5..d4c52d1eb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,6 +1,6 @@ # CLAUDE.md -Last verified: 2026-05-19 +Last verified: 2026-06-09 This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. @@ -11,10 +11,12 @@ cargo build # Debug build cargo build --release # Release build (LTO enabled) cargo test # Run all tests cargo test -p buttplug_server # Run tests for specific crate -cargo fmt --all -- --check # Check formatting -cargo fmt # Auto-format (2-space indent, edition 2024) +cargo +nightly fmt --all -- --check # Check formatting (MUST use nightly) +cargo +nightly fmt # Auto-format (2-space indent, edition 2024) ``` +**Formatting gotcha**: rustfmt.toml uses nightly-only options (`imports_layout`, `empty_item_single_line`). Running `cargo fmt` on the STABLE toolchain silently ignores them and rewrites the entire workspace into the wrong style (~190 files of import-collapsing churn). Always use `cargo +nightly fmt`. CI checks formatting with nightly. + **Linux dependencies**: `libudev-dev`, `libusb-1.0-0-dev` (for serial/HID support) **WASM build**: @@ -54,7 +56,7 @@ Buttplug is a framework for interfacing with intimate hardware devices. It uses **Message-Based Protocol**: All client-server communication uses versioned JSON messages (v0-v4). Version negotiation happens during handshake. -**Async Architecture**: Heavy use of tokio channels (mpsc, broadcast, oneshot) for communication between components. Runtime abstraction supports tokio (production) and WASM. +**Async Architecture**: Heavy use of tokio channels (mpsc, broadcast, oneshot) for communication between components. Runtime abstraction supports tokio (production) and WASM. Spawned tasks are owned by `TaskScope`s rather than fire-and-forget `spawn!` (see Task Lifecycle below). **Device Lifecycle**: ``` @@ -93,6 +95,17 @@ Simulated devices allow testing the full device lifecycle without real hardware. 
- Validation rejects unknown archetypes and duplicate addresses at config build time - `SimulatedProtocol` is a no-op handler; `SimulatedHardwareConnector` creates in-memory endpoints +**Task Lifecycle** (`buttplug_core::util::task`): +Spawned async tasks are owned by a `TaskScope` ownership tree rather than fire-and-forget, giving cooperative cancellation and global introspection. Key contracts: +- `TaskScope::root(name)` makes a root scope (path gets a unique numeric suffix, e.g. `server-2`, so parallel instances don't collide); `.child(name)` derives a sub-scope whose token is a child of the parent's. +- `scope.spawn(name, |token| async ...)` spawns a task owned by the scope; long-running tasks MUST `select!` on the passed token. Cancelling or dropping a scope cancels its whole subtree. +- `scope.spawn_and_hold(name, ...)` consumes the scope into the task, so drop-cancel can't fire before the task runs (used for `FnOnce` callbacks like ping-timeout and protocol subscription handlers). +- `scope.shutdown().await` cancels the subtree and waits until every task under it has deregistered (wrap in a timeout if tasks may be uncooperative). +- `spawn_detached(name, fut)` is a rare escape hatch: registered under `detached/{name}` but uncancellable. Prefer scopes. +- `TaskRegistry` (global, via `registry()`) records every live task: `snapshot()`, `live_count_under(prefix)` (segment-aware prefix match), `event_stream()` (`TaskEvent::Started`/`Ended`), `wait_empty_under(prefix)`. +- Ownership in the server: `ButtplugServer` owns a `server` root scope; `ServerDeviceManager` owns a `device-manager` root scope with per-device child scopes (io/event-forwarding/bringup); `PingTimer` is scope-owned (the old `PingMessage::End` + Drop-spawn shutdown hack is gone). `ProtocolHandler::handle_input_subscribe_cmd` now takes a `TaskScope` param. 
+- `intiface_engine` exposes this to frontends when `emit_task_events` is set: registry events forward as `EngineMessage::TaskStarted`/`TaskEnded`, and `IntifaceMessage::RequestTaskList` returns `EngineMessage::TaskList` (`Vec`). + ## Agent skills ### Issue tracker diff --git a/CONTEXT.md b/CONTEXT.md index 5ee2983c4..5489a1808 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -92,6 +92,14 @@ _Avoid_: "Discovery" as a distinct stage from identification — they're the sam An opt-in broadcast of every output command sent to a **Device** — device index, feature index, output type, and value. Used by frontends (e.g. Intiface Central) to visually display real-time device activity, verify hardware behaviour matches commands, and let developers see what *would* happen with simulated devices. Disabled by default to avoid overhead. _Avoid_: Treating as internal-only debugging; it's a user-facing observability feature. +**Task Scope**: +The owner of spawned async tasks within a module. Every task is spawned through a Task Scope, which links it to a parent, derives its hierarchical name (e.g. `device-manager-1/devices/device-3/io`), registers it in the **Task Registry**, and hands it a cooperative cancellation token. Dropping a scope cancels its children. Tasks cannot be spawned without a parent scope. +_Avoid_: "Detached task" or bare spawning as the normal pattern; detachment is the rare, explicit exception. + +**Task Registry**: +The queryable record of every live task — id, hierarchical path, parent, state. Populated as a side effect of spawning through a **Task Scope**. Exposed in-process for tests and embedders, and to frontends via TaskStarted/TaskEnded **Events** plus a snapshot query (same opt-in pattern as **Output Observation**). +_Avoid_: Treating as internal-only debugging; like Output Observations, it's user-facing observability. + **Command**: A message from **Client** to **Server** requesting an action — controlling a device, starting a scan, requesting device lists. 
Always has a non-zero message ID; the server responds with an `Ok` or `Error` using the same ID. _Avoid_: Referring to server-initiated messages as commands. diff --git a/crates/buttplug_core/Cargo.toml b/crates/buttplug_core/Cargo.toml index f979fff80..bb7f5cef0 100644 --- a/crates/buttplug_core/Cargo.toml +++ b/crates/buttplug_core/Cargo.toml @@ -57,3 +57,5 @@ wasm-bindgen-futures = { version = "0.4.64", optional = true } wasmtimer = { version = "0.4.3", optional = true } smallvec = { version = "1.15.1", features = ["serde", "const_generics"] } enumflags2 = "0.7.12" +tokio-util = "0.7.18" +dashmap = "6.1.0" diff --git a/crates/buttplug_core/src/util/mod.rs b/crates/buttplug_core/src/util/mod.rs index 439950aeb..5734abdb4 100644 --- a/crates/buttplug_core/src/util/mod.rs +++ b/crates/buttplug_core/src/util/mod.rs @@ -14,6 +14,7 @@ pub mod range; pub mod serializers; pub mod small_vec_enum_map; pub mod stream; +pub mod task; #[cfg(all(not(feature = "wasm"), feature = "tokio-runtime"))] pub use tokio::time::sleep; diff --git a/crates/buttplug_core/src/util/task/mod.rs b/crates/buttplug_core/src/util/task/mod.rs new file mode 100644 index 000000000..0391a9f0e --- /dev/null +++ b/crates/buttplug_core/src/util/task/mod.rs @@ -0,0 +1,93 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +//! Task Scope and Task Registry: ownership and introspection for spawned tasks. +//! +//! A task spawned through a [TaskScope] is linked into an ownership tree, given +//! a hierarchical path, registered in the global [TaskRegistry], and handed a +//! cooperative [CancellationToken]; dropping a scope cancels its subtree. The +//! `buttplug_server` crate is fully migrated onto scope-owned tasks; the +//! 
hardware-manager and client crates still use the bare `spawn!` macro and +//! migrate onto scopes in a follow-up. + +mod registry; +mod scope; + +pub use registry::{TaskEvent, TaskId, TaskInfo, TaskOutcome, TaskRegistry, registry}; +pub use scope::TaskScope; + +use crate::util::async_manager; +use std::future::Future; + +/// Spawn a task with no owning scope. Registered in the Task Registry under +/// "detached/{name}" so it still shows up in snapshots, but nothing can cancel +/// it. RARE — prefer [TaskScope::spawn]. Valid uses: one-shot notifications +/// where the spawner is being destroyed. +#[cfg(not(feature = "wasm"))] +pub fn spawn_detached<Fut>(name: &str, fut: Fut) +where + Fut: Future + Send + 'static, +{ + let path = format!("detached/{name}"); + let id = registry().register(path.clone(), true); + let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); + async_manager::spawn( + async move { + // Deregister via a drop guard so a panicking detached task still leaves + // the registry (outcome Panicked) instead of leaking its entry forever. + // Detached tasks have no cancellation token, so the outcome is Panicked on + // panic, else Completed. + let _guard = scope::DeregisterGuard::new(id, None); + fut.await; + }, + span, + ); +} + +/// Spawn a task with no owning scope (WASM, no Send required). See the +/// non-WASM variant for semantics. +#[cfg(feature = "wasm")] +pub fn spawn_detached<Fut>(name: &str, fut: Fut) +where + Fut: Future + 'static, +{ + let path = format!("detached/{name}"); + let id = registry().register(path.clone(), true); + let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); + async_manager::spawn( + async move { + // Deregister via a drop guard so a panicking detached task still leaves + // the registry (outcome Panicked) instead of leaking its entry forever. + // Detached tasks have no cancellation token, so the outcome is Panicked on + // panic, else Completed. 
+ let _guard = scope::DeregisterGuard::new(id, None); + fut.await; + }, + span, + ); +} + +#[cfg(test)] +mod test { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_spawn_detached_registers() { + let (tx, rx) = tokio::sync::oneshot::channel(); + spawn_detached("test-notify", async move { + let _ = tx.send(()); + }); + rx.await.unwrap(); + tokio::time::timeout( + Duration::from_secs(1), + registry().wait_empty_under("detached/test-notify"), + ) + .await + .expect("detached task did not deregister"); + } +} diff --git a/crates/buttplug_core/src/util/task/registry.rs b/crates/buttplug_core/src/util/task/registry.rs new file mode 100644 index 000000000..97f986e28 --- /dev/null +++ b/crates/buttplug_core/src/util/task/registry.rs @@ -0,0 +1,222 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +use dashmap::DashMap; +use std::sync::{ + OnceLock, + atomic::{AtomicU64, Ordering}, +}; +use tokio::sync::broadcast; + +/// Unique identifier for a registered task. Process-lifetime unique. +/// +/// IDs are drawn from a monotonically-incrementing `AtomicU64` counter. At +/// 64-bit width and even at 10 million tasks per second the counter would take +/// roughly 58,000 years to wrap, so id reuse within a single process lifetime +/// is not a practical concern. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TaskId(u64); + +impl TaskId { + pub fn value(&self) -> u64 { + self.0 + } +} + +/// How a task ended: ran to completion on its own, exited after observing +/// cancellation, or unwound from a panic. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TaskOutcome { + Completed, + Cancelled, + Panicked, +} + +/// Registry entry for a live task. 
+#[derive(Debug, Clone)] +pub struct TaskInfo { + pub id: TaskId, + /// Hierarchical path, e.g. "device-manager-1/devices/device-3/io". + pub path: String, + /// True if spawned via spawn_detached (no owning scope). + pub detached: bool, +} + +/// Lifecycle events broadcast by the Task Registry. +#[derive(Debug, Clone)] +pub enum TaskEvent { + Started { + id: TaskId, + path: String, + }, + Ended { + id: TaskId, + path: String, + outcome: TaskOutcome, + }, +} + +/// The global record of every live task, populated as a side effect of +/// spawning through a Task Scope. +/// +/// **Memory note**: entries are removed on `deregister`, but `DashMap` does not +/// shrink shard capacity after removals. Peak concurrent-task count therefore +/// becomes a memory high-water mark that is held for the lifetime of the +/// registry (i.e. the process). In practice buttplug servers run a bounded +/// number of concurrent tasks, so this is not expected to be significant. +#[derive(Debug)] +pub struct TaskRegistry { + tasks: DashMap<u64, TaskInfo>, + counter: AtomicU64, + root_counter: AtomicU64, + events: broadcast::Sender<TaskEvent>, +} + +/// The global Task Registry. +pub fn registry() -> &'static TaskRegistry { + static REGISTRY: OnceLock<TaskRegistry> = OnceLock::new(); + REGISTRY.get_or_init(TaskRegistry::new) +} + +impl TaskRegistry { + pub(super) fn new() -> Self { + Self { + tasks: DashMap::new(), + counter: AtomicU64::new(1), + root_counter: AtomicU64::new(1), + events: broadcast::channel(256).0, + } + } + + /// Unique suffix for root scope names so parallel instances don't collide. 
+ pub(super) fn next_root_suffix(&self) -> u64 { + self.root_counter.fetch_add(1, Ordering::Relaxed) + } + + pub(super) fn register(&self, path: String, detached: bool) -> TaskId { + let id = TaskId(self.counter.fetch_add(1, Ordering::Relaxed)); + self.tasks.insert( + id.0, + TaskInfo { + id, + path: path.clone(), + detached, + }, + ); + let _ = self.events.send(TaskEvent::Started { id, path }); + id + } + + pub(super) fn deregister(&self, id: TaskId, outcome: TaskOutcome) { + if let Some((_, info)) = self.tasks.remove(&id.0) { + let _ = self.events.send(TaskEvent::Ended { + id, + path: info.path, + outcome, + }); + } + } + + /// Snapshot of all live tasks. + pub fn snapshot(&self) -> Vec<TaskInfo> { + self.tasks.iter().map(|e| e.value().clone()).collect() + } + + /// Count of live tasks at or under the given path prefix. Prefix matching is + /// segment-aware: "server-1" matches "server-1/loop" but not "server-10/loop". + pub fn live_count_under(&self, prefix: &str) -> usize { + self + .tasks + .iter() + .filter(|e| path_is_under(&e.value().path, prefix)) + .count() + } + + /// Subscribe to task lifecycle events. + pub fn event_stream(&self) -> broadcast::Receiver<TaskEvent> { + self.events.subscribe() + } + + /// Wait until no live tasks remain at or under the given path prefix. + /// Subscribes to events BEFORE counting to avoid missing an Ended event + /// between the count and the wait. Callers should wrap in a timeout if the + /// subtree may contain uncooperative tasks. + pub async fn wait_empty_under(&self, prefix: &str) { + let mut events = self.events.subscribe(); + while self.live_count_under(prefix) > 0 { + match events.recv().await { + Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => continue, + // Sender is owned by the registry itself; Closed only happens for + // non-global registries in tests being dropped. 
+ Err(broadcast::error::RecvError::Closed) => return, + } + } + } +} + +fn path_is_under(path: &str, prefix: &str) -> bool { + path == prefix || (path.starts_with(prefix) && path.as_bytes().get(prefix.len()) == Some(&b'/')) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_register_and_snapshot() { + let reg = TaskRegistry::new(); + let id = reg.register("root-1/loop".to_owned(), false); + let snapshot = reg.snapshot(); + assert_eq!(snapshot.len(), 1); + assert_eq!(snapshot[0].id, id); + assert_eq!(snapshot[0].path, "root-1/loop"); + reg.deregister(id, TaskOutcome::Completed); + assert!(reg.snapshot().is_empty()); + } + + #[test] + fn test_prefix_boundary() { + let reg = TaskRegistry::new(); + reg.register("server-1/loop".to_owned(), false); + reg.register("server-10/loop".to_owned(), false); + // "server-1" must not match "server-10/loop" + assert_eq!(reg.live_count_under("server-1"), 1); + assert_eq!(reg.live_count_under("server-10"), 1); + assert_eq!(reg.live_count_under("server"), 0); + } + + #[tokio::test] + async fn test_events_emitted() { + let reg = TaskRegistry::new(); + let mut events = reg.event_stream(); + let id = reg.register("root-1/task".to_owned(), false); + reg.deregister(id, TaskOutcome::Cancelled); + assert!(matches!( + events.recv().await.unwrap(), + TaskEvent::Started { .. } + )); + let TaskEvent::Ended { outcome, .. 
} = events.recv().await.unwrap() else { + panic!("expected Ended event"); + }; + assert_eq!(outcome, TaskOutcome::Cancelled); + } + + #[tokio::test] + async fn test_wait_empty_under() { + let reg = std::sync::Arc::new(TaskRegistry::new()); + let id = reg.register("root-1/task".to_owned(), false); + let reg_clone = reg.clone(); + let waiter = tokio::spawn(async move { reg_clone.wait_empty_under("root-1").await }); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + assert!(!waiter.is_finished()); + reg.deregister(id, TaskOutcome::Completed); + tokio::time::timeout(std::time::Duration::from_secs(1), waiter) + .await + .expect("wait_empty_under did not resolve") + .unwrap(); + } +} diff --git a/crates/buttplug_core/src/util/task/scope.rs b/crates/buttplug_core/src/util/task/scope.rs new file mode 100644 index 000000000..035a2518b --- /dev/null +++ b/crates/buttplug_core/src/util/task/scope.rs @@ -0,0 +1,335 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +use super::registry::{TaskId, TaskOutcome, registry}; +use crate::util::async_manager; +use std::future::Future; +use tokio_util::sync::CancellationToken; + +/// The owner of spawned async tasks within a module. +/// +/// Tasks can only be spawned through a scope, which derives their hierarchical +/// path, registers them in the global Task Registry, and hands them a +/// cooperative [CancellationToken]. Dropping a scope cancels its subtree. +/// +/// Scopes are intentionally NOT [Clone]: ownership is singular. To share +/// spawning capability, create a [child][TaskScope::child] and move it, or +/// wrap a scope in [std::sync::Arc] when a cloneable holder needs it (the +/// subtree then cancels when the last Arc drops). 
+#[derive(Debug)] +pub struct TaskScope { + path: String, + token: CancellationToken, +} + +impl TaskScope { + /// Create a root scope. The path gets a unique numeric suffix + /// (e.g. "server-2") so parallel instances in one process don't collide. + pub fn root(name: &str) -> Self { + Self { + path: format!("{}-{}", name, registry().next_root_suffix()), + token: CancellationToken::new(), + } + } + + /// Create a child scope. Cancelling this scope cancels the child. + pub fn child(&self, name: &str) -> Self { + Self { + path: format!("{}/{}", self.path, name), + token: self.token.child_token(), + } + } + + pub fn path(&self) -> &str { + &self.path + } + + /// The scope's own cancellation token, for select!-ing in code that runs + /// inside an already-spawned context. + pub fn token(&self) -> &CancellationToken { + &self.token + } + + /// Request cancellation of every task in this scope's subtree. + pub fn cancel(&self) { + self.token.cancel(); + } + + /// Cancel the subtree and wait until every task under this scope has + /// deregistered. Wrap in a timeout if the subtree may contain + /// uncooperative tasks. + pub async fn shutdown(&self) { + self.token.cancel(); + registry().wait_empty_under(&self.path).await; + } + + /// Spawn a task owned by this scope. The closure receives the task's own + /// child token; long-running tasks MUST select on it. + #[cfg(not(feature = "wasm"))] + pub fn spawn<F, Fut>(&self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + Send + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn(finish_task(fut, id, task_token), span); + } + + /// Spawn a task owned by this scope (WASM, no Send required). 
+ #[cfg(feature = "wasm")] + pub fn spawn<F, Fut>(&self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn(finish_task(fut, id, task_token), span); + } + + /// Consume the scope and spawn a task that holds it alive for its own + /// duration. Use when the caller has nowhere to store the scope (e.g. + /// protocol subscription handlers): drop-cancel must not fire before the + /// task runs, but parent cancellation still propagates. + #[cfg(not(feature = "wasm"))] + pub fn spawn_and_hold<F, Fut>(self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + Send + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn( + async move { + let _hold = self; + finish_task(fut, id, task_token).await; + }, + span, + ); + } + + /// Consume the scope and spawn a task that holds it alive (WASM, no Send). + #[cfg(feature = "wasm")] + pub fn spawn_and_hold<F, Fut>(self, name: &str, f: F) + where + F: FnOnce(CancellationToken) -> Fut, + Fut: Future + 'static, + { + let (id, task_token, span) = self.register_task(name); + let fut = f(task_token.clone()); + async_manager::spawn( + async move { + let _hold = self; + finish_task(fut, id, task_token).await; + }, + span, + ); + } + + fn register_task(&self, name: &str) -> (TaskId, CancellationToken, tracing::Span) { + let path = format!("{}/{}", self.path, name); + let id = registry().register(path.clone(), false); + let task_token = self.token.child_token(); + // Span names must be const in tracing; the dynamic path goes in a field. + let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path); + (id, task_token, span) + } +} + +/// Deregisters a task from the global registry on drop. 
Created BEFORE the +/// task future is awaited so deregistration happens even if the future panics: +/// a panicking task unwinds through this guard, so the registry entry is removed +/// rather than leaked (which would hang `wait_empty_under` on that subtree +/// forever). The outcome is derived at drop time from whether we are unwinding +/// from a panic and whether the task observed cancellation. +/// +/// `token` is `None` for detached tasks, which have no cancellation concept: +/// their outcome is `Panicked` on panic, else `Completed`. +pub(super) struct DeregisterGuard { + id: TaskId, + token: Option<CancellationToken>, +} + +impl DeregisterGuard { + pub(super) fn new(id: TaskId, token: Option<CancellationToken>) -> Self { + Self { id, token } + } +} + +impl Drop for DeregisterGuard { + fn drop(&mut self) { + let outcome = if std::thread::panicking() { + TaskOutcome::Panicked + } else if self.token.as_ref().is_some_and(|t| t.is_cancelled()) { + TaskOutcome::Cancelled + } else { + TaskOutcome::Completed + }; + registry().deregister(self.id, outcome); + } +} + +async fn finish_task(fut: impl Future, id: TaskId, token: CancellationToken) { + let _guard = DeregisterGuard::new(id, Some(token)); + fut.await; +} + +impl Drop for TaskScope { + fn drop(&mut self) { + self.token.cancel(); + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::util::task::registry; + use std::time::Duration; + + #[test] + fn test_root_path_unique() { + let a = TaskScope::root("testroot"); + let b = TaskScope::root("testroot"); + assert_ne!(a.path(), b.path()); + assert!(a.path().starts_with("testroot-")); + } + + #[test] + fn test_child_path() { + let root = TaskScope::root("testroot"); + let child = root.child("devices"); + assert_eq!(child.path(), format!("{}/devices", root.path())); + } + + #[tokio::test] + async fn test_spawn_registers_and_completes() { + let root = TaskScope::root("spawntest"); + let path = root.path().to_owned(); + let (tx, rx) = tokio::sync::oneshot::channel(); + root.spawn("worker", 
|_token| async move { + let _ = tx.send(()); + }); + rx.await.unwrap(); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("task did not deregister after completion"); + } + + #[tokio::test] + async fn test_cancel_propagates_to_children() { + let root = TaskScope::root("canceltest"); + let path = root.path().to_owned(); + let child = root.child("inner"); + child.spawn("worker", |token| async move { + token.cancelled().await; + }); + root.cancel(); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("cancel did not propagate to child scope task"); + } + + #[tokio::test] + async fn test_drop_cancels() { + let root = TaskScope::root("droptest"); + let path = root.path().to_owned(); + root.spawn("worker", |token| async move { + token.cancelled().await; + }); + drop(root); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("drop did not cancel task"); + } + + #[tokio::test] + async fn test_shutdown_awaits_subtree() { + let root = TaskScope::root("shutdowntest"); + root.spawn("worker", |token| async move { + token.cancelled().await; + // Simulate cleanup work after observing cancellation. + tokio::time::sleep(Duration::from_millis(20)).await; + }); + tokio::time::timeout(Duration::from_secs(1), root.shutdown()) + .await + .expect("shutdown did not resolve"); + } + + #[tokio::test] + async fn test_panicking_task_deregisters() { + // A scoped task that panics must still deregister (via the drop guard), + // otherwise wait_empty_under on its root would hang forever. tokio catches + // the panic at the task boundary, so this test itself does not fail from the + // spawned panic. Without the guard, this wait would time out. 
+ let root = TaskScope::root("panictest"); + let path = root.path().to_owned(); + root.spawn("panicker", |_token| async move { + panic!("intentional panic for deregistration test"); + }); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("panicking task did not deregister — registry entry leaked"); + } + + #[tokio::test] + async fn test_spawn_and_hold_keeps_scope_alive() { + use registry::TaskEvent; + + let root = TaskScope::root("holdtest"); + let path = root.path().to_owned(); + // Subscribe BEFORE spawning so we observe both the Started and Ended events + // for the held task and can assert how it actually finished. + let mut events = registry().event_stream(); + let sub_scope = root.child("subscription"); + let (tx, rx) = tokio::sync::oneshot::channel(); + // Consuming spawn: the scope moves INTO the task and must not cancel it. + sub_scope.spawn_and_hold("worker", |_token| async move { + tokio::time::sleep(Duration::from_millis(20)).await; + let _ = tx.send(()); + }); + // If drop-cancel fired early this would hang (task cancelled before send). + tokio::time::timeout(Duration::from_secs(1), rx) + .await + .expect("spawn_and_hold task was cancelled early") + .unwrap(); + tokio::time::timeout(Duration::from_secs(1), registry().wait_empty_under(&path)) + .await + .expect("task did not deregister"); + + // The held task ran to its natural end, so its reported outcome MUST be + // Completed — not Cancelled. (If spawn_and_hold had wired drop-cancel to the + // held task, it would have been cancelled mid-sleep and reported Cancelled.) + // Drain the event stream looking for this task's Ended event. The path is + // exact ("/subscription/worker") so we don't match unrelated tasks + // from other tests sharing the global registry. 
+ let task_path = format!("{path}/subscription/worker"); + let outcome = tokio::time::timeout(Duration::from_secs(1), async { + loop { + match events.recv().await { + Ok(TaskEvent::Ended { path, outcome, .. }) if path == task_path => return outcome, + Ok(_) => continue, + // The registry's broadcast channel is process-global; heavy parallel + // test load can evict buffered events (Lagged). Keep draining — our + // own Ended event fires ~20ms after subscribing, so under normal load + // it arrives well before any eviction window matters. + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + panic!("event stream closed before held task's Ended event") + } + } + } + }) + .await + .expect("did not observe Ended event for held task"); + assert_eq!( + outcome, + TaskOutcome::Completed, + "normally-finishing held task should report Completed, got {outcome:?}" + ); + } +} diff --git a/crates/buttplug_server/src/device/device_handle.rs b/crates/buttplug_server/src/device/device_handle.rs index f666b2a18..759dc4824 100644 --- a/crates/buttplug_server/src/device/device_handle.rs +++ b/crates/buttplug_server/src/device/device_handle.rs @@ -26,7 +26,7 @@ use buttplug_core::{ OutputValue, StopCmdV4, }, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::{ DeviceConfigurationManager, @@ -111,10 +111,16 @@ pub struct DeviceHandle { stop_commands: Arc>, internal_hw_msg_sender: Sender>, output_observation_sender: Option>, + /// Scope owning this device's tasks. Rides in an Arc since DeviceHandle is + /// Clone; the subtree cancels when the last clone drops. + task_scope: Arc<TaskScope>, } impl DeviceHandle { - /// Create a new DeviceHandle with direct ownership of device state + /// Create a new DeviceHandle with direct ownership of device state. 
+ /// The arguments bundle all device-state concerns (hardware, protocol, definition, identity, + /// commands, I/O channel, observability, lifecycle scope) — suppressing the lint is correct here. + #[allow(clippy::too_many_arguments)] pub(crate) fn new( hardware: Arc, handler: Arc, @@ -123,6 +129,7 @@ impl DeviceHandle { stop_commands: Vec, internal_hw_msg_sender: Sender>, output_observation_sender: Option>, + task_scope: Arc<TaskScope>, ) -> Self { Self { hardware, @@ -134,6 +141,7 @@ impl DeviceHandle { stop_commands: Arc::new(stop_commands), internal_hw_msg_sender, output_observation_sender, + task_scope, } } @@ -402,9 +410,17 @@ impl DeviceHandle { info!("Handling input subscribe command"); let device = self.hardware.clone(); let handler = self.handler.clone(); + let task_scope = self.task_scope.child("subscription"); async move { handler - .handle_input_subscribe_cmd(device_index, device, feature_index, feature_id, input_type) + .handle_input_subscribe_cmd( + device_index, + device, + feature_index, + feature_id, + input_type, + task_scope, + ) .await .map(|_| message::OkV0::new(1).into()) .map_err(|e| e.into()) @@ -455,7 +471,12 @@ pub(super) async fn build_device_handle( protocol_specializers: Vec, device_event_sender: tokio::sync::mpsc::Sender, output_observation_sender: Option>, + task_scope: TaskScope, ) -> Result { + // DeviceHandle is Clone, so its scope rides in an Arc; the device subtree + // cancels when the last clone (and thus the last Arc) drops. + let task_scope = Arc::new(task_scope); + // At this point, we know we've got hardware that is waiting to connect, and enough protocol // info to actually do something after we connect. So go ahead and connect. 
trace!("Connecting to {:?}", hardware_connector); @@ -524,6 +545,7 @@ pub(super) async fn build_device_handle( }; spawn_device_task( + &task_scope, hardware.clone(), handler.clone(), DeviceTaskConfig { @@ -590,6 +612,7 @@ pub(super) async fn build_device_handle( stop_commands, internal_hw_msg_sender, output_observation_sender, + task_scope.clone(), ); // If we need a keepalive with a packet replay, set this up via stopping the device on connect. @@ -614,41 +637,48 @@ pub(super) async fn build_device_handle( // to the device manager event loop via the provided sender. let event_stream = device_handle.event_stream(); let identifier = device_handle.identifier().clone(); - buttplug_core::spawn!("DeviceEventForwarding", async move { + task_scope.spawn("event-forwarding", move |token| async move { futures::pin_mut!(event_stream); loop { - let event = futures::StreamExt::next(&mut event_stream).await; - match event { - Some(DeviceEvent::Disconnected(id)) => { - if device_event_sender - .send(InternalDeviceEvent::Disconnected(id)) - .await - .is_err() - { - info!( - "Device event sender closed for device {:?}, stopping event forwarding.", - identifier - ); - break; - } + tokio::select! 
{ + _ = token.cancelled() => { + info!("Event forwarding cancelled for device {:?}", identifier); + break; } - Some(DeviceEvent::Notification(_, msg)) => { - if device_event_sender - .send(InternalDeviceEvent::Notification(msg)) - .await - .is_err() - { - info!( - "Device event sender closed for device {:?}, stopping event forwarding.", - identifier - ); - break; + event = futures::StreamExt::next(&mut event_stream) => { + match event { + Some(DeviceEvent::Disconnected(id)) => { + if device_event_sender + .send(InternalDeviceEvent::Disconnected(id)) + .await + .is_err() + { + info!( + "Device event sender closed for device {:?}, stopping event forwarding.", + identifier + ); + break; + } + } + Some(DeviceEvent::Notification(_, msg)) => { + if device_event_sender + .send(InternalDeviceEvent::Notification(msg)) + .await + .is_err() + { + info!( + "Device event sender closed for device {:?}, stopping event forwarding.", + identifier + ); + break; + } + } + None => { + // Stream ended (device likely disconnected) + break; + } } } - None => { - // Stream ended (device likely disconnected) - break; - } } } }); diff --git a/crates/buttplug_server/src/device/device_task.rs b/crates/buttplug_server/src/device/device_task.rs index a54f1fa43..8664ba35d 100644 --- a/crates/buttplug_server/src/device/device_task.rs +++ b/crates/buttplug_server/src/device/device_task.rs @@ -14,9 +14,10 @@ use std::{collections::VecDeque, sync::Arc, time::Duration}; -use buttplug_core::util::async_manager; +use buttplug_core::util::{async_manager, task::TaskScope}; use futures::future; use tokio::{select, sync::mpsc::Receiver, time::Instant}; +use tokio_util::sync::CancellationToken; use super::{ hardware::{Hardware, HardwareCommand, HardwareEvent, HardwareWriteCmd}, @@ -43,13 +44,14 @@ pub struct DeviceTaskConfig { /// /// Returns immediately after spawning the task. 
pub fn spawn_device_task( + task_scope: &TaskScope, hardware: Arc, _handler: Arc, config: DeviceTaskConfig, mut command_receiver: Receiver>, ) { - buttplug_core::spawn!("DeviceTask", async move { - run_device_task(hardware, config, &mut command_receiver).await; + task_scope.spawn("io", move |token| async move { + run_device_task(hardware, config, &mut command_receiver, token).await; }); } @@ -61,6 +63,7 @@ async fn run_device_task( hardware: Arc, config: DeviceTaskConfig, command_receiver: &mut Receiver>, + token: CancellationToken, ) { let mut hardware_events = hardware.event_stream(); let device_wait_duration = config.message_gap; @@ -115,6 +118,12 @@ async fn run_device_task( select! { biased; + // Priority 0: Cooperative cancellation - wins over new work. + _ = token.cancelled() => { + info!("Device task cancelled, shutting down"); + return; + } + // Priority 1: Incoming commands msg = command_receiver.recv() => { let Some(commands) = msg else { diff --git a/crates/buttplug_server/src/device/protocol.rs b/crates/buttplug_server/src/device/protocol.rs index e624b5706..83cc94b8b 100644 --- a/crates/buttplug_server/src/device/protocol.rs +++ b/crates/buttplug_server/src/device/protocol.rs @@ -10,6 +10,7 @@ use buttplug_core::{ errors::ButtplugDeviceError, message::{InputReadingV4, InputType, InputValue, OutputCommand}, + util::task::TaskScope, }; use buttplug_server_device_config::{ Endpoint, @@ -372,6 +373,7 @@ pub trait ProtocolHandler: Sync + Send { _feature_index: u32, _feature_id: Uuid, _sensor_type: InputType, + _task_scope: TaskScope, ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { future::ready(Err(ButtplugDeviceError::UnhandledCommand( "Command not implemented for this protocol: InputCmd (Subscribe)".to_string(), diff --git a/crates/buttplug_server/src/device/protocol_impl/galaku.rs b/crates/buttplug_server/src/device/protocol_impl/galaku.rs index f951ba2fb..57fed8654 100644 --- a/crates/buttplug_server/src/device/protocol_impl/galaku.rs +++ 
b/crates/buttplug_server/src/device/protocol_impl/galaku.rs @@ -14,6 +14,7 @@ use futures_util::{FutureExt, future}; use buttplug_core::errors::ButtplugDeviceError; use buttplug_core::message::{InputReadingV4, InputType, InputTypeReading, InputValue}; +use buttplug_core::util::task::TaskScope; use buttplug_server_device_config::Endpoint; use buttplug_server_device_config::{ @@ -245,6 +246,7 @@ impl ProtocolHandler for Galaku { _feature_index: u32, feature_id: Uuid, sensor_type: InputType, + _task_scope: TaskScope, ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { match sensor_type { InputType::Battery => { diff --git a/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs b/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs index 7e99fef18..c21f1a796 100644 --- a/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs +++ b/crates/buttplug_server/src/device/protocol_impl/kgoal_boost.rs @@ -15,7 +15,7 @@ use crate::{ use buttplug_core::{ errors::ButtplugDeviceError, message::{InputReadingV4, InputType, InputValue}, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::Endpoint; use futures::{ @@ -65,6 +65,7 @@ impl ProtocolHandler for KGoalBoost { feature_index: u32, feature_id: Uuid, _sensor_type: InputType, + task_scope: TaskScope, ) -> BoxFuture<'_, Result<(), ButtplugDeviceError>> { let sensors = self.subscribed_sensors.load(Ordering::Relaxed); if (sensors & (1 << feature_index as u8)) > 0 { @@ -90,9 +91,19 @@ impl ProtocolHandler for KGoalBoost { let stream_sensors = stream_sensors.clone(); info!("Starting Kgoal subscription"); // If we subscribe successfully, we need to set up our event handler. 
- buttplug_core::spawn!("Kgoal subscription event handler", async move { + task_scope.spawn_and_hold("kgoal-events", move |token| async move { let mut cached_values = vec![0u32, 0u32]; - while let Ok(info) = hardware_stream.recv().await { + loop { + let info = tokio::select! { + biased; + _ = token.cancelled() => return, + info = hardware_stream.recv() => { + let Ok(info) = info else { + return; + }; + info + } + }; let subscribed_sensors = stream_sensors.load(Ordering::Relaxed); // If we have no receivers, quit. if sender.receiver_count() == 0 || subscribed_sensors == 0 { diff --git a/crates/buttplug_server/src/device/server_device_manager.rs b/crates/buttplug_server/src/device/server_device_manager.rs index 77ce10877..ddb36fbf6 100644 --- a/crates/buttplug_server/src/device/server_device_manager.rs +++ b/crates/buttplug_server/src/device/server_device_manager.rs @@ -36,7 +36,7 @@ use buttplug_core::{ DeviceListV4, StopCmdV4, }, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use buttplug_server_device_config::{DeviceConfigurationManager, UserDeviceIdentifier}; use dashmap::DashMap; @@ -54,7 +54,6 @@ use std::{ }, }; use tokio::sync::{broadcast, mpsc}; -use tokio_util::sync::CancellationToken; #[derive(Debug)] pub(super) enum DeviceManagerCommand { @@ -175,7 +174,8 @@ impl ServerDeviceManagerBuilder { } let devices = Arc::new(DashMap::new()); - let loop_cancellation_token = CancellationToken::new(); + let task_scope = TaskScope::root("device-manager"); + let devices_scope = task_scope.child("devices"); let output_sender = broadcast::channel(255).0; let output_observation_sender = if self.emit_output_observations { @@ -184,24 +184,31 @@ impl ServerDeviceManagerBuilder { None }; - let mut event_loop = ServerDeviceManagerEventLoop::new( - comm_managers, - self.device_configuration_manager.clone(), - devices.clone(), - loop_cancellation_token.child_token(), - output_sender.clone(), - 
device_event_receiver, - device_command_receiver, - output_observation_sender.clone(), - ); - buttplug_core::spawn!("ServerDeviceManager event loop", async move { + // Clone everything the event loop needs, since the originals are still + // required to construct the ServerDeviceManager below. + let device_configuration_manager = self.device_configuration_manager.clone(); + let devices_clone = devices.clone(); + let output_sender_clone = output_sender.clone(); + let output_observation_sender_clone = output_observation_sender.clone(); + task_scope.spawn("event-loop", move |token| async move { + let mut event_loop = ServerDeviceManagerEventLoop::new( + comm_managers, + device_configuration_manager, + devices_clone, + token, + devices_scope, + output_sender_clone, + device_event_receiver, + device_command_receiver, + output_observation_sender_clone, + ); event_loop.run().await; }); Ok(ServerDeviceManager { device_configuration_manager: self.device_configuration_manager.clone(), devices, device_command_sender, - loop_cancellation_token, + task_scope, running: Arc::new(AtomicBool::new(true)), output_sender, output_observation_sender, @@ -216,7 +223,7 @@ pub struct ServerDeviceManager { #[getset(get = "pub(crate)")] devices: Arc>, device_command_sender: mpsc::Sender, - loop_cancellation_token: CancellationToken, + task_scope: TaskScope, running: Arc, output_sender: broadcast::Sender, output_observation_sender: Option>, @@ -236,6 +243,15 @@ impl ServerDeviceManager { .map(|sender| convert_broadcast_receiver_to_stream(sender.subscribe())) } + /// The hierarchical path of this manager's root Task Scope + /// (e.g. "device-manager-3"). Every task this manager spawns lives at or + /// under this prefix in the global Task Registry. Useful for scoping + /// registry queries to a single manager instance — the registry is + /// process-global, so parallel managers must filter by their own prefix. 
+ pub fn scope_path(&self) -> &str { + self.task_scope.path() + } + fn start_scanning(&self) -> ButtplugServerResultFuture { let command_sender = self.device_command_sender.clone(); async move { @@ -372,7 +388,13 @@ impl ServerDeviceManager { self.running.store(false, Ordering::Relaxed); let stop_scanning = self.stop_scanning(); let stop_devices = self.stop_devices(&StopCmdV4::default()); - let token = self.loop_cancellation_token.clone(); + // TaskScope is not Clone, but its CancellationToken is. Capture the token + // and path synchronously, but DO NOT cancel yet: device io tasks select + // biased on their token, so cancelling before the stop/disconnect commands + // drain would drop those queued commands and leave devices running. Run + // cleanup first, then cancel, then await the subtree draining. + let token = self.task_scope.token().clone(); + let scope_path = self.task_scope.path().to_owned(); async move { // Force stop scanning, otherwise we can disconnect and instantly try to reconnect while // cleaning up if we're still scanning. @@ -381,7 +403,12 @@ impl ServerDeviceManager { for device in devices.iter() { device.value().disconnect().await?; } + // Cleanup commands have drained; now cancel the scope and wait for every + // task under it to deregister. token.cancel(); + buttplug_core::util::task::registry() + .wait_empty_under(&scope_path) + .await; Ok(message::OkV0::default().into()) } .boxed() @@ -391,6 +418,9 @@ impl ServerDeviceManager { impl Drop for ServerDeviceManager { fn drop(&mut self) { info!("Dropping device manager!"); - self.loop_cancellation_token.cancel(); + // Explicitly cancel the scope here for clarity and to trigger shutdown + // eagerly (before field drop order kicks in). Field drop would also cancel + // via TaskScope::drop, but the explicit call makes the intent obvious. 
+ self.task_scope.cancel(); } } diff --git a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs index 6300586df..0635bffbc 100644 --- a/crates/buttplug_server/src/device/server_device_manager_event_loop.rs +++ b/crates/buttplug_server/src/device/server_device_manager_event_loop.rs @@ -23,6 +23,7 @@ use crate::device::{ hardware::communication::{HardwareCommunicationManager, HardwareCommunicationManagerEvent}, protocol::ProtocolManager, }; +use buttplug_core::util::task::TaskScope; use dashmap::{DashMap, DashSet}; use futures::{FutureExt, future}; use std::sync::Arc; @@ -68,6 +69,9 @@ pub(super) struct ServerDeviceManagerEventLoop { connecting_devices: Arc>, /// Cancellation token for the event loop loop_cancellation_token: CancellationToken, + /// Scope owning all per-device task subtrees. Each device's tasks are spawned + /// through a `device-{...}` child of this scope. + devices_scope: TaskScope, /// Protocol map, for mapping user definitions to protocols protocol_manager: ProtocolManager, /// Optional sender for output observations, None when disabled @@ -81,6 +85,7 @@ impl ServerDeviceManagerEventLoop { device_config_manager: Arc, device_map: Arc>, loop_cancellation_token: CancellationToken, + devices_scope: TaskScope, server_sender: broadcast::Sender, device_comm_receiver: mpsc::Receiver, device_command_receiver: mpsc::Receiver, @@ -99,6 +104,7 @@ impl ServerDeviceManagerEventLoop { scanning_state: ScanningState::Idle, connecting_devices: Arc::new(DashSet::new()), loop_cancellation_token, + devices_scope, protocol_manager: ProtocolManager::default(), output_observation_sender, } @@ -292,44 +298,78 @@ impl ServerDeviceManagerEventLoop { let device_config_manager = self.device_config_manager.clone(); let connecting_devices = self.connecting_devices.clone(); let output_observation_sender = self.output_observation_sender.clone(); - let span = info_span!( - "device 
creation", - name = tracing::field::display(name), - address = tracing::field::display(address.clone()) - ); // Clone sender again for the forwarding task that build_device_handle will spawn let device_event_sender_for_forwarding = self.device_event_sender.clone(); - buttplug_core::util::async_manager::spawn( - async move { - match build_device_handle( - device_config_manager, - creator, - protocol_specializers, - device_event_sender_for_forwarding, - output_observation_sender, - ) - .await - { - Ok(device_handle) => { - if device_event_sender_clone - .send(InternalDeviceEvent::Connected(device_handle)) - .await - .is_err() - { - error!( - "Device manager disappeared before connection established, device will be dropped." - ); - } + // Create the per-device scope before spawning the bringup task. The + // device index is not known until identification completes inside + // build_device_handle, so the device scope and the bringup task name are + // both keyed by the device's stable address. + let device_scope = self.devices_scope.child(&format!("device-{address}")); + + // Spawn bringup through devices_scope so it is registered in the Task + // Registry for the duration of the connection attempt. This closes the + // shutdown race: wait_empty_under on the devices scope can only resolve + // after the bringup task deregisters, which happens *after* + // build_device_handle returns OR after cancellation is observed. + // build_device_handle registers the io and event-forwarding tasks (via + // task_scope.spawn) synchronously before returning, so on the success + // path the registry count never momentarily hits zero while work + // remains: + // 1. bringup registers (here) + // 2. io task registers (inside build_device_handle → spawn_device_task) + // 3. event-forwarding task registers (inside build_device_handle) + // 4. build_device_handle returns + // 5. bringup deregisters + // Steps 2 and 3 happen before step 5, guaranteeing correct ordering.
+ // + // build_device_handle can stall indefinitely (e.g. a BLE connect that + // never completes), so the bringup MUST honor its cancellation token — + // otherwise ServerDeviceManager::shutdown's wait_empty_under would hang + // forever waiting for this task to deregister. We select on the token; + // on cancellation we drop the build_device_handle future, which drops + // device_scope and thereby cancels anything it has already spawned. + self.devices_scope.spawn( + &format!("bringup-{address}"), + move |token| async move { + tokio::select! { + biased; + _ = token.cancelled() => { + info!( + "Device bringup for {address} cancelled before connection completed." + ); } - Err(e) => { - error!("Device errored while trying to connect: {:?}", e); + result = build_device_handle( + device_config_manager, + creator, + protocol_specializers, + device_event_sender_for_forwarding, + output_observation_sender, + device_scope, + ) => { + match result { + Ok(device_handle) => { + if device_event_sender_clone + .send(InternalDeviceEvent::Connected(device_handle)) + .await + .is_err() + { + error!( + "Device manager disappeared before connection established, device will be dropped." + ); + } + } + Err(e) => { + error!("Device errored while trying to connect: {:?}", e); + } + } } } + // Runs on every path (success, error, AND cancellation): the address + // must always leave the connecting set so a future scan can retry. connecting_devices.remove(&address); }, - span, ); } } diff --git a/crates/buttplug_server/src/ping_timer.rs b/crates/buttplug_server/src/ping_timer.rs index 4b7ec5a9d..2fece8dba 100644 --- a/crates/buttplug_server/src/ping_timer.rs +++ b/crates/buttplug_server/src/ping_timer.rs @@ -5,19 +5,19 @@ // Licensed under the BSD 3-Clause license. See LICENSE file in the project root // for full license information. 
-use buttplug_core::util::async_manager; +use buttplug_core::util::{async_manager, task::TaskScope}; use futures::Future; use std::{sync::Arc, time::Duration}; use tokio::{ select, sync::{Mutex, mpsc}, }; +use tokio_util::sync::CancellationToken; pub enum PingMessage { Ping, StartTimer, StopTimer, - End, } /// Internal ping timer task that monitors for ping timeouts. @@ -26,6 +26,7 @@ async fn ping_timer<F>( max_ping_time: u32, mut ping_msg_receiver: mpsc::Receiver<PingMessage>, on_ping_timeout: Arc<Mutex<Option<F>>>, + token: CancellationToken, ) where F: FnOnce() + Send + 'static, { @@ -33,6 +34,9 @@ async fn ping_timer<F>( let mut pinged = false; loop { select! { + _ = token.cancelled() => { + return; + } _ = async_manager::sleep(Duration::from_millis(max_ping_time.into())) => { if started { if !pinged { @@ -53,7 +57,6 @@ async fn ping_timer<F>( PingMessage::StartTimer => started = true, PingMessage::StopTimer => started = false, PingMessage::Ping => pinged = true, - PingMessage::End => break, } } }; @@ -63,19 +66,8 @@ async fn ping_timer<F>( pub struct PingTimer { max_ping_time: u32, ping_msg_sender: mpsc::Sender<PingMessage>, -} - -impl Drop for PingTimer { - fn drop(&mut self) { - // This cannot block, otherwise it will throw in WASM contexts on - // destruction. We must use send(), not blocking_send(). - let sender = self.ping_msg_sender.clone(); - buttplug_core::spawn!("PingTimerDrop", async move { - if sender.send(PingMessage::End).await.is_err() { - debug!("Receiver does not exist, assuming ping timer event loop already dead."); - } - }); - } + // Dropping the timer drops the scope, which cancels the timer task. + _task_scope: TaskScope, } impl PingTimer { @@ -84,19 +76,21 @@ impl PingTimer { /// The callback is called once when the ping timer expires without receiving /// a ping message. If max_ping_time is 0, the timer is disabled and the /// callback will never be called.
- pub fn new<F>(max_ping_time: u32, on_ping_timeout: Option<F>) -> Self + pub fn new<F>(max_ping_time: u32, on_ping_timeout: Option<F>, task_scope: TaskScope) -> Self where F: FnOnce() + Send + 'static, { let (sender, receiver) = mpsc::channel(256); if max_ping_time > 0 { let callback = Arc::new(Mutex::new(on_ping_timeout)); - let fut = ping_timer(max_ping_time, receiver, callback); - buttplug_core::spawn!("PingTimer", fut); + task_scope.spawn("timer", move |token| { + ping_timer(max_ping_time, receiver, callback, token) + }); } Self { max_ping_time, ping_msg_sender: sender, + _task_scope: task_scope, } } diff --git a/crates/buttplug_server/src/server.rs b/crates/buttplug_server/src/server.rs index 61642068a..f428a1735 100644 --- a/crates/buttplug_server/src/server.rs +++ b/crates/buttplug_server/src/server.rs @@ -35,7 +35,7 @@ use buttplug_core::{ StopCmdV4, StopScanningV0, }, - util::stream::convert_broadcast_receiver_to_stream, + util::{stream::convert_broadcast_receiver_to_stream, task::TaskScope}, }; use futures::{ Stream, @@ -90,6 +90,9 @@ pub struct ButtplugServer { /// Broadcaster for server events. Receivers for this are handed out through the /// [ButtplugServer::event_stream()] method. output_sender: broadcast::Sender, + /// Root scope owning all tasks spawned for this server instance (e.g. the + /// ping timer). Cancelled on shutdown.
+ task_scope: TaskScope, } impl std::fmt::Debug for ButtplugServer { @@ -110,6 +113,7 @@ impl ButtplugServer { device_manager: Arc, state: Arc>, output_sender: broadcast::Sender, + task_scope: TaskScope, ) -> Self { ButtplugServer { server_name: server_name.to_owned(), @@ -118,6 +122,7 @@ impl ButtplugServer { device_manager, state, output_sender, + task_scope, } } @@ -233,8 +238,24 @@ impl ButtplugServer { pub fn shutdown(&self) -> ButtplugServerResultFuture { let device_manager = self.device_manager.clone(); - //let disconnect_future = self.disconnect(); - async move { device_manager.shutdown().await }.boxed() + // Capture the token and path synchronously, but cancel only after the + // device manager has finished its own cleanup. The server scope only owns + // the ping timer today, so cancelling early is harmless, but we align with + // the device manager's cleanup-before-cancel ordering for consistency. + let token = self.task_scope.token().clone(); + let scope_path = self.task_scope.path().to_owned(); + async move { + let result = device_manager.shutdown().await; + token.cancel(); + // Await only this server's own subtree. A shared device manager + // (with_shared_device_manager) outlives this server, so its subtree is + // NOT awaited here -- device_manager.shutdown() handles its own. 
+ buttplug_core::util::task::registry() + .wait_empty_under(&scope_path) + .await; + result + } + .boxed() } /// Sends a [ButtplugClientMessage] to be parsed by the server (for handshake or ping), or passed diff --git a/crates/buttplug_server/src/server_builder.rs b/crates/buttplug_server/src/server_builder.rs index d5ab30e5b..5763f384e 100644 --- a/crates/buttplug_server/src/server_builder.rs +++ b/crates/buttplug_server/src/server_builder.rs @@ -14,6 +14,7 @@ use super::{ use buttplug_core::{ errors::*, message::{self, ButtplugServerMessageV4, StopCmdV4}, + util::task::TaskScope, }; use buttplug_server_device_config::DeviceConfigurationManagerBuilder; use std::sync::{Arc, RwLock}; @@ -98,12 +99,16 @@ impl ButtplugServerBuilder { let ping_time = self.max_ping_time.unwrap_or(0); + // Root scope owning all tasks spawned for this server instance. + let task_scope = TaskScope::root("server"); + // Create the ping timeout callback if ping time is configured. // The callback handles: updating state, stopping devices, and sending error. let ping_timeout_callback = if ping_time > 0 { let state_clone = state.clone(); let device_manager_clone = self.device_manager.clone(); let output_sender_clone = output_sender.clone(); + let ping_timeout_scope = task_scope.child("ping-timeout"); Some(move || { error!("Ping out signal received, stopping server"); @@ -112,8 +117,10 @@ impl ButtplugServerBuilder { let mut state_guard = state_clone.write().expect("State lock poisoned"); *state_guard = ConnectionState::PingedOut; } - // Stop all devices (spawn async task since callback is sync) - buttplug_core::spawn!("PingTimeoutStopDevices", async move { + // Stop all devices (spawn async task since callback is sync). The + // callback is FnOnce, so the child scope moves in and is consumed by + // spawn_and_hold, keeping it alive for the duration of the task. 
+ ping_timeout_scope.spawn_and_hold("stop-devices", move |_token| async move { if let Err(e) = device_manager_clone .stop_devices(&StopCmdV4::default()) .await @@ -135,7 +142,11 @@ impl ButtplugServerBuilder { None }; - let ping_timer = Arc::new(PingTimer::new(ping_time, ping_timeout_callback)); + let ping_timer = Arc::new(PingTimer::new( + ping_time, + ping_timeout_callback, + task_scope.child("ping-timer"), + )); // Assuming everything passed, return the server. Ok(ButtplugServer::new( @@ -145,6 +156,7 @@ impl ButtplugServerBuilder { self.device_manager.clone(), state, output_sender, + task_scope, )) } } diff --git a/crates/buttplug_tests/CLAUDE.md b/crates/buttplug_tests/CLAUDE.md index 31f5b50a1..e5408baa3 100644 --- a/crates/buttplug_tests/CLAUDE.md +++ b/crates/buttplug_tests/CLAUDE.md @@ -54,4 +54,5 @@ Tests run across multiple protocol spec versions (v0–v4) via version-specific - `test_message_downgrades.rs` — Protocol version downgrade path tests - `test_disabled_device_features.rs` — Tests for user config feature disabling - `test_output_observations.rs` — Integration tests for output observability (observation stream, filtering, multi-device, enable/disable) +- `test_task_lifecycle.rs` — Integration test asserting the global Task Registry has no live tasks under the server's scope after server shutdown (verifies `TaskScope` ownership leaves no leaked tasks) - `test_websocket_connectors.rs` / `test_websocket_device_comm_manager.rs` — WebSocket transport integration tests diff --git a/crates/buttplug_tests/tests/test_task_lifecycle.rs b/crates/buttplug_tests/tests/test_task_lifecycle.rs new file mode 100644 index 000000000..4425c4443 --- /dev/null +++ b/crates/buttplug_tests/tests/test_task_lifecycle.rs @@ -0,0 +1,298 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. 
See LICENSE file in the project root +// for full license information. + +mod util; + +use buttplug_core::{ + message::{ + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ButtplugServerMessageV4, + OutputCmdV4, + OutputCommand, + OutputValue, + RequestServerInfoV4, + StartScanningV0, + }, + util::task::registry, +}; +use buttplug_server::message::ButtplugClientMessageVariant; +use futures::{StreamExt, pin_mut}; +use std::time::Duration; +use util::stalling_device_communication_manager::StallingDeviceCommunicationManagerBuilder; +use util::{test_server_with_comm_manager, test_server_with_device}; + +/// Bring a real (test-harness) device online and confirm that, once the server +/// is shut down and dropped, no tasks remain registered under the scope tree. +/// +/// This exercises the *device task* lifecycle specifically: a connected device +/// spawns an `io` task whose only exit path is its `token.cancelled()` select +/// arm. If scope cancellation were removed, that task would leak and this test +/// would fail with a non-empty leaked-task list. (A server with no device is +/// insufficient — the device-manager event loop also exits on channel drop, so +/// it cannot prove cancellation actually fires.) +#[tokio::test] +async fn test_server_shutdown_leaves_no_tasks() { + let baseline: Vec = registry().snapshot().into_iter().map(|t| t.path).collect(); + + // Hold the channel so the device stays connected. 
+ let (server, _channel) = test_server_with_device("Massage Demo"); + + let recv = server.server_version_event_stream(); + pin_mut!(recv); + + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Task Lifecycle Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + + // Wait until the device is actually connected — this is what spawns the + // per-device `io` task that we want to prove gets cleaned up. + tokio::time::timeout(Duration::from_secs(5), async { + while let Some(msg) = recv.next().await { + if let ButtplugServerMessageV4::DeviceList(list) = msg { + if !list.devices().is_empty() { + return; + } + } + } + panic!("device event stream ended before a device connected"); + }) + .await + .expect("timed out waiting for device to connect"); + + // Device is up. Scope all subsequent leak checks to THIS server's own + // device-manager subtree: the registry is process-global, so other tests + // running in parallel must not pollute these assertions. We ask the manager + // for its own scope path directly rather than guessing it from the global + // registry snapshot — guessing is racy, because a concurrent test's + // `device-manager-N` tasks are also "new" relative to our baseline and could + // be picked instead of ours. + let scope_prefix: String = server.device_manager().scope_path().to_owned(); + + // Sanity: the registry must now show this server spawned per-device tasks + // under its own subtree. 
+ let new_tasks: Vec = registry() + .snapshot() + .into_iter() + .map(|t| t.path) + .filter(|p| !baseline.contains(p) && p.starts_with(&format!("{scope_prefix}/"))) + .collect(); + assert!( + !new_tasks.is_empty(), + "expected scope-spawned tasks under {scope_prefix} after a device connected" + ); + assert!( + new_tasks.iter().any(|p| p.contains("/device-")), + "expected a per-device task in the registry, got: {new_tasks:?}" + ); + + // `shutdown()` is contractually responsible for draining every task it + // spawned: it cancels the scope tree and awaits the registry going empty + // under its path before returning. We assert emptiness *before* dropping the + // server, so that ordinary drop-of-channels teardown cannot mask a missing + // cancellation arm. If any scope-spawned task fails to observe cancellation, + // it will still be parked on its event stream / receiver here and show up as + // leaked. We hold `_channel` alive across this check precisely so the device + // event stream does not close on its own. + server.shutdown().await.expect("server shutdown errored"); + + let leaked = registry().live_count_under(&scope_prefix); + assert_eq!( + leaked, 0, + "shutdown() returned but {leaked} task(s) are still registered under {scope_prefix}" + ); + + // Dropping the server must not resurrect or strand anything either. Give a + // short grace period and confirm this server's subtree stays empty. 
+ drop(server); + drop(_channel); + tokio::time::timeout(Duration::from_secs(5), async { + loop { + if registry().live_count_under(&scope_prefix) == 0 { + return; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await + .unwrap_or_else(|_| { + let leaked = registry().live_count_under(&scope_prefix); + panic!("leaked {leaked} task(s) under {scope_prefix} after drop"); + }); +} + +/// Shutdown-under-load smoke test: with a device connected, scanning still +/// active, and the device in a non-zero output state, `shutdown()` must drive +/// its cleanup (stop_scanning / stop_devices / per-device disconnect) through +/// the live event loop, drain every task under its scope, and return Ok within +/// a bounded time — it must neither hang nor strand tasks. +/// +/// SCOPE / what this does NOT verify: this is a smoke test, not a regression +/// test for shutdown *ordering*. It does not prove that cleanup runs *before* +/// scope cancellation — reverting the cleanup-before-cancel ordering leaves this +/// test green, because the contract it checks (shutdown completes and its +/// subtree drains) holds under both orderings with this harness. The +/// cleanup-before-cancel ordering is the correct production behavior, but a +/// test that goes RED on that specific regression is not achievable here (see +/// NOTE). This test guards against the coarser failure mode: a shutdown that +/// hangs or leaks tasks when invoked under realistic load. +/// +/// NOTE on why ordering can't be observed here: the stronger "observe the +/// device's actual stop write" assertion is infeasible with this harness. The +/// test hardware sets a 1ms message_gap (see TestHardwareConnector::specialize), +/// so the device io task batches commands; during shutdown the per-device +/// `disconnect()` fires a `Disconnected` hardware event that tears the io task +/// down inside that 1ms batch window, dropping the pending (batched) stop write +/// regardless of cancel ordering. 
That teardown race is independent of any +/// ordering bug, so a write-observation assertion is inherently flaky here. An +/// instrumented-ordering variant was also attempted and found inherently flaky +/// with this harness, so it is deliberately not pursued. +#[tokio::test] +async fn test_shutdown_under_load_drains_subtree() { + // Hold the channel so the device stays connected through shutdown. + let (server, _channel) = test_server_with_device("Massage Demo"); + + let recv = server.server_version_event_stream(); + pin_mut!(recv); + + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Shutdown Under Load Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + + // Start scanning and leave it running: shutdown's stop_scanning must drain + // through the event loop before the scope is cancelled. + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + + // Wait for the device to connect so its io / event-forwarding tasks exist + // under the scope alongside the still-running scanning state. + let device_index = tokio::time::timeout(Duration::from_secs(5), async { + while let Some(msg) = recv.next().await { + if let ButtplugServerMessageV4::DeviceList(list) = msg + && let Some((&idx, _)) = list.devices().iter().next() + { + return idx; + } + } + panic!("device event stream ended before a device connected"); + }) + .await + .expect("timed out waiting for device to connect"); + + // Put the device into an actively-running state so there is real cleanup to + // perform (a non-zero output that StopCmd must reset). 
+ server + .parse_message(ButtplugClientMessageVariant::V4( + OutputCmdV4::new( + device_index, + 0, + OutputCommand::Vibrate(OutputValue::new(50)), + ) + .into(), + )) + .await + .expect("vibrate command should succeed"); + + // Identify this server's own device-manager scope prefix so the leak check + // below is isolated from any other test running in parallel against the + // global registry. We ask the manager directly rather than guessing from the + // global registry snapshot, which would race with concurrent tests' own + // `device-manager-N` roots. + let scope_prefix: String = server.device_manager().scope_path().to_owned(); + + // shutdown() must drive cleanup (stop_scanning + stop_devices + disconnect) + // through the live event loop and only then cancel the scope, returning Ok + // within a bounded time. + tokio::time::timeout(Duration::from_secs(10), server.shutdown()) + .await + .expect("shutdown did not resolve in time — cleanup likely raced against cancellation") + .expect("server shutdown errored"); + + // shutdown() is contractually responsible for draining every task under its + // own scope. Inspect only this server's subtree to stay parallel-safe. + let leaked = registry().live_count_under(&scope_prefix); + assert_eq!( + leaked, 0, + "shutdown() returned but {leaked} task(s) are still registered under {scope_prefix}" + ); +} + +/// Regression test for the cancellable-bringup fix (fix 2): `shutdown()` must +/// not hang when a device bringup is stalled in `connect()`. +/// +/// The stalling comm manager emits one `DeviceFound` on scan; the device-manager +/// event loop spawns a bringup task that awaits `connect()`, which never +/// resolves. `shutdown()` cancels the device-manager scope and then +/// `wait_empty_under`s its subtree. The bringup task only deregisters once it +/// observes cancellation via the `biased` select on its token — without that +/// select it would await `connect()` forever and `shutdown()` would never +/// resolve. 
+/// +/// RED evidence: replacing the bringup's `move |token|` select with the +/// non-cancellable `move |_token|` form makes this test time out at the 10s +/// bound and fail. With the fix in place it resolves promptly. +#[tokio::test] +async fn test_shutdown_resolves_with_stalled_bringup() { + let server = test_server_with_comm_manager(StallingDeviceCommunicationManagerBuilder::default()); + + server + .parse_message(ButtplugClientMessageVariant::V4( + RequestServerInfoV4::new( + "Stalled Bringup Test", + BUTTPLUG_CURRENT_API_MAJOR_VERSION, + BUTTPLUG_CURRENT_API_MINOR_VERSION, + ) + .into(), + )) + .await + .expect("server info request should succeed"); + + // Kick off scanning so a device is found and a bringup task begins — and then + // stalls in connect(). + server + .parse_message(ButtplugClientMessageVariant::V4( + StartScanningV0::default().into(), + )) + .await + .expect("start scanning should succeed"); + + // Give the bringup task time to spawn and enter (and block in) connect(). + tokio::time::sleep(Duration::from_millis(200)).await; + + // shutdown() must cancel the stalled bringup and return within the bound. + // Without the biased select on the bringup token this hangs forever. 
+ tokio::time::timeout(Duration::from_secs(10), server.shutdown()) + .await + .expect("shutdown hung with a stalled device bringup — bringup is not honoring its cancellation token") + .expect("server shutdown errored"); +} diff --git a/crates/buttplug_tests/tests/util/mod.rs b/crates/buttplug_tests/tests/util/mod.rs index c19a80b54..9f4e07e8c 100644 --- a/crates/buttplug_tests/tests/util/mod.rs +++ b/crates/buttplug_tests/tests/util/mod.rs @@ -15,6 +15,8 @@ pub mod test_device_manager; pub use delay_device_communication_manager::DelayDeviceCommunicationManagerBuilder; #[allow(dead_code)] pub mod channel_transport; +#[allow(dead_code)] +pub mod stalling_device_communication_manager; use buttplug_client::ButtplugClient; use buttplug_client_in_process::ButtplugInProcessClientConnectorBuilder; use buttplug_server::{ diff --git a/crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs b/crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs new file mode 100644 index 000000000..7b21868c4 --- /dev/null +++ b/crates/buttplug_tests/tests/util/stalling_device_communication_manager.rs @@ -0,0 +1,114 @@ +// Buttplug Rust Source Code File - See https://buttplug.io for more info. +// +// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved. +// +// Licensed under the BSD 3-Clause license. See LICENSE file in the project root +// for full license information. + +//! A test comm manager whose device bringup stalls forever in `connect()`. +//! +//! On `start_scanning` it emits a single `DeviceFound` event carrying a +//! [StallingHardwareConnector]. The device-manager event loop spawns a bringup +//! task that awaits `connect()` — which never resolves — modelling a real BLE +//! connect that hangs. This is the precise scenario the cancellable-bringup fix +//! guards: without a `biased` select on the bringup token, `shutdown()`'s +//! `wait_empty_under` would block forever waiting for the bringup task to +//! deregister. 
+ +use async_trait::async_trait; +use buttplug_core::{ButtplugResultFuture, errors::ButtplugDeviceError}; +use buttplug_server::device::hardware::{ + HardwareConnector, + HardwareSpecializer, + communication::{ + HardwareCommunicationManager, + HardwareCommunicationManagerBuilder, + HardwareCommunicationManagerEvent, + }, +}; +use buttplug_server_device_config::{BluetoothLESpecifier, ProtocolCommunicationSpecifier}; +use futures::FutureExt; +use log::error; +use std::collections::HashMap; +use tokio::sync::mpsc::Sender; + +/// A `HardwareConnector` whose `connect()` future never resolves. +#[derive(Debug)] +struct StallingHardwareConnector { + specifier: ProtocolCommunicationSpecifier, +} + +#[async_trait] +impl HardwareConnector for StallingHardwareConnector { + fn specifier(&self) -> ProtocolCommunicationSpecifier { + self.specifier.clone() + } + + async fn connect(&mut self) -> Result<Box<dyn HardwareSpecializer>, ButtplugDeviceError> { + // Block forever: this models a device connect that hangs and never returns. + // The bringup task must drop this future when its cancellation token fires. 
+ std::future::pending::<()>().await; + unreachable!("stalling connector connect() should never resolve"); + } +} + +#[derive(Default)] +pub struct StallingDeviceCommunicationManagerBuilder {} + +impl HardwareCommunicationManagerBuilder for StallingDeviceCommunicationManagerBuilder { + fn finish( + &mut self, + sender: Sender<HardwareCommunicationManagerEvent>, + ) -> Box<dyn HardwareCommunicationManager> { + Box::new(StallingDeviceCommunicationManager { + device_sender: sender, + }) + } +} + +pub struct StallingDeviceCommunicationManager { + device_sender: Sender<HardwareCommunicationManagerEvent>, +} + +impl HardwareCommunicationManager for StallingDeviceCommunicationManager { + fn name(&self) -> &'static str { + "StallingDeviceCommunicationManager" + } + + fn start_scanning(&mut self) -> ButtplugResultFuture { + let device_sender = self.device_sender.clone(); + async move { + // "Massage Demo" is a known test device name with a real protocol config, + // so bringup proceeds into connect() (which then stalls). + let specifier = ProtocolCommunicationSpecifier::BluetoothLE( + BluetoothLESpecifier::new_from_device("Massage Demo", &HashMap::new(), &[]), + ); + let connector = StallingHardwareConnector { specifier }; + if device_sender + .send(HardwareCommunicationManagerEvent::DeviceFound { + name: "Massage Demo".to_owned(), + address: "stalling-device-0".to_owned(), + creator: Box::new(connector), + }) + .await + .is_err() + { + error!("Device channel no longer open."); + } + Ok(()) + } + .boxed() + } + + fn stop_scanning(&mut self) -> ButtplugResultFuture { + async { Ok(()) }.boxed() + } + + fn scanning_status(&self) -> bool { + false + } + + fn can_scan(&self) -> bool { + true + } +} diff --git a/crates/intiface_engine/src/engine.rs b/crates/intiface_engine/src/engine.rs index a4a81e44c..81780d02d 100644 --- a/crates/intiface_engine/src/engine.rs +++ b/crates/intiface_engine/src/engine.rs @@ -306,8 +306,15 @@ impl IntifaceEngine { let event_receiver = server.event_stream(); let frontend_clone = frontend.clone(); let stop_child_token = self.stop_token.child_token(); 
+ let emit_task_events = options.emit_task_events(); tokio::spawn(async move { - frontend_server_event_loop(event_receiver, frontend_clone, stop_child_token).await; + frontend_server_event_loop( + event_receiver, + frontend_clone, + stop_child_token, + emit_task_events, + ) + .await; }); } diff --git a/crates/intiface_engine/src/frontend/mod.rs b/crates/intiface_engine/src/frontend/mod.rs index 8f4dd416b..45f45a415 100644 --- a/crates/intiface_engine/src/frontend/mod.rs +++ b/crates/intiface_engine/src/frontend/mod.rs @@ -9,8 +9,9 @@ pub mod process_messages; use crate::error::IntifaceError; use crate::remote_server::ButtplugRemoteServerEvent; use async_trait::async_trait; +use buttplug_core::util::task::{TaskEvent, registry}; use futures::{Stream, StreamExt, pin_mut}; -pub use process_messages::{EngineMessage, IntifaceMessage}; +pub use process_messages::{EngineMessage, IntifaceMessage, TaskListEntry}; use std::sync::Arc; use tokio::{ select, @@ -50,7 +51,19 @@ pub async fn frontend_external_event_loop( connection_cancellation_token.cancel(); info!("Got external stop request"); break; - } + }, + IntifaceMessage::RequestTaskList {} => { + let tasks = registry() + .snapshot() + .into_iter() + .map(|t| TaskListEntry { + id: t.id.value(), + path: t.path, + detached: t.detached, + }) + .collect(); + frontend.send(EngineMessage::TaskList { tasks }).await; + }, }, Err(_) => { info!("Frontend sender dropped, assuming connection lost, breaking."); @@ -70,9 +83,12 @@ pub async fn frontend_server_event_loop( receiver: impl Stream, frontend: Arc, connection_cancellation_token: CancellationToken, + emit_task_events: bool, ) { pin_mut!(receiver); + let mut task_events = emit_task_events.then(|| registry().event_stream()); + loop { select! 
{ maybe_event = receiver.next() => { @@ -113,6 +129,31 @@ pub async fn frontend_server_event_loop( }, } }, + task_event = async { + match task_events.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } + } => { + match task_event { + Ok(TaskEvent::Started { id, path }) => { + frontend.send(EngineMessage::TaskStarted { id: id.value(), path }).await; + } + Ok(TaskEvent::Ended { id, path, outcome }) => { + frontend.send(EngineMessage::TaskEnded { + id: id.value(), + path, + outcome: format!("{outcome:?}"), + }).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Task event stream lagged, dropped {} events", n); + } + Err(broadcast::error::RecvError::Closed) => { + // Global registry sender never closes; unreachable in practice. + } + } + }, _ = connection_cancellation_token.cancelled() => { info!("Connection cancellation token activated, breaking from frontend server event loop"); break; diff --git a/crates/intiface_engine/src/frontend/process_messages.rs b/crates/intiface_engine/src/frontend/process_messages.rs index cc9abbacb..2ffc5652d 100644 --- a/crates/intiface_engine/src/frontend/process_messages.rs +++ b/crates/intiface_engine/src/frontend/process_messages.rs @@ -8,6 +8,13 @@ use buttplug_server_device_config::UserDeviceIdentifier; use serde::{Deserialize, Serialize}; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskListEntry { + pub id: u64, + pub path: String, + pub detached: bool, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "code", rename_all = "snake_case")] pub enum EngineErrorDetail { @@ -62,19 +69,50 @@ pub enum EngineMessage { output_type: String, value: f64, }, + TaskStarted { + id: u64, + path: String, + }, + TaskEnded { + id: u64, + path: String, + /// How the task ended: "Completed" | "Cancelled" | "Panicked" + /// (the Debug rendering of `buttplug_core`'s `TaskOutcome`). 
+ outcome: String, + }, + TaskList { + tasks: Vec<TaskListEntry>, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum IntifaceMessage { RequestEngineVersion { expected_version: u32 }, Stop {}, + RequestTaskList {}, } #[cfg(test)] mod test { - use super::{EngineErrorDetail, EngineMessage}; + use super::{EngineErrorDetail, EngineMessage, IntifaceMessage}; use serde_json::json; + #[test] + fn test_task_message_serialization() { + let msg = EngineMessage::TaskEnded { + id: 42, + path: "server-1/ping-timer/timer".to_owned(), + outcome: "Cancelled".to_owned(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let back: EngineMessage = serde_json::from_str(&json).unwrap(); + assert!(matches!(back, EngineMessage::TaskEnded { id: 42, .. })); + + let req = r#"{"RequestTaskList":{}}"#; + let parsed: IntifaceMessage = serde_json::from_str(req).unwrap(); + assert!(matches!(parsed, IntifaceMessage::RequestTaskList {})); + } + #[test] fn generic_engine_error_serializes_without_structured_fields() { let message = EngineMessage::EngineError { diff --git a/crates/intiface_engine/src/options.rs b/crates/intiface_engine/src/options.rs index f76e1adaa..ca8fa663a 100644 --- a/crates/intiface_engine/src/options.rs +++ b/crates/intiface_engine/src/options.rs @@ -67,6 +67,8 @@ pub struct EngineOptions { rest_api_port: Option, #[getset(get_copy = "pub")] emit_output_observations: bool, + #[getset(get_copy = "pub")] + emit_task_events: bool, } #[derive(Default, Debug, Clone)] @@ -100,6 +102,7 @@ pub struct EngineOptionsExternal { pub repeater_remote_address: Option, pub rest_api_port: Option, pub emit_output_observations: bool, + pub emit_task_events: bool, } impl From<EngineOptionsExternal> for EngineOptions { @@ -134,6 +137,7 @@ impl From<EngineOptionsExternal> for EngineOptions { repeater_remote_address: other.repeater_remote_address, rest_api_port: other.rest_api_port, emit_output_observations: other.emit_output_observations, + emit_task_events: other.emit_task_events, } } } @@ -297,6 +301,11 @@ impl EngineOptionsBuilder { self 
} + pub fn emit_task_events(&mut self, value: bool) -> &mut Self { + self.options.emit_task_events = value; + self + } + pub fn finish(&mut self) -> EngineOptions { self.options.clone() }
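Reviewer note on the leak checks: the tests in this patch stay parallel-safe by only inspecting paths under their own scope prefix (`live_count_under(&scope_prefix)`, and `starts_with(&format!("{scope_prefix}/"))` for the snapshot filter). The point of matching on whole path segments is that a root such as `device-manager-1` must not also claim tasks owned by `device-manager-10`. The following is a minimal std-only sketch of that idea, not the `TaskRegistry` implementation; `is_under` and `count_under` are hypothetical helper names, and the `device-0` path segments are invented for illustration.

```rust
/// Hypothetical helper: returns true when `path` is `prefix` itself or lies
/// beneath it, comparing whole `/`-separated segments rather than raw bytes.
fn is_under(path: &str, prefix: &str) -> bool {
  // Tolerate a trailing slash on the prefix so "a/" and "a" behave alike.
  let prefix = prefix.trim_end_matches('/');
  match path.strip_prefix(prefix) {
    // Either an exact match, or the remainder begins at a segment boundary.
    Some(rest) => rest.is_empty() || rest.starts_with('/'),
    None => false,
  }
}

/// Hypothetical helper: counts how many of `paths` fall under `prefix`.
fn count_under(paths: &[&str], prefix: &str) -> usize {
  paths.iter().filter(|p| is_under(p, prefix)).count()
}
```

With a plain `str::starts_with(prefix)` check, `device-manager-10/scan` would be counted under `device-manager-1`; appending the `/` separator (as the snapshot filter in the test does) or checking the segment boundary (as sketched here) avoids that false positive.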