Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
af7ed36
docs: add Task Scope and Task Registry domain terms
qdot Jun 10, 2026
0bf2483
feat(core): add TaskRegistry for named task introspection
qdot Jun 10, 2026
542e8bd
feat(core): add TaskScope ownership tree with cooperative cancellation
qdot Jun 10, 2026
246e930
feat(core): add spawn_detached escape hatch for unowned tasks
qdot Jun 10, 2026
22e91ba
docs(core): clarify TaskId wrap and TaskRegistry memory high-water be…
qdot Jun 10, 2026
b382128
feat(server): device manager and device tasks owned by TaskScopes
qdot Jun 10, 2026
0c208ae
docs: cargo fmt must use nightly toolchain
qdot Jun 10, 2026
fa20399
feat(server): scope-owned ping timer, delete Drop-spawn hack
qdot Jun 10, 2026
58ebc49
feat(server): protocol subscription tasks owned by device scopes
qdot Jun 10, 2026
45bcd23
test: assert no leaked tasks after server shutdown
qdot Jun 10, 2026
2abb944
fix(server): address code review issues from phase 2 TaskScope migration
qdot Jun 10, 2026
91c03d1
feat(engine): task event and task list message types
qdot Jun 10, 2026
c449e77
feat(engine): stream task lifecycle events and snapshots to frontends
qdot Jun 10, 2026
7d2c20c
docs: update project context for task-scope-lifecycle
qdot Jun 10, 2026
f87658f
fix: address code review feedback on task-scope-lifecycle branch
qdot Jun 10, 2026
6efcd78
fix(server): run shutdown cleanup before task scope cancellation
qdot Jun 10, 2026
a53fcb2
fix(server): make device bringup cancellable so shutdown cannot hang
qdot Jun 10, 2026
49f8df4
fix(core): deregister tasks on panic via drop guard
qdot Jun 10, 2026
62138cf
test(core): assert held task reports Completed outcome
qdot Jun 10, 2026
3e8231f
test(server): reframe shutdown smoke test, add stalled-bringup regres…
qdot Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# CLAUDE.md

Last verified: 2026-05-19
Last verified: 2026-06-09

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Expand All @@ -11,10 +11,12 @@ cargo build # Debug build
cargo build --release # Release build (LTO enabled)
cargo test # Run all tests
cargo test -p buttplug_server # Run tests for specific crate
cargo fmt --all -- --check # Check formatting
cargo fmt # Auto-format (2-space indent, edition 2024)
cargo +nightly fmt --all -- --check # Check formatting (MUST use nightly)
cargo +nightly fmt # Auto-format (2-space indent, edition 2024)
```

**Formatting gotcha**: rustfmt.toml uses nightly-only options (`imports_layout`, `empty_item_single_line`). Running `cargo fmt` on the STABLE toolchain silently ignores them and rewrites the entire workspace into the wrong style (~190 files of import-collapsing churn). Always use `cargo +nightly fmt`. CI checks formatting with nightly.

**Linux dependencies**: `libudev-dev`, `libusb-1.0-0-dev` (for serial/HID support)

**WASM build**:
Expand Down Expand Up @@ -54,7 +56,7 @@ Buttplug is a framework for interfacing with intimate hardware devices. It uses

**Message-Based Protocol**: All client-server communication uses versioned JSON messages (v0-v4). Version negotiation happens during handshake.

**Async Architecture**: Heavy use of tokio channels (mpsc, broadcast, oneshot) for communication between components. Runtime abstraction supports tokio (production) and WASM.
**Async Architecture**: Heavy use of tokio channels (mpsc, broadcast, oneshot) for communication between components. Runtime abstraction supports tokio (production) and WASM. Spawned tasks are owned by `TaskScope`s rather than fire-and-forget `spawn!` (see Task Lifecycle below).

**Device Lifecycle**:
```
Expand Down Expand Up @@ -93,6 +95,17 @@ Simulated devices allow testing the full device lifecycle without real hardware.
- Validation rejects unknown archetypes and duplicate addresses at config build time
- `SimulatedProtocol` is a no-op handler; `SimulatedHardwareConnector` creates in-memory endpoints

**Task Lifecycle** (`buttplug_core::util::task`):
Spawned async tasks are owned by a `TaskScope` ownership tree rather than fire-and-forget, giving cooperative cancellation and global introspection. Key contracts:
- `TaskScope::root(name)` makes a root scope (path gets a unique numeric suffix, e.g. `server-2`, so parallel instances don't collide); `.child(name)` derives a sub-scope whose token is a child of the parent's.
- `scope.spawn(name, |token| async ...)` spawns a task owned by the scope; long-running tasks MUST `select!` on the passed token. Cancelling or dropping a scope cancels its whole subtree.
- `scope.spawn_and_hold(name, ...)` consumes the scope into the task, so drop-cancel can't fire before the task runs (used for `FnOnce` callbacks like ping-timeout and protocol subscription handlers).
- `scope.shutdown().await` cancels the subtree and waits until every task under it has deregistered (wrap in a timeout if tasks may be uncooperative).
- `spawn_detached(name, fut)` is a rare escape hatch: registered under `detached/{name}` but uncancellable. Prefer scopes.
- `TaskRegistry` (global, via `registry()`) records every live task: `snapshot()`, `live_count_under(prefix)` (segment-aware prefix match), `event_stream()` (`TaskEvent::Started`/`Ended`), `wait_empty_under(prefix)`.
- Ownership in the server: `ButtplugServer` owns a `server` root scope; `ServerDeviceManager` owns a `device-manager` root scope with per-device child scopes (io/event-forwarding/bringup); `PingTimer` is scope-owned (the old `PingMessage::End` + Drop-spawn shutdown hack is gone). `ProtocolHandler::handle_input_subscribe_cmd` now takes a `TaskScope` param.
- `intiface_engine` exposes this to frontends when `emit_task_events` is set: registry events forward as `EngineMessage::TaskStarted`/`TaskEnded`, and `IntifaceMessage::RequestTaskList` returns `EngineMessage::TaskList` (`Vec<TaskListEntry>`).

## Agent skills

### Issue tracker
Expand Down
8 changes: 8 additions & 0 deletions CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ _Avoid_: "Discovery" as a distinct stage from identification — they're the sam
An opt-in broadcast of every output command sent to a **Device** — device index, feature index, output type, and value. Used by frontends (e.g. Intiface Central) to visually display real-time device activity, verify hardware behaviour matches commands, and let developers see what *would* happen with simulated devices. Disabled by default to avoid overhead.
_Avoid_: Treating as internal-only debugging; it's a user-facing observability feature.

**Task Scope**:
The owner of spawned async tasks within a module. Every task is spawned through a Task Scope, which links it to a parent, derives its hierarchical name (e.g. `device-manager-1/devices/device-3/io`), registers it in the **Task Registry**, and hands it a cooperative cancellation token. Dropping a scope cancels its children. Tasks cannot be spawned without a parent scope.
_Avoid_: "Detached task" or bare spawning as the normal pattern; detachment is the rare, explicit exception.

**Task Registry**:
The queryable record of every live task — id, hierarchical path, parent, state. Populated as a side effect of spawning through a **Task Scope**. Exposed in-process for tests and embedders, and to frontends via TaskStarted/TaskEnded **Events** plus a snapshot query (same opt-in pattern as **Output Observation**).
_Avoid_: Treating as internal-only debugging; like Output Observations, it's user-facing observability.

**Command**:
A message from **Client** to **Server** requesting an action — controlling a device, starting a scan, requesting device lists. Always has a non-zero message ID; the server responds with an `Ok` or `Error` using the same ID.
_Avoid_: Referring to server-initiated messages as commands.
Expand Down
2 changes: 2 additions & 0 deletions crates/buttplug_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ wasm-bindgen-futures = { version = "0.4.64", optional = true }
wasmtimer = { version = "0.4.3", optional = true }
smallvec = { version = "1.15.1", features = ["serde", "const_generics"] }
enumflags2 = "0.7.12"
tokio-util = "0.7.18"
dashmap = "6.1.0"
1 change: 1 addition & 0 deletions crates/buttplug_core/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod range;
pub mod serializers;
pub mod small_vec_enum_map;
pub mod stream;
pub mod task;

#[cfg(all(not(feature = "wasm"), feature = "tokio-runtime"))]
pub use tokio::time::sleep;
Expand Down
93 changes: 93 additions & 0 deletions crates/buttplug_core/src/util/task/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

//! Task Scope and Task Registry: ownership and introspection for spawned tasks.
//!
//! A task spawned through a [TaskScope] is linked into an ownership tree, given
//! a hierarchical path, registered in the global [TaskRegistry], and handed a
//! cooperative [CancellationToken]; dropping a scope cancels its subtree. The
//! `buttplug_server` crate is fully migrated onto scope-owned tasks; the
//! hardware-manager and client crates still use the bare `spawn!` macro and
//! migrate onto scopes in a follow-up.

mod registry;
mod scope;

pub use registry::{TaskEvent, TaskId, TaskInfo, TaskOutcome, TaskRegistry, registry};
pub use scope::TaskScope;

use crate::util::async_manager;
use std::future::Future;

/// Spawn a task with no owning scope. Registered in the Task Registry under
/// "detached/{name}" so it still shows up in snapshots, but nothing can cancel
/// it. RARE — prefer [TaskScope::spawn]. Valid uses: one-shot notifications
/// where the spawner is being destroyed.
#[cfg(not(feature = "wasm"))]
pub fn spawn_detached<Fut>(name: &str, fut: Fut)
where
Fut: Future<Output = ()> + Send + 'static,
{
let path = format!("detached/{name}");
let id = registry().register(path.clone(), true);
let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path);
async_manager::spawn(
async move {
// Deregister via a drop guard so a panicking detached task still leaves
// the registry (outcome Panicked) instead of leaking its entry forever.
// Detached tasks have no cancellation token, so the outcome is Panicked on
// panic, else Completed.
let _guard = scope::DeregisterGuard::new(id, None);
fut.await;
},
span,
);
}

/// Spawn a task with no owning scope (WASM, no Send required). See the
/// non-WASM variant for semantics.
#[cfg(feature = "wasm")]
pub fn spawn_detached<Fut>(name: &str, fut: Fut)
where
Fut: Future<Output = ()> + 'static,
{
let path = format!("detached/{name}");
let id = registry().register(path.clone(), true);
let span = tracing::span!(tracing::Level::INFO, "buttplug_task", task.path = %path);
async_manager::spawn(
async move {
// Deregister via a drop guard so a panicking detached task still leaves
// the registry (outcome Panicked) instead of leaking its entry forever.
// Detached tasks have no cancellation token, so the outcome is Panicked on
// panic, else Completed.
let _guard = scope::DeregisterGuard::new(id, None);
fut.await;
},
span,
);
}

#[cfg(test)]
mod test {
use super::*;
use std::time::Duration;

#[tokio::test]
async fn test_spawn_detached_registers() {
let (tx, rx) = tokio::sync::oneshot::channel();
spawn_detached("test-notify", async move {
let _ = tx.send(());
});
rx.await.unwrap();
tokio::time::timeout(
Duration::from_secs(1),
registry().wait_empty_under("detached/test-notify"),
)
.await
.expect("detached task did not deregister");
}
}
222 changes: 222 additions & 0 deletions crates/buttplug_core/src/util/task/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Buttplug Rust Source Code File - See https://buttplug.io for more info.
//
// Copyright 2016-2026 Nonpolynomial Labs LLC. All rights reserved.
//
// Licensed under the BSD 3-Clause license. See LICENSE file in the project root
// for full license information.

use dashmap::DashMap;
use std::sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
};
use tokio::sync::broadcast;

/// Unique identifier for a registered task. Process-lifetime unique.
///
/// IDs are drawn from a monotonically-incrementing `AtomicU64` counter. At
/// 64-bit width and even at 10 million tasks per second the counter would take
/// roughly 58,000 years to wrap, so id reuse within a single process lifetime
/// is not a practical concern.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
pub fn value(&self) -> u64 {
self.0
}
}

/// How a task ended: ran to completion on its own, exited after observing
/// cancellation, or unwound from a panic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskOutcome {
Completed,
Cancelled,
Panicked,
}

/// Registry entry for a live task.
#[derive(Debug, Clone)]
pub struct TaskInfo {
pub id: TaskId,
/// Hierarchical path, e.g. "device-manager-1/devices/device-3/io".
pub path: String,
/// True if spawned via spawn_detached (no owning scope).
pub detached: bool,
}

/// Lifecycle events broadcast by the Task Registry.
#[derive(Debug, Clone)]
pub enum TaskEvent {
Started {
id: TaskId,
path: String,
},
Ended {
id: TaskId,
path: String,
outcome: TaskOutcome,
},
}

/// The global record of every live task, populated as a side effect of
/// spawning through a Task Scope.
///
/// **Memory note**: entries are removed on `deregister`, but `DashMap` does not
/// shrink shard capacity after removals. Peak concurrent-task count therefore
/// becomes a memory high-water mark that is held for the lifetime of the
/// registry (i.e. the process). In practice buttplug servers run a bounded
/// number of concurrent tasks, so this is not expected to be significant.
#[derive(Debug)]
pub struct TaskRegistry {
tasks: DashMap<u64, TaskInfo>,
counter: AtomicU64,
root_counter: AtomicU64,
events: broadcast::Sender<TaskEvent>,
}

/// The global Task Registry.
pub fn registry() -> &'static TaskRegistry {
static REGISTRY: OnceLock<TaskRegistry> = OnceLock::new();
REGISTRY.get_or_init(TaskRegistry::new)
}

impl TaskRegistry {
pub(super) fn new() -> Self {
Self {
tasks: DashMap::new(),
counter: AtomicU64::new(1),
root_counter: AtomicU64::new(1),
events: broadcast::channel(256).0,
}
}

/// Unique suffix for root scope names so parallel instances don't collide.
pub(super) fn next_root_suffix(&self) -> u64 {
self.root_counter.fetch_add(1, Ordering::Relaxed)
}

pub(super) fn register(&self, path: String, detached: bool) -> TaskId {
let id = TaskId(self.counter.fetch_add(1, Ordering::Relaxed));
self.tasks.insert(
id.0,
TaskInfo {
id,
path: path.clone(),
detached,
},
);
let _ = self.events.send(TaskEvent::Started { id, path });
id
}

pub(super) fn deregister(&self, id: TaskId, outcome: TaskOutcome) {
if let Some((_, info)) = self.tasks.remove(&id.0) {
let _ = self.events.send(TaskEvent::Ended {
id,
path: info.path,
outcome,
});
}
}

/// Snapshot of all live tasks.
pub fn snapshot(&self) -> Vec<TaskInfo> {
self.tasks.iter().map(|e| e.value().clone()).collect()
}

/// Count of live tasks at or under the given path prefix. Prefix matching is
/// segment-aware: "server-1" matches "server-1/loop" but not "server-10/loop".
pub fn live_count_under(&self, prefix: &str) -> usize {
self
.tasks
.iter()
.filter(|e| path_is_under(&e.value().path, prefix))
.count()
}

/// Subscribe to task lifecycle events.
pub fn event_stream(&self) -> broadcast::Receiver<TaskEvent> {
self.events.subscribe()
}

/// Wait until no live tasks remain at or under the given path prefix.
/// Subscribes to events BEFORE counting to avoid missing an Ended event
/// between the count and the wait. Callers should wrap in a timeout if the
/// subtree may contain uncooperative tasks.
pub async fn wait_empty_under(&self, prefix: &str) {
let mut events = self.events.subscribe();
while self.live_count_under(prefix) > 0 {
match events.recv().await {
Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => continue,
// Sender is owned by the registry itself; Closed only happens for
// non-global registries in tests being dropped.
Err(broadcast::error::RecvError::Closed) => return,
}
}
}
}

fn path_is_under(path: &str, prefix: &str) -> bool {
path == prefix || (path.starts_with(prefix) && path.as_bytes().get(prefix.len()) == Some(&b'/'))
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_register_and_snapshot() {
let reg = TaskRegistry::new();
let id = reg.register("root-1/loop".to_owned(), false);
let snapshot = reg.snapshot();
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0].id, id);
assert_eq!(snapshot[0].path, "root-1/loop");
reg.deregister(id, TaskOutcome::Completed);
assert!(reg.snapshot().is_empty());
}

#[test]
fn test_prefix_boundary() {
let reg = TaskRegistry::new();
reg.register("server-1/loop".to_owned(), false);
reg.register("server-10/loop".to_owned(), false);
// "server-1" must not match "server-10/loop"
assert_eq!(reg.live_count_under("server-1"), 1);
assert_eq!(reg.live_count_under("server-10"), 1);
assert_eq!(reg.live_count_under("server"), 0);
}

#[tokio::test]
async fn test_events_emitted() {
let reg = TaskRegistry::new();
let mut events = reg.event_stream();
let id = reg.register("root-1/task".to_owned(), false);
reg.deregister(id, TaskOutcome::Cancelled);
assert!(matches!(
events.recv().await.unwrap(),
TaskEvent::Started { .. }
));
let TaskEvent::Ended { outcome, .. } = events.recv().await.unwrap() else {
panic!("expected Ended event");
};
assert_eq!(outcome, TaskOutcome::Cancelled);
}

#[tokio::test]
async fn test_wait_empty_under() {
let reg = std::sync::Arc::new(TaskRegistry::new());
let id = reg.register("root-1/task".to_owned(), false);
let reg_clone = reg.clone();
let waiter = tokio::spawn(async move { reg_clone.wait_empty_under("root-1").await });
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
assert!(!waiter.is_finished());
reg.deregister(id, TaskOutcome::Completed);
tokio::time::timeout(std::time::Duration::from_secs(1), waiter)
.await
.expect("wait_empty_under did not resolve")
.unwrap();
}
}
Loading