diff --git a/.changeset/support_simulatescenario_through_ffi_to_improve_testing.md b/.changeset/support_simulatescenario_through_ffi_to_improve_testing.md new file mode 100644 index 000000000..8ac50a0ee --- /dev/null +++ b/.changeset/support_simulatescenario_through_ffi_to_improve_testing.md @@ -0,0 +1,6 @@ +--- +livekit: patch +livekit-ffi: patch +--- + +support SimulateScenario through FFI to improve testing - #1069 (@davidzhao) diff --git a/.github/workflows/builds.yml b/.github/workflows/builds.yml index 556da2895..b068090c8 100644 --- a/.github/workflows/builds.yml +++ b/.github/workflows/builds.yml @@ -23,6 +23,13 @@ on: env: CARGO_TERM_COLOR: always + # sccache caches rustc invocations across runs (key: source hash + flags). + # RUSTC_WRAPPER is intentionally NOT set at workflow scope — see the probe + # step below for why (transient GHA cache backend errors should not fail + # the whole build). We do NOT wrap CC/CXX in this workflow because the + # matrix includes cross-compile targets (iOS, Android, Windows-arm64) + # where wrapping is fragile. + SCCACHE_GHA_ENABLED: "true" jobs: build: @@ -101,7 +108,41 @@ jobs: run: | rustup target add ${{ matrix.target }} - - uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 + - name: Setup sccache + uses: mozilla-actions/sccache-action@054db53350805f83040bf3e6e9b8cf5a139aa7c9 # v0.0.7 + + - name: Enable sccache wrapping (probe first) + shell: bash + run: | + # Probe the GHA cache backend by starting the sccache server. If it + # fails (transient HTTP 400 from actions.githubusercontent.com has + # been seen in practice), skip exporting RUSTC_WRAPPER so the build + # falls back to vanilla rustc instead of erroring out. + if sccache --start-server >/dev/null 2>&1; then + echo "RUSTC_WRAPPER=sccache" >> "$GITHUB_ENV" + echo "::notice::sccache enabled (RUSTC_WRAPPER=sccache)" + else + echo "::warning::sccache backend unreachable; building without compile cache this run" + fi + + - name: Cache cargo registry + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: | + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache cargo target + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: target/ + key: ${{ runner.os }}-cargo-target-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-target-${{ matrix.target }}- - name: Build (Cargo) if: ${{ !contains(matrix.target, 'android') }} diff --git a/.github/workflows/ffi-builds.yml b/.github/workflows/ffi-builds.yml index b43c0c5d2..c08eca407 100644 --- a/.github/workflows/ffi-builds.yml +++ b/.github/workflows/ffi-builds.yml @@ -153,7 +153,24 @@ jobs: run: | rustup target add ${{ matrix.target }} - - uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 + - name: Cache cargo registry + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: | + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache cargo target + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: target/ + key: ${{ runner.os }}-cargo-target-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} + 
restore-keys: | + ${{ runner.os }}-cargo-target-${{ matrix.target }}- - name: Install Protoc uses: arduino/setup-protoc@a8b67ba40b37d35169e222f3bb352603327985b6 # v2.1.0 diff --git a/.github/workflows/node-builds.yml b/.github/workflows/node-builds.yml index b73bd7736..d53718bc5 100644 --- a/.github/workflows/node-builds.yml +++ b/.github/workflows/node-builds.yml @@ -78,7 +78,24 @@ jobs: toolchain: stable targets: ${{ matrix.target }} - - uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1 + - name: Cache cargo registry + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: | + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache cargo target + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: target/ + key: ${{ runner.os }}-cargo-target-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-target-${{ matrix.target }}- - name: Install Protoc uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3.0.0 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c5380f296..ca86342e6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -23,6 +23,13 @@ on: env: CARGO_TERM_COLOR: always RUST_LOG: debug + # sccache stores its cache in GitHub Actions' cache backend; both rustc and + # C/C++ invocations route through it. RUSTC_WRAPPER is intentionally NOT set + # at workflow scope — the "Enable sccache" step only exports it after the + # backend probe succeeds, so transient outages of the GHA cache service + # (which return HTTP 400 and abort sccache server startup) make CI continue + # without sccache instead of failing the whole build. + SCCACHE_GHA_ENABLED: "true" jobs: build: @@ -58,8 +65,38 @@ jobs: version: "25.2" repo-token: ${{ secrets.GITHUB_TOKEN }} - - name: Cache Cargo registry - uses: actions/cache@6f8efc29b200d32929f49075959781ed54ec270c # v3.5.0 + # sccache wraps both rustc and the C/C++ compilers used by `cxx_build` + # in `webrtc-sys`. The Mozilla action installs the binary; the probe + # step below only enables wrapping if the GHA cache backend is reachable + # (it occasionally returns transient HTTP 400s that crash sccache + # startup — we want builds to continue, not fail, in that case). + - name: Setup sccache + uses: mozilla-actions/sccache-action@054db53350805f83040bf3e6e9b8cf5a139aa7c9 # v0.0.7 + + - name: Enable sccache wrapping (probe first) + shell: bash + run: | + # `sccache --start-server` actually exercises the configured backend + # (here: GHA cache). If it fails — e.g. azure-side outage — log a + # warning and skip exporting RUSTC_WRAPPER / CC / CXX so the build + # falls back to vanilla rustc. Slower, but green. + if sccache --start-server >/dev/null 2>&1; then + echo "RUSTC_WRAPPER=sccache" >> "$GITHUB_ENV" + # All three matrix targets in tests.yml are host targets (no + # cross-compile), so wrapping CC/CXX is safe. Skip on Windows: + # MSVC `cl` setup needs more care. 
+ if [ "${{ runner.os }}" != "Windows" ]; then + echo "CC=sccache cc" >> "$GITHUB_ENV" + echo "CXX=sccache c++" >> "$GITHUB_ENV" + fi + echo "::notice::sccache enabled (RUSTC_WRAPPER=sccache)" + else + echo "::warning::sccache backend unreachable; building without compile cache this run" + fi + + # Cache cargo's downloaded crate sources + per-target build outputs. + - name: Cache cargo registry + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: | ~/.cargo/registry/index/ @@ -69,14 +106,13 @@ jobs: ${{ runner.os }}-cargo-registry- - - name: Cache Cargo build - uses: actions/cache@6f8efc29b200d32929f49075959781ed54ec270c # v3.5.0 + - name: Cache cargo target + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: path: target/ - key: ${{ runner.os }}-cargo-build-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} + key: ${{ runner.os }}-cargo-target-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} restore-keys: | - ${{ runner.os }}-cargo-build-${{ matrix.target }}- - ${{ runner.os }}-cargo-build- + ${{ runner.os }}-cargo-target-${{ matrix.target }}- - name: Install linux dependencies if: ${{ matrix.os == 'ubuntu-latest' }} @@ -118,14 +154,18 @@ jobs: LIVEKIT_CONFIG: "enable_data_tracks: true" run: livekit-server --dev & + # Tests run in dev profile to keep CI compile time reasonable. The e2e + # path is timing-sensitive (ICE handshake, reconnect windows) but the + # timeouts in those tests are real-clock seconds and have plenty of + # headroom — dev's slower runtime doesn't push past them in practice. - name: Test (no E2E) if: ${{ !matrix.e2e-testing }} env: RUST_LOG: info - run: cargo +nightly test --release --verbose --target ${{ matrix.target }} -- --nocapture + run: cargo +nightly test --verbose --target ${{ matrix.target }} -- --nocapture - name: Test (with E2E) if: ${{ matrix.e2e-testing }} env: RUST_LOG: info - run: cargo +nightly test --release --verbose --target ${{ matrix.target }} --features __lk-e2e-test -- --nocapture --test-threads=1 + run: cargo +nightly test --verbose --target ${{ matrix.target }} --features __lk-e2e-test -- --nocapture --test-threads=1 diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 55be8bdad..dd36fad19 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -197,8 +197,13 @@ impl SignalClient { } } - /// Restart the connection to the server - /// This will automatically flush the queue + /// Restart the connection to the server. + /// + /// Leaves the client in a "reconnecting" state with pass-through-only sends; + /// queueable signals (e.g. `AddTrack`, `Mute`, `UpdateSubscription`) accumulate + /// in the queue. The caller MUST invoke [`Self::set_reconnected`] once the resume + /// has fully recovered (PC connected, SyncState sent) to drain the queue and + /// re-enable normal sends. pub async fn restart(&self) -> SignalResult<proto::ReconnectResponse> { self.close().await; @@ -213,6 +218,16 @@ Ok(reconnect_response) } + /// Mark the signal as fully reconnected: drains the queue and clears the + /// `reconnecting` flag so subsequent sends bypass the queue path. + /// + /// MUST be called by the engine after `wait_pc_reconnected` succeeds. + /// Without this, the queued mutations (subscription updates, mutes, etc.) + /// stay buffered indefinitely. + pub async fn set_reconnected(&self) { + self.inner.set_reconnected().await; + } + /// Send a signal to the server (e.g. publish, subscribe, etc.)
/// This will automatically queue the message if the connection fails /// The queue is flushed on the next restart @@ -260,6 +275,18 @@ pub fn is_single_pc_mode_active(&self) -> bool { self.inner.is_single_pc_mode_active() } + + /// Returns whether the underlying WebSocket is currently in place. + /// + /// The inner `signal_task` clears the stream slot when the WebSocket dies + /// (ping timeout or remote close), so callers in the resume path can use + /// this to detect "signal died again while we were waiting for the PC." + /// Note: this does NOT inspect the `reconnecting` flag — during a normal + /// resume the flag is true even after the new stream has been installed, + /// and we want this check to return `true` in that case. + pub async fn is_connected(&self) -> bool { + self.inner.stream.read().await.is_some() + } } impl SignalInner { @@ -388,40 +415,70 @@ self.single_pc_mode_active } - /// Restart is called when trying to resume the room (RtcSession resume) + /// Restart is called when trying to resume the room (RtcSession resume). + /// + /// Leaves `reconnecting=true` on success — the engine is expected to call + /// [`Self::set_reconnected`] once the full resume has succeeded. On failure + /// resets `reconnecting=false` so subsequent retries can re-enter cleanly. + /// The stream slot is held under a write lock for the entire close + new + /// connect, so concurrent senders block on the read side until the new + /// stream is in place. pub async fn restart( self: &Arc<Self>, ) -> SignalResult<( proto::ReconnectResponse, mpsc::UnboundedReceiver<Box<proto::signal_response::Message>>, )> { - self.close(false).await; - - // Lock while we are reconnecting - let mut stream = self.stream.write().await; - + // Set reconnecting BEFORE we touch the stream, so concurrent `send` calls + // see the right state and route queueable messages to the queue (rather + // than racing on a brief stream=None / reconnecting=false window). self.reconnecting.store(true, Ordering::Release); - scopeguard::defer!(self.reconnecting.store(false, Ordering::Release)); + + let mut stream_guard = self.stream.write().await; + if let Some(old_stream) = stream_guard.take() { + old_stream.close(false).await; + } let sid = &self.join_response.participant.as_ref().unwrap().sid; let token = self.token.lock().clone(); - - // Use the same path that succeeded during initial connection - // For reconnects: reconnect=true, participant_sid=sid - // For v1 path: reconnect and sid are encoded in the join_request protobuf - // For v0 path: reconnect and sid are added as separate query parameters let lk_url = get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid) .unwrap(); - let (new_stream, mut events) = - SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?; - let reconnect_response = get_reconnect_response(&mut events).await?; - *stream = Some(new_stream); + let result = async { + let (new_stream, mut events) = + SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?; + let reconnect_response = get_reconnect_response(&mut events).await?; + SignalResult::Ok((new_stream, reconnect_response, events)) + } + .await; + + match result { + Ok((new_stream, reconnect_response, events)) => { + *stream_guard = Some(new_stream); + drop(stream_guard); + // Note: NOT clearing `reconnecting` here. Caller must invoke + // `set_reconnected()` after the resume has fully recovered.
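The caller-side half of this contract is worth spelling out in one place. A minimal sketch of the engine's resume sequence, assuming the `signal_client()` accessor used later in this PR; the helper itself is illustrative, and the real flow lives in `try_resume_connection` in `rtc_engine/mod.rs`:

```rust
// Sketch only: compresses the SyncState/ICE steps into comments.
async fn resume_signal(session: &RtcSession) -> EngineResult<()> {
    // restart(): new stream installed, `reconnecting` stays true, queueable
    // sends buffer up while pass-through sends still go out immediately.
    let _reconnect_response = session.signal_client().restart().await?;

    // ... send SyncState (pass-through), restart publisher ICE, then
    // wait_pc_reconnected(PC_RECONNECT_SETTLE_DELAY) ...

    // Bail out if the WebSocket died again while the PCs were recovering.
    if !session.signal_client().is_connected().await {
        return Err(EngineError::Connection("signal severed during resume".into()));
    }

    // Only now drain the queue and clear the flag.
    session.signal_client().set_reconnected().await;
    Ok(())
}
```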
+ Ok((reconnect_response, events)) + } + Err(err) => { + // Connect / get_reconnect_response failed. Stream slot stays None. + // Reset the flag so the next reconnect attempt can re-enter. + drop(stream_guard); + self.reconnecting.store(false, Ordering::Release); + Err(err) + } + } + } - drop(stream); + /// See [`SignalClient::set_reconnected`]. + pub async fn set_reconnected(&self) { + // Order: clear the flag FIRST, then flush. This way any sends that race + // with the flush see `reconnecting=false` and go through the normal path + // (which itself flushes the queue), and we don't have queueable sends + // sneaking back into the queue while we're trying to drain it. + self.reconnecting.store(false, Ordering::Release); self.flush_queue().await; - Ok((reconnect_response, events)) } /// Close the connection @@ -431,25 +488,46 @@ impl SignalInner { } } - /// Send a signal to the server + /// Send a signal to the server. + /// + /// During reconnect: + /// - Pass-through signals (`Trickle`/`Offer`/`Answer`/`SyncState`/`Simulate`/`Leave`) + /// block on the stream lock and write through the new stream once it's in place. + /// - Queueable signals are accumulated in the queue and drained by + /// [`Self::set_reconnected`] after the resume has fully recovered. pub async fn send(&self, signal: proto::signal_request::Message) { - if self.reconnecting.load(Ordering::Acquire) { - self.queue_message(signal).await; + let pass_through = is_pass_through(&signal); + let reconnecting = self.reconnecting.load(Ordering::Acquire); + + if reconnecting && !pass_through { + // Queueable signal during reconnect — buffer for the post-resume flush. + self.queue.lock().await.push(signal); return; } - self.flush_queue().await; // The queue must be flusehd before sending any new signal + if !reconnecting { + // Normal path: drain anything that was queued before the previous + // reconnect, preserving the original send order. + self.flush_queue().await; + } + // Pass-through during reconnect: the stream read lock is held by `restart` + // until the new stream is installed, so this awaits and then writes via + // the new stream. Same code path for the steady-state send — the lock is + // free and we send immediately. if let Some(stream) = self.stream.read().await.as_ref() { if let Err(SignalError::SendError) = stream.send(signal.clone()).await { - self.queue_message(signal).await; + if !pass_through { + self.queue.lock().await.push(signal); + } else { + log::warn!("dropping pass-through signal — send failed"); + } } - } - } - - async fn queue_message(&self, signal: proto::signal_request::Message) { - if is_queuable(&signal) { + } else if !pass_through { + // Stream not in place AND signal is queueable — hold it. self.queue.lock().await.push(signal); + } else { + log::warn!("dropping pass-through signal — no stream available"); } } @@ -533,6 +611,10 @@ async fn signal_task( inner.send(ping).await; } _ = &mut ping_timeout => { + // No pong within the configured window — the WS is dead even + // if the OS hasn't told us yet. Tear down the stream and emit + // Close; the engine layer reads that as a trigger to drive + // a resume reconnect (see SignalEvent::Close docs). 
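The send-routing rules above condense to a small decision table. A self-contained model for reference (the `Signal` enum is an illustrative stand-in for `proto::signal_request::Message`; the SendError fallback is folded into the comments):

```rust
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Signal { Trickle, Offer, Answer, SyncState, Simulate, Leave, AddTrack, Mute, UpdateSubscription }

fn is_pass_through(s: Signal) -> bool {
    matches!(s, Signal::Trickle | Signal::Offer | Signal::Answer
        | Signal::SyncState | Signal::Simulate | Signal::Leave)
}

// Where a signal ends up, given the connection state. On a SendError the
// real code additionally re-queues queueable signals.
fn route(s: Signal, reconnecting: bool, stream_up: bool) -> &'static str {
    match (reconnecting, is_pass_through(s), stream_up) {
        (true, false, _) => "queued (drained by set_reconnected)",
        (_, true, true) | (false, false, true) => "sent on the stream",
        (_, true, false) => "dropped with a warning",
        (false, false, false) => "queued (flushed by the next successful send)",
    }
}

// e.g. a Mute during reconnect is buffered while a Trickle goes straight out:
// route(Signal::Mute, true, true)    == "queued (drained by set_reconnected)"
// route(Signal::Trickle, true, true) == "sent on the stream"
```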
let _ = emitter.send(SignalEvent::Close("ping timeout".into())); break; } @@ -542,10 +624,13 @@ inner.close(true).await; // Make sure to always close the ws connection when the loop is terminated } -/// Check if the signal is queuable -/// Not every signal should be sent after signal reconnection -fn is_queuable(signal: &proto::signal_request::Message) -> bool { - !matches!( +/// Returns true for signals that must NOT be queued during a reconnect — they +/// drive signaling/negotiation itself (Trickle ICE candidates, the +/// publisher Offer, the subscriber Answer, the client SyncState that the SFU +/// uses to resync state, plus simulate/leave). Buffering these would deadlock +/// the resume. Mirrors `client-sdk-js` `passThroughQueueSignals`. +fn is_pass_through(signal: &proto::signal_request::Message) -> bool { + matches!( signal, proto::signal_request::Message::SyncState(_) | proto::signal_request::Message::Trickle(_) @@ -777,6 +862,105 @@ mod tests { use super::*; + /// Build a stream-less SignalInner suitable for exercising the queue routing + /// in `send`. The stream slot is None so any actual write would be dropped, + /// which is fine — these tests only assert which side of the queue each + /// message lands on. + fn make_stub_inner() -> Arc<SignalInner> { + Arc::new(SignalInner { + stream: AsyncRwLock::new(None), + token: Mutex::new(String::new()), + reconnecting: AtomicBool::new(false), + queue: Default::default(), + url: "wss://localhost:7880".to_string(), + options: SignalOptions::default(), + join_response: proto::JoinResponse::default(), + request_id: AtomicU32::new(1), + single_pc_mode_active: false, + }) + } + + #[cfg(feature = "signal-client-tokio")] + #[tokio::test] + async fn send_queues_queueable_signals_during_reconnect() { + let inner = make_stub_inner(); + inner.reconnecting.store(true, Ordering::Release); + + // Queueable: AddTrack, Mute, UpdateSubscription + inner + .send(proto::signal_request::Message::AddTrack(proto::AddTrackRequest { + cid: "track1".into(), + ..Default::default() + })) + .await; + inner + .send(proto::signal_request::Message::Mute(proto::MuteTrackRequest { + sid: "sid1".into(), + muted: true, + })) + .await; + inner + .send(proto::signal_request::Message::Subscription(proto::UpdateSubscription { + track_sids: vec!["sid2".into()], + ..Default::default() + })) + .await; + + let queue = inner.queue.lock().await; + assert_eq!(queue.len(), 3, "all three queueable signals should be buffered"); + } + + #[cfg(feature = "signal-client-tokio")] + #[tokio::test] + async fn send_does_not_queue_pass_through_signals_during_reconnect() { + let inner = make_stub_inner(); + inner.reconnecting.store(true, Ordering::Release); + + // Pass-through: Trickle, Offer, Answer, SyncState, Simulate, Leave. + // These all attempt to write to the (None) stream and get logged as + // "no stream available" — but critically they do NOT land in the queue.
+ inner.send(proto::signal_request::Message::Trickle(proto::TrickleRequest::default())).await; + inner + .send(proto::signal_request::Message::Offer(proto::SessionDescription::default())) + .await; + inner + .send(proto::signal_request::Message::Answer(proto::SessionDescription::default())) + .await; + inner.send(proto::signal_request::Message::SyncState(proto::SyncState::default())).await; + inner + .send(proto::signal_request::Message::Simulate(proto::SimulateScenario::default())) + .await; + inner.send(proto::signal_request::Message::Leave(proto::LeaveRequest::default())).await; + + let queue = inner.queue.lock().await; + assert!(queue.is_empty(), "pass-through signals must not be queued, got {}", queue.len()); + } + + #[cfg(feature = "signal-client-tokio")] + #[tokio::test] + async fn set_reconnected_clears_flag_even_without_stream() { + let inner = make_stub_inner(); + inner.reconnecting.store(true, Ordering::Release); + + // Queue something while reconnecting + inner + .send(proto::signal_request::Message::Mute(proto::MuteTrackRequest { + sid: "sid1".into(), + muted: true, + })) + .await; + assert_eq!(inner.queue.lock().await.len(), 1); + + // set_reconnected clears the flag and attempts a flush. With stream=None + // the flush is a no-op: flush_queue only drains when a stream is + // available, so the queued message stays put. That's acceptable, because + // a future send over a live stream triggers flush_queue at the top of + // the normal path. Assert both: the flag clears, the queue is retained. + inner.set_reconnected().await; + assert!(!inner.reconnecting.load(Ordering::Acquire), "flag must be cleared"); + assert_eq!(inner.queue.lock().await.len(), 1, "flush is a no-op without a stream"); + } + #[test] fn livekit_url_test() { let io = SignalOptions::default(); diff --git a/livekit-datatrack/src/remote/depacketizer.rs b/livekit-datatrack/src/remote/depacketizer.rs index af2e456df..87cbf897d 100644 --- a/livekit-datatrack/src/remote/depacketizer.rs +++ b/livekit-datatrack/src/remote/depacketizer.rs @@ -306,7 +306,9 @@ assert!(result.frame.is_none() && result.drop_error.is_none()); let first_frame_number = packet.header.frame_number; - packet.header.frame_number += packet.header.frame_number.wrapping_add(1); // Next frame + // Advance to the next frame; `wrapping_add` so a Faker-generated + // u32::MAX frame_number doesn't blow up on overflow.
+ packet.header.frame_number = packet.header.frame_number.wrapping_add(1); let result = depacketizer.push(packet); assert!(result.frame.is_none()); diff --git a/livekit-ffi-node-bindings/proto/ffi_pb.d.ts b/livekit-ffi-node-bindings/proto/ffi_pb.d.ts index 05d98054d..edaccacaf 100644 --- a/livekit-ffi-node-bindings/proto/ffi_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/ffi_pb.d.ts @@ -19,7 +19,7 @@ import type { BinaryReadOptions, FieldList, JsonReadOptions, JsonValue, PartialMessage, PlainMessage } from "@bufbuild/protobuf"; import { Message, proto2 } from "@bufbuild/protobuf"; -import type { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, EditChatMessageRequest, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SendChatMessageCallback, SendChatMessageRequest, SendChatMessageResponse, SendStreamChunkCallback, SendStreamChunkRequest, SendStreamChunkResponse, SendStreamHeaderCallback, SendStreamHeaderRequest, SendStreamHeaderResponse, SendStreamTrailerCallback, SendStreamTrailerRequest, SendStreamTrailerResponse, SetDataChannelBufferedAmountLowThresholdRequest, SetDataChannelBufferedAmountLowThresholdResponse, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js"; +import type { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, EditChatMessageRequest, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SendChatMessageCallback, SendChatMessageRequest, SendChatMessageResponse, SendStreamChunkCallback, SendStreamChunkRequest, SendStreamChunkResponse, SendStreamHeaderCallback, SendStreamHeaderRequest, SendStreamHeaderResponse, SendStreamTrailerCallback, SendStreamTrailerRequest, SendStreamTrailerResponse, SetDataChannelBufferedAmountLowThresholdRequest, SetDataChannelBufferedAmountLowThresholdResponse, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, SimulateScenarioCallback, SimulateScenarioRequest, SimulateScenarioResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js"; import type { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, SetTrackSubscriptionPermissionsRequest, 
SetTrackSubscriptionPermissionsResponse, TrackEvent } from "./track_pb.js"; import type { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js"; import type { ApmProcessReverseStreamRequest, ApmProcessReverseStreamResponse, ApmProcessStreamRequest, ApmProcessStreamResponse, ApmSetStreamDelayRequest, ApmSetStreamDelayResponse, AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, FlushSoxResamplerRequest, FlushSoxResamplerResponse, LoadAudioFilterPluginRequest, LoadAudioFilterPluginResponse, NewApmRequest, NewApmResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, NewSoxResamplerRequest, NewSoxResamplerResponse, PushSoxResamplerRequest, PushSoxResamplerResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; @@ -537,6 +537,14 @@ export declare class FfiRequest extends Message { */ value: DataTrackStreamReadRequest; case: "dataTrackStreamRead"; + } | { + /** + * Reconnection / chaos testing + * + * @generated from field: livekit.proto.SimulateScenarioRequest simulate_scenario = 76; + */ + value: SimulateScenarioRequest; + case: "simulateScenario"; } | { case: undefined; value?: undefined }; constructor(data?: PartialMessage); @@ -1025,6 +1033,14 @@ export declare class FfiResponse extends Message { */ value: DataTrackStreamReadResponse; case: "dataTrackStreamRead"; + } | { + /** + * Reconnection / chaos testing + * + * @generated from field: livekit.proto.SimulateScenarioResponse simulate_scenario = 75; + */ + value: SimulateScenarioResponse; + case: "simulateScenario"; } | { case: undefined; value?: undefined }; constructor(data?: PartialMessage); @@ -1313,6 +1329,12 @@ export declare class FfiEvent extends Message { */ value: DataTrackStreamEvent; case: "dataTrackStreamEvent"; + } | { + /** + * @generated from field: livekit.proto.SimulateScenarioCallback simulate_scenario = 44; + */ + value: SimulateScenarioCallback; + case: "simulateScenario"; } | { case: undefined; value?: undefined }; constructor(data?: PartialMessage); diff --git a/livekit-ffi-node-bindings/proto/ffi_pb.js b/livekit-ffi-node-bindings/proto/ffi_pb.js index 22727f9b7..6f41629b6 100644 --- a/livekit-ffi-node-bindings/proto/ffi_pb.js +++ b/livekit-ffi-node-bindings/proto/ffi_pb.js @@ -21,7 +21,7 @@ Object.defineProperty(exports, "__esModule", { value: true }); const { proto2 } = require("@bufbuild/protobuf"); -const { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, EditChatMessageRequest, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SendChatMessageCallback, SendChatMessageRequest, SendChatMessageResponse, SendStreamChunkCallback, SendStreamChunkRequest, SendStreamChunkResponse, 
SendStreamHeaderCallback, SendStreamHeaderRequest, SendStreamHeaderResponse, SendStreamTrailerCallback, SendStreamTrailerRequest, SendStreamTrailerResponse, SetDataChannelBufferedAmountLowThresholdRequest, SetDataChannelBufferedAmountLowThresholdResponse, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } = require("./room_pb.js"); +const { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, EditChatMessageRequest, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SendChatMessageCallback, SendChatMessageRequest, SendChatMessageResponse, SendStreamChunkCallback, SendStreamChunkRequest, SendStreamChunkResponse, SendStreamHeaderCallback, SendStreamHeaderRequest, SendStreamHeaderResponse, SendStreamTrailerCallback, SendStreamTrailerRequest, SendStreamTrailerResponse, SetDataChannelBufferedAmountLowThresholdRequest, SetDataChannelBufferedAmountLowThresholdResponse, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, SimulateScenarioCallback, SimulateScenarioRequest, SimulateScenarioResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } = require("./room_pb.js"); const { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, SetTrackSubscriptionPermissionsRequest, SetTrackSubscriptionPermissionsResponse, TrackEvent } = require("./track_pb.js"); const { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } = require("./video_frame_pb.js"); const { ApmProcessReverseStreamRequest, ApmProcessReverseStreamResponse, ApmProcessStreamRequest, ApmProcessStreamResponse, ApmSetStreamDelayRequest, ApmSetStreamDelayResponse, AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, FlushSoxResamplerRequest, FlushSoxResamplerResponse, LoadAudioFilterPluginRequest, LoadAudioFilterPluginResponse, NewApmRequest, NewApmResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, NewSoxResamplerRequest, NewSoxResamplerResponse, PushSoxResamplerRequest, PushSoxResamplerResponse, RemixAndResampleRequest, RemixAndResampleResponse } = 
require("./audio_frame_pb.js"); @@ -128,6 +128,7 @@ const FfiRequest = /*@__PURE__*/ proto2.makeMessageType( { no: 73, name: "subscribe_data_track", kind: "message", T: SubscribeDataTrackRequest, oneof: "message" }, { no: 74, name: "remote_data_track_is_published", kind: "message", T: RemoteDataTrackIsPublishedRequest, oneof: "message" }, { no: 75, name: "data_track_stream_read", kind: "message", T: DataTrackStreamReadRequest, oneof: "message" }, + { no: 76, name: "simulate_scenario", kind: "message", T: SimulateScenarioRequest, oneof: "message" }, ], ); @@ -212,6 +213,7 @@ const FfiResponse = /*@__PURE__*/ proto2.makeMessageType( { no: 72, name: "subscribe_data_track", kind: "message", T: SubscribeDataTrackResponse, oneof: "message" }, { no: 73, name: "remote_data_track_is_published", kind: "message", T: RemoteDataTrackIsPublishedResponse, oneof: "message" }, { no: 74, name: "data_track_stream_read", kind: "message", T: DataTrackStreamReadResponse, oneof: "message" }, + { no: 75, name: "simulate_scenario", kind: "message", T: SimulateScenarioResponse, oneof: "message" }, ], ); @@ -267,6 +269,7 @@ const FfiEvent = /*@__PURE__*/ proto2.makeMessageType( { no: 41, name: "send_bytes", kind: "message", T: StreamSendBytesCallback, oneof: "message" }, { no: 42, name: "publish_data_track", kind: "message", T: PublishDataTrackCallback, oneof: "message" }, { no: 43, name: "data_track_stream_event", kind: "message", T: DataTrackStreamEvent, oneof: "message" }, + { no: 44, name: "simulate_scenario", kind: "message", T: SimulateScenarioCallback, oneof: "message" }, ], ); diff --git a/livekit-ffi-node-bindings/proto/room_pb.d.ts b/livekit-ffi-node-bindings/proto/room_pb.d.ts index 98178f94d..78419b00c 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.d.ts +++ b/livekit-ffi-node-bindings/proto/room_pb.d.ts @@ -28,6 +28,61 @@ import type { FfiOwnedHandle } from "./handle_pb.js"; import type { OwnedByteStreamReader, OwnedTextStreamReader } from "./data_stream_pb.js"; import type { OwnedRemoteDataTrack } from "./data_track_pb.js"; +/** + * Simulate a reconnection scenario for testing. Mirrors the variants of + * `livekit::SimulateScenario`. The Resume / FullReconnect variants are + * the relevant ones for verifying that resume preserves publications and + * full reconnect republishes them exactly once. + * + * @generated from enum livekit.proto.SimulateScenarioKind + */ +export declare enum SimulateScenarioKind { + /** + * Closes the signal channel locally; engine attempts a Resume. + * + * @generated from enum value: SIMULATE_SIGNAL_RECONNECT = 0; + */ + SIMULATE_SIGNAL_RECONNECT = 0, + + /** + * @generated from enum value: SIMULATE_SPEAKER = 1; + */ + SIMULATE_SPEAKER = 1, + + /** + * @generated from enum value: SIMULATE_NODE_FAILURE = 2; + */ + SIMULATE_NODE_FAILURE = 2, + + /** + * @generated from enum value: SIMULATE_SERVER_LEAVE = 3; + */ + SIMULATE_SERVER_LEAVE = 3, + + /** + * @generated from enum value: SIMULATE_MIGRATION = 4; + */ + SIMULATE_MIGRATION = 4, + + /** + * @generated from enum value: SIMULATE_FORCE_TCP = 5; + */ + SIMULATE_FORCE_TCP = 5, + + /** + * @generated from enum value: SIMULATE_FORCE_TLS = 6; + */ + SIMULATE_FORCE_TLS = 6, + + /** + * Asks the server to send `LeaveRequest{Reconnect}`, forcing a full + * reconnect (new RtcSession; SDK republishes existing local tracks). 
+ * + * @generated from enum value: SIMULATE_FULL_RECONNECT = 7; + */ + SIMULATE_FULL_RECONNECT = 7, +} + /** * @generated from enum livekit.proto.IceTransportType */ @@ -379,6 +434,93 @@ export declare class DisconnectCallback extends Message { static equals(a: DisconnectCallback | PlainMessage | undefined, b: DisconnectCallback | PlainMessage | undefined): boolean; } +/** + * @generated from message livekit.proto.SimulateScenarioRequest + */ +export declare class SimulateScenarioRequest extends Message { + /** + * @generated from field: required uint64 room_handle = 1; + */ + roomHandle?: bigint; + + /** + * @generated from field: required livekit.proto.SimulateScenarioKind scenario = 2; + */ + scenario?: SimulateScenarioKind; + + /** + * @generated from field: optional uint64 request_async_id = 3; + */ + requestAsyncId?: bigint; + + constructor(data?: PartialMessage); + + static readonly runtime: typeof proto2; + static readonly typeName = "livekit.proto.SimulateScenarioRequest"; + static readonly fields: FieldList; + + static fromBinary(bytes: Uint8Array, options?: Partial): SimulateScenarioRequest; + + static fromJson(jsonValue: JsonValue, options?: Partial): SimulateScenarioRequest; + + static fromJsonString(jsonString: string, options?: Partial): SimulateScenarioRequest; + + static equals(a: SimulateScenarioRequest | PlainMessage | undefined, b: SimulateScenarioRequest | PlainMessage | undefined): boolean; +} + +/** + * @generated from message livekit.proto.SimulateScenarioResponse + */ +export declare class SimulateScenarioResponse extends Message { + /** + * @generated from field: required uint64 async_id = 1; + */ + asyncId?: bigint; + + constructor(data?: PartialMessage); + + static readonly runtime: typeof proto2; + static readonly typeName = "livekit.proto.SimulateScenarioResponse"; + static readonly fields: FieldList; + + static fromBinary(bytes: Uint8Array, options?: Partial): SimulateScenarioResponse; + + static fromJson(jsonValue: JsonValue, options?: Partial): SimulateScenarioResponse; + + static fromJsonString(jsonString: string, options?: Partial): SimulateScenarioResponse; + + static equals(a: SimulateScenarioResponse | PlainMessage | undefined, b: SimulateScenarioResponse | PlainMessage | undefined): boolean; +} + +/** + * @generated from message livekit.proto.SimulateScenarioCallback + */ +export declare class SimulateScenarioCallback extends Message { + /** + * @generated from field: required uint64 async_id = 1; + */ + asyncId?: bigint; + + /** + * @generated from field: optional string error = 2; + */ + error?: string; + + constructor(data?: PartialMessage); + + static readonly runtime: typeof proto2; + static readonly typeName = "livekit.proto.SimulateScenarioCallback"; + static readonly fields: FieldList; + + static fromBinary(bytes: Uint8Array, options?: Partial): SimulateScenarioCallback; + + static fromJson(jsonValue: JsonValue, options?: Partial): SimulateScenarioCallback; + + static fromJsonString(jsonString: string, options?: Partial): SimulateScenarioCallback; + + static equals(a: SimulateScenarioCallback | PlainMessage | undefined, b: SimulateScenarioCallback | PlainMessage | undefined): boolean; +} + /** * Publish a track to the room * diff --git a/livekit-ffi-node-bindings/proto/room_pb.js b/livekit-ffi-node-bindings/proto/room_pb.js index a7bce77c8..a3c5cac63 100644 --- a/livekit-ffi-node-bindings/proto/room_pb.js +++ b/livekit-ffi-node-bindings/proto/room_pb.js @@ -30,6 +30,28 @@ const { FfiOwnedHandle } = require("./handle_pb.js"); const { 
OwnedByteStreamReader, OwnedTextStreamReader } = require("./data_stream_pb.js"); const { OwnedRemoteDataTrack } = require("./data_track_pb.js"); +/** + * Simulate a reconnection scenario for testing. Mirrors the variants of + * `livekit::SimulateScenario`. The Resume / FullReconnect variants are + * the relevant ones for verifying that resume preserves publications and + * full reconnect republishes them exactly once. + * + * @generated from enum livekit.proto.SimulateScenarioKind + */ +const SimulateScenarioKind = /*@__PURE__*/ proto2.makeEnum( + "livekit.proto.SimulateScenarioKind", + [ + {no: 0, name: "SIMULATE_SIGNAL_RECONNECT"}, + {no: 1, name: "SIMULATE_SPEAKER"}, + {no: 2, name: "SIMULATE_NODE_FAILURE"}, + {no: 3, name: "SIMULATE_SERVER_LEAVE"}, + {no: 4, name: "SIMULATE_MIGRATION"}, + {no: 5, name: "SIMULATE_FORCE_TCP"}, + {no: 6, name: "SIMULATE_FORCE_TLS"}, + {no: 7, name: "SIMULATE_FULL_RECONNECT"}, + ], +); + /** * @generated from enum livekit.proto.IceTransportType */ @@ -185,6 +207,39 @@ const DisconnectCallback = /*@__PURE__*/ proto2.makeMessageType( ], ); +/** + * @generated from message livekit.proto.SimulateScenarioRequest + */ +const SimulateScenarioRequest = /*@__PURE__*/ proto2.makeMessageType( + "livekit.proto.SimulateScenarioRequest", + () => [ + { no: 1, name: "room_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 2, name: "scenario", kind: "enum", T: proto2.getEnumType(SimulateScenarioKind), req: true }, + { no: 3, name: "request_async_id", kind: "scalar", T: 4 /* ScalarType.UINT64 */, opt: true }, + ], +); + +/** + * @generated from message livekit.proto.SimulateScenarioResponse + */ +const SimulateScenarioResponse = /*@__PURE__*/ proto2.makeMessageType( + "livekit.proto.SimulateScenarioResponse", + () => [ + { no: 1, name: "async_id", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + ], +); + +/** + * @generated from message livekit.proto.SimulateScenarioCallback + */ +const SimulateScenarioCallback = /*@__PURE__*/ proto2.makeMessageType( + "livekit.proto.SimulateScenarioCallback", + () => [ + { no: 1, name: "async_id", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 2, name: "error", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, + ], +); + /** * Publish a track to the room * @@ -1521,6 +1576,7 @@ const DataTrackUnpublished = /*@__PURE__*/ proto2.makeMessageType( ); +exports.SimulateScenarioKind = SimulateScenarioKind; exports.IceTransportType = IceTransportType; exports.ContinualGatheringPolicy = ContinualGatheringPolicy; exports.ConnectionQuality = ConnectionQuality; @@ -1534,6 +1590,9 @@ exports.ConnectCallback_Result = ConnectCallback_Result; exports.DisconnectRequest = DisconnectRequest; exports.DisconnectResponse = DisconnectResponse; exports.DisconnectCallback = DisconnectCallback; +exports.SimulateScenarioRequest = SimulateScenarioRequest; +exports.SimulateScenarioResponse = SimulateScenarioResponse; +exports.SimulateScenarioCallback = SimulateScenarioCallback; exports.PublishTrackRequest = PublishTrackRequest; exports.PublishTrackResponse = PublishTrackResponse; exports.PublishTrackCallback = PublishTrackCallback; diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index b27a7b865..f1c7d1b45 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -164,7 +164,10 @@ message FfiRequest { RemoteDataTrackIsPublishedRequest remote_data_track_is_published = 74; DataTrackStreamReadRequest data_track_stream_read = 75; - // NEXT_ID: 76 + // 
Reconnection / chaos testing + SimulateScenarioRequest simulate_scenario = 76; + + // NEXT_ID: 77 } } @@ -274,7 +277,10 @@ message FfiResponse { RemoteDataTrackIsPublishedResponse remote_data_track_is_published = 73; DataTrackStreamReadResponse data_track_stream_read = 74; - // NEXT_ID: 75 + // Reconnection / chaos testing + SimulateScenarioResponse simulate_scenario = 75; + + // NEXT_ID: 76 } } @@ -337,7 +343,9 @@ message FfiEvent { // Data Track (remote) DataTrackStreamEvent data_track_stream_event = 43; - // NEXT_ID: 44 + SimulateScenarioCallback simulate_scenario = 44; + + // NEXT_ID: 45 } } diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 7e198c045..61b4816b0 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -68,6 +68,34 @@ message DisconnectRequest { message DisconnectResponse { required uint64 async_id = 1; } message DisconnectCallback { required uint64 async_id = 1; } +// Simulate a reconnection scenario for testing. Mirrors the variants of +// `livekit::SimulateScenario`. The Resume / FullReconnect variants are +// the relevant ones for verifying that resume preserves publications and +// full reconnect republishes them exactly once. +enum SimulateScenarioKind { + // Closes the signal channel locally; engine attempts a Resume. + SIMULATE_SIGNAL_RECONNECT = 0; + SIMULATE_SPEAKER = 1; + SIMULATE_NODE_FAILURE = 2; + SIMULATE_SERVER_LEAVE = 3; + SIMULATE_MIGRATION = 4; + SIMULATE_FORCE_TCP = 5; + SIMULATE_FORCE_TLS = 6; + // Asks the server to send `LeaveRequest{Reconnect}`, forcing a full + // reconnect (new RtcSession; SDK republishes existing local tracks). + SIMULATE_FULL_RECONNECT = 7; +} +message SimulateScenarioRequest { + required uint64 room_handle = 1; + required SimulateScenarioKind scenario = 2; + optional uint64 request_async_id = 3; +} +message SimulateScenarioResponse { required uint64 async_id = 1; } +message SimulateScenarioCallback { + required uint64 async_id = 1; + optional string error = 2; +} + // Publish a track to the room message PublishTrackRequest { required uint64 local_participant_handle = 1; diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index e27a54168..4f7377a00 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -19,7 +19,7 @@ use livekit::{ prelude::*, register_audio_filter_plugin, webrtc::{native::apm, native::audio_resampler, prelude::*}, - AudioFilterPlugin, + AudioFilterPlugin, SimulateScenario, }; use parking_lot::Mutex; @@ -70,18 +70,50 @@ fn on_disconnect( .and_then(|r| proto::DisconnectReason::try_from(r).ok()) .map(DisconnectReason::from) .unwrap_or(DisconnectReason::ClientInitiated); - let handle = server.async_runtime.spawn(async move { - let ffi_room = - server.retrieve_handle::<FfiRoom>(disconnect.room_handle).unwrap().clone(); - ffi_room.close(server, reason).await; + let ffi_room = server.retrieve_handle::<FfiRoom>(disconnect.room_handle)?.clone(); + let handle = server.async_runtime.spawn(async move { + ffi_room.close(server, reason).await; let _ = server.send_event(proto::DisconnectCallback { async_id }.into()); }); server.watch_panic(handle); Ok(proto::DisconnectResponse { async_id }) }
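On the binding side of the boundary, the new request follows the same correlation pattern as disconnect above: the response returns the `async_id`, and completion arrives later as an `FfiEvent`. A pseudocode-level sketch; `send_request` and `next_event` are hypothetical stand-ins for a concrete binding's transport, not functions in this repo:

```rust
// Pseudocode: `send_request` / `next_event` are placeholders (NOT real APIs).
async fn simulate_and_wait(room_handle: u64) -> Result<(), String> {
    // The response comes back immediately with the async_id to watch for.
    let resp = send_request(proto::SimulateScenarioRequest {
        room_handle,
        scenario: proto::SimulateScenarioKind::SimulateFullReconnect as i32,
        request_async_id: None,
    });
    loop {
        // Completion (or failure) arrives later as an FfiEvent.
        if let proto::ffi_event::Message::SimulateScenario(cb) = next_event().await {
            if cb.async_id == resp.async_id {
                return cb.error.map_or(Ok(()), Err);
            }
        }
    }
}
```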
+/// Simulate a reconnection scenario for chaos / E2E testing. +/// The operation completes asynchronously; the FfiClient must wait for the SimulateScenarioCallback. +fn on_simulate_scenario( + server: &'static FfiServer, + request: proto::SimulateScenarioRequest, +) -> FfiResult<proto::SimulateScenarioResponse> { + let async_id = server.resolve_async_id(request.request_async_id); + let scenario_kind = proto::SimulateScenarioKind::try_from(request.scenario) + .map_err(|_| FfiError::InvalidRequest("unknown SimulateScenarioKind".into()))?; + let scenario = match scenario_kind { + proto::SimulateScenarioKind::SimulateSignalReconnect => SimulateScenario::SignalReconnect, + proto::SimulateScenarioKind::SimulateSpeaker => SimulateScenario::Speaker, + proto::SimulateScenarioKind::SimulateNodeFailure => SimulateScenario::NodeFailure, + proto::SimulateScenarioKind::SimulateServerLeave => SimulateScenario::ServerLeave, + proto::SimulateScenarioKind::SimulateMigration => SimulateScenario::Migration, + proto::SimulateScenarioKind::SimulateForceTcp => SimulateScenario::ForceTcp, + proto::SimulateScenarioKind::SimulateForceTls => SimulateScenario::ForceTls, + proto::SimulateScenarioKind::SimulateFullReconnect => SimulateScenario::FullReconnect, + }; + + let ffi_room = server.retrieve_handle::<FfiRoom>(request.room_handle)?.clone(); + + let handle = server.async_runtime.spawn(async move { + let error = match ffi_room.inner.room.simulate_scenario(scenario).await { + Ok(()) => None, + Err(err) => Some(err.to_string()), + }; + let _ = server.send_event(proto::SimulateScenarioCallback { async_id, error }.into()); + }); + server.watch_panic(handle); + Ok(proto::SimulateScenarioResponse { async_id }) +} + /// Publish a track to a room, and send a response to the FfiClient /// The FfiClient musts wait for the LocalTrackPublication fn on_publish_track( @@ -1272,6 +1304,7 @@ pub fn handle_request( Request::Dispose(req) => on_dispose(server, req)?.into(), Request::Connect(req) => on_connect(server, req)?.into(), Request::Disconnect(req) => on_disconnect(server, req)?.into(), + Request::SimulateScenario(req) => on_simulate_scenario(server, req)?.into(), Request::PublishTrack(req) => on_publish_track(server, req)?.into(), Request::UnpublishTrack(req) => on_unpublish_track(server, req)?.into(), Request::PublishData(req) => on_publish_data(server, req)?.into(), diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 1ad21f7de..ace578368 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -53,8 +53,21 @@ pub(crate) type EngineResult<T> = Result<T, EngineError>; pub const RECONNECT_ATTEMPTS: u32 = 10; pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5); +/// Settling delay before checking PeerConnection state on the resume path. +/// +/// Lets a freshly issued ICE-restart offer/answer round-trip take effect when the +/// underlying PC was still in `Connected` at the moment we started the reconnect +/// (e.g. signal-only failure). Without this, the resume can return success +/// immediately and the next failure detector then trips the engine into a real +/// disconnect. +/// +/// Only applied to the resume path. Full reconnect builds brand-new PCs which +/// don't suffer from the "looks-Connected-but-isn't" race. +pub const PC_RECONNECT_SETTLE_DELAY: Duration = Duration::from_secs(1); + #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum SimulateScenario { + /// Closes the signal channel locally; the engine attempts a Resume. SignalReconnect, Speaker, NodeFailure, @@ -62,6 +75,9 @@ Migration, ForceTcp, ForceTls, + /// Tells the server to issue a `LeaveRequest{Reconnect}`, forcing a + /// full reconnect (new RtcSession, republish required).
+ FullReconnect, } #[derive(Error, Debug)] @@ -787,7 +803,7 @@ impl EngineInner { } log::error!("restarting connection... attempt: {}", i); - if let Err(err) = self + match self .try_restart_connection( &url, &token, @@ -796,12 +812,23 @@ impl EngineInner { ) .await { - log::error!("restarting connection failed: {}", err); - } else { - let (tx, rx) = oneshot::channel(); - let _ = self.engine_tx.send(EngineEvent::Restarted(tx)); - let _ = rx.await; - return Ok(()); + Ok(()) => { + let (tx, rx) = oneshot::channel(); + let _ = self.engine_tx.send(EngineEvent::Restarted(tx)); + let _ = rx.await; + return Ok(()); + } + Err(err) => { + if let Some(reason) = leave_disconnect_reason(&err) { + log::warn!("server requested disconnect during restart: {:?}", reason); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "server requested disconnect during restart".into(), + )); + } + log::error!("restarting connection failed: {}", err); + } } } else { if i == 0 { @@ -811,15 +838,26 @@ impl EngineInner { } log::error!("resuming connection... attempt: {}", i); - if let Err(err) = self.try_resume_connection().await { - log::error!("resuming connection failed: {}", err); - let mut running_handle = self.running_handle.write(); - running_handle.full_reconnect = true; - } else { - let (tx, rx) = oneshot::channel(); - let _ = self.engine_tx.send(EngineEvent::Resumed(tx)); - let _ = rx.await; - return Ok(()); + match self.try_resume_connection().await { + Ok(()) => { + let (tx, rx) = oneshot::channel(); + let _ = self.engine_tx.send(EngineEvent::Resumed(tx)); + let _ = rx.await; + return Ok(()); + } + Err(err) => { + if let Some(reason) = leave_disconnect_reason(&err) { + log::warn!("server requested disconnect during resume: {:?}", reason); + self.running_handle.write().can_reconnect = false; + self.close(reason).await; + return Err(EngineError::Connection( + "server requested disconnect during resume".into(), + )); + } + log::error!("resuming connection failed: {}", err); + let mut running_handle = self.running_handle.write(); + running_handle.full_reconnect = true; + } } } @@ -889,12 +927,29 @@ impl EngineInner { let (tx, rx) = oneshot::channel(); let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx }); - // With SignalResumed, the room will send a SyncState message to the server + // With SignalResumed, the room will send a SyncState message to the server. + // SyncState is a pass-through signal so it goes out immediately even though + // the SignalClient is still in `reconnecting=true` state. let _ = rx.await; // The publisher offer must be sent AFTER the SyncState message session.restart_publisher().await?; - session.wait_pc_connection().await + session.wait_pc_reconnected(PC_RECONNECT_SETTLE_DELAY).await?; + + // Re-check the signal connection BEFORE flushing the queue. If the WS died + // while we were waiting for PCs to come back, draining queued mutations + // would just push them into the void; better to bail and let the engine + // try a fresh resume (or escalate). + if !session.signal_client().is_connected().await { + return Err(EngineError::Connection("signal connection severed during resume".into())); + } + + // Flush queued mutations and clear the `reconnecting` flag — at this point + // the resume has fully recovered, so deferred subscription updates / mutes + // / etc. should now reach the server. Mirrors `client.setReconnected()`. 
+ session.signal_client().set_reconnected().await; + + Ok(()) } } @@ -903,3 +958,67 @@ impl From for EngineError { Self::Internal(err.to_string().into()) } } + +/// Inspect a reconnect-attempt error and return the server-supplied disconnect +/// reason iff the server sent `LeaveRequest{action: Disconnect}` while we were +/// trying to (re)connect. In that case the reconnect loop should bail out +/// rather than escalate to a full reconnect — the server is explicitly telling +/// us to stop trying. `Reconnect`/`Resume` actions still fall through to the +/// normal escalation path. +fn leave_disconnect_reason(err: &EngineError) -> Option { + if let EngineError::Signal(SignalError::LeaveRequest { reason, action }) = err { + if *action == proto::leave_request::Action::Disconnect { + return Some(*reason); + } + } + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn leave_disconnect_reason_returns_some_only_for_disconnect_action() { + let disconnect_err = EngineError::Signal(SignalError::LeaveRequest { + reason: DisconnectReason::ServerShutdown, + action: proto::leave_request::Action::Disconnect, + }); + assert_eq!( + leave_disconnect_reason(&disconnect_err), + Some(DisconnectReason::ServerShutdown), + "Disconnect action should propagate the server reason" + ); + + for action in + [proto::leave_request::Action::Reconnect, proto::leave_request::Action::Resume] + { + let err = EngineError::Signal(SignalError::LeaveRequest { + reason: DisconnectReason::ServerShutdown, + action, + }); + assert!( + leave_disconnect_reason(&err).is_none(), + "{:?} action must NOT short-circuit the reconnect loop", + action + ); + } + } + + #[test] + fn leave_disconnect_reason_ignores_non_leave_errors() { + let other_errors = [ + EngineError::Connection("network".into()), + EngineError::Internal("bug".into()), + EngineError::Signal(SignalError::SendError), + EngineError::Signal(SignalError::Timeout("waiting".into())), + ]; + for err in &other_errors { + assert!( + leave_disconnect_reason(err).is_none(), + "{:?} must not be treated as a disconnect Leave", + err + ); + } + } +} diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index ab3a50835..eb1e92cc6 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -664,6 +664,16 @@ impl RtcSession { self.inner.wait_pc_connection().await } + /// Wait for PCs to be connected on the resume path. + /// + /// Sleeps `settle_delay` before polling, giving the just-issued ICE + /// restart offer/answer round-trip a chance to take effect when the + /// failure was signal-only (PCs may still report `Connected` immediately + /// after a WS hiccup, even though the new ufrag/pwd hasn't propagated yet). + pub async fn wait_pc_reconnected(&self, settle_delay: Duration) -> EngineResult<()> { + self.inner.wait_pc_connection_with_delay(settle_delay).await + } + /// Ensure the publisher peer connection is connected and the data channel is open. /// This triggers negotiation if needed and waits for the connection to be established. pub async fn ensure_publisher_connected(&self) -> EngineResult<()> { @@ -1755,6 +1765,15 @@ impl SessionInner { })) .await? 
}
+            SimulateScenario::FullReconnect => {
+                self.signal_client
+                    .send(proto::signal_request::Message::Simulate(proto::SimulateScenario {
+                        scenario: Some(
+                            proto::simulate_scenario::Scenario::LeaveRequestFullReconnect(true),
+                        ),
+                    }))
+                    .await?;
+            }
         }
         Ok(())
     }
@@ -1872,7 +1891,10 @@ impl SessionInner {
     }
 
     async fn restart_publisher(&self) -> EngineResult<()> {
-        if self.has_published.load(Ordering::Acquire) {
+        // In single-PC mode the publisher is the only transport, so always restart its ICE
+        // even if the user hasn't explicitly published a track yet. Otherwise only restart
+        // when we have something to keep alive on the publisher side.
+        if self.single_pc_mode || self.has_published.load(Ordering::Acquire) {
             self.publisher_pc
                 .create_and_send_offer(OfferOptions { ice_restart: true, ..Default::default() })
                 .await?;
@@ -1882,7 +1904,16 @@
     /// Timeout after ['MAX_ICE_CONNECT_TIMEOUT']
     async fn wait_pc_connection(&self) -> EngineResult<()> {
+        self.wait_pc_connection_with_delay(Duration::ZERO).await
+    }
+
+    /// Like [`Self::wait_pc_connection`] but sleeps `settle_delay` before polling.
+    async fn wait_pc_connection_with_delay(&self, settle_delay: Duration) -> EngineResult<()> {
         let wait_connected = async move {
+            if !settle_delay.is_zero() {
+                livekit_runtime::sleep(settle_delay).await;
+            }
+
             loop {
                 if self.closed.load(Ordering::Acquire) {
                     return Err(EngineError::Connection("closed".into()));
@@ -1895,7 +1926,10 @@
                     self.subscriber_pc.as_ref().map(|pc| pc.is_connected()).unwrap_or(true)
                 };
 
-                let need_publisher = self.has_published.load(Ordering::Acquire);
+                // In single-PC mode the publisher is the only transport, so it must always
+                // be connected — independent of whether the user has published any tracks.
+                let need_publisher =
+                    self.single_pc_mode || self.has_published.load(Ordering::Acquire);
 
                 if subscriber_connected && (!need_publisher || publisher_connected) {
                     break;
diff --git a/livekit/tests/common/e2e/mod.rs b/livekit/tests/common/e2e/mod.rs
index 4f5e70ddb..1afb6a125 100644
--- a/livekit/tests/common/e2e/mod.rs
+++ b/livekit/tests/common/e2e/mod.rs
@@ -74,10 +74,6 @@ impl From for TestRoomOptions {
     }
 }
 
-fn is_local_server_url(url: &str) -> bool {
-    url.contains("localhost:7880") || url.contains("127.0.0.1:7880")
-}
-
 /// Creates the specified number of connections to a shared room for testing.
 pub async fn test_rooms(count: usize) -> Result<Vec<(Room, mpsc::UnboundedReceiver<RoomEvent>)>> {
     test_rooms_with_options((0..count).map(|_| TestRoomOptions::default())).await
@@ -88,22 +84,12 @@ pub async fn test_rooms_with_options(
     options: impl IntoIterator<Item = TestRoomOptions>,
 ) -> Result<Vec<(Room, mpsc::UnboundedReceiver<RoomEvent>)>> {
     let test_env = TestEnvironment::from_env_or_defaults();
-    let force_v0 = is_local_server_url(&test_env.server_url);
     let room_name = format!("test_room_{}", create_random_uuid());
 
-    if force_v0 {
-        log::info!("Using localhost test server: forcing single_peer_connection=false for E2E");
-    }
-
     let tokens = options
         .into_iter()
         .enumerate()
         .map(|(id, mut options)| -> Result<(String, RoomOptions)> {
-            if force_v0 {
-                // Local dev server generally does not support /rtc/v1. Force v0 in generic E2E
-                // tests to avoid extra fallback latency/flakiness.
-                options.room.single_peer_connection = false;
-            }
             options.grants.room = room_name.clone();
 
             let token = AccessToken::with_api_key(&test_env.api_key, &test_env.api_secret)
diff --git a/livekit/tests/data_track_test.rs b/livekit/tests/data_track_test.rs
index 7568590ab..8b66bff4f 100644
--- a/livekit/tests/data_track_test.rs
+++ b/livekit/tests/data_track_test.rs
@@ -434,7 +434,7 @@ async fn test_publisher_side_fault(scenario: SimulateScenario) -> Result<()> {
 
     if scenario == SimulateScenario::ForceTcp {
         // Give some time for the track to be republished. Frames will be dropped until then.
-        time::sleep(Duration::from_millis(2000)).await;
+        time::sleep(Duration::from_millis(3000)).await;
         assert_ne!(initial_sid, track.info().sid(), "Should have new SID");
     }
diff --git a/livekit/tests/peer_connection_signaling_test.rs b/livekit/tests/peer_connection_signaling_test.rs
index 8724f33c7..6c10fb178 100644
--- a/livekit/tests/peer_connection_signaling_test.rs
+++ b/livekit/tests/peer_connection_signaling_test.rs
@@ -17,13 +17,13 @@
 //! These tests verify that both V0 (dual peer connection) and V1 (single peer connection)
 //! signaling modes work correctly.
 //!
-//! V0 (Dual PC): Traditional mode with separate publisher and subscriber peer connections
-//! Works on localhost with `livekit-server --dev`
+//! V0 (Dual PC): Traditional mode with separate publisher and subscriber peer connections.
+//! V1 (Single PC): Single peer connection for both publish and subscribe via `/rtc/v1`.
 //!
-//! V1 (Single PC): New mode with a single peer connection for both publish and subscribe
-//! Requires LiveKit Cloud or a server that supports /rtc/v1 endpoint.
-//! NOTE: V1 tests will fall back to V0 on localhost, so to truly test V1,
-//! you must set the cloud environment variables.
+//! Both modes are supported by `livekit-server --dev` (and by LiveKit Cloud). The SDK
+//! itself falls back to V0 when a server does not expose `/rtc/v1`, but the V1 tests
+//! assert that single-PC mode was actually negotiated, so they fail loudly against a
+//! server that is too old rather than silently exercising V0 twice.
 //!
 //! Environment variables:
 //! - LIVEKIT_URL: The LiveKit server URL (defaults to ws://localhost:7880)
@@ -103,30 +103,18 @@ fn get_env_for_mode(_mode: SignalingMode) -> (String, String, String) {
     (url, api_key, api_secret)
 }
 
-fn is_local_dev_server(url: &str) -> bool {
-    url.contains("localhost:7880") || url.contains("127.0.0.1:7880")
-}
-
-fn assert_signaling_mode_state(room: &Room, mode: SignalingMode, url: &str) {
+fn assert_signaling_mode_state(room: &Room, mode: SignalingMode, _url: &str) {
     let active_single_pc = room.is_single_peer_connection_active();
     match mode {
         SignalingMode::DualPC => {
            assert!(!active_single_pc, "DualPC test should not have single-PC mode active");
         }
         SignalingMode::SinglePC => {
-            if is_local_dev_server(url) {
-                // Local dev server behavior may vary by version:
-                // older versions fallback to v0, newer versions may support /rtc/v1.
- log::info!( - "SinglePC on localhost: single_pc_active={} (fallback to v0 expected on older servers)", - active_single_pc - ); - } else { - assert!( - active_single_pc, - "SinglePC requested on non-localhost URL should stay in single-PC mode" - ); - } + assert!( + active_single_pc, + "SinglePC requested but server did not negotiate /rtc/v1 \ + (server may be too old)" + ); } } } @@ -280,26 +268,6 @@ async fn test_v1_publish_ten_video_and_ten_audio_tracks() -> Result<()> { test_publish_ten_video_and_ten_audio_tracks_impl(SignalingMode::SinglePC).await } -/// Test explicit localhost fallback behavior for V1 signaling -#[test_log::test(tokio::test)] -async fn test_v1_localhost_fallback_to_v0() -> Result<()> { - if env::var("LIVEKIT_URL").is_ok() { - log::info!("Skipping localhost fallback test because LIVEKIT_URL override is set"); - return Ok(()); - } - - let room_name = format!("test_v1_localhost_fallback_{}", create_random_uuid()); - let token = create_token(DEFAULT_API_KEY, DEFAULT_API_SECRET, &room_name, "fallback_test")?; - let (room, _events) = - connect_room(DEFAULT_LOCALHOST_URL, &token, SignalingMode::SinglePC).await?; - if room.is_single_peer_connection_active() { - log::info!("Localhost server supports /rtc/v1; skipping fallback assertion"); - return Ok(()); - } - assert!(!room.is_single_peer_connection_active(), "Expected fallback to v0"); - Ok(()) -} - /// Test that a participant with can_subscribe=false in their token can connect without timing out. #[test_log::test(tokio::test)] async fn test_v0_connect_can_subscribe_false() -> Result<()> { @@ -323,6 +291,38 @@ async fn test_v1_double_reconnect() -> Result<()> { test_double_reconnect_impl(SignalingMode::SinglePC).await } +/// Corner case: resume without ever having published a track. In single-PC mode +/// this exercises the publisher ICE restart even though `has_published=false` +/// (the fix for the single-PC publisher gating bug). In dual-PC subscriber- +/// primary mode it just confirms a no-track resume doesn't regress. +#[test_log::test(tokio::test)] +async fn test_v0_resume_without_prior_publish() -> Result<()> { + test_resume_without_prior_publish_impl(SignalingMode::DualPC).await +} + +#[test_log::test(tokio::test)] +async fn test_v1_resume_without_prior_publish() -> Result<()> { + test_resume_without_prior_publish_impl(SignalingMode::SinglePC).await +} + +/// Corner case: a queueable mutation (mute) issued *during* a signal +/// reconnect must reach the server after the resume completes. This is the +/// test that exercises both halves of the pass-through fix: +/// 1. Trickle ICE candidates emitted internally during the reconnect must +/// flow to the server (else ICE wouldn't reconnect and the resume would +/// time out — implicit in any successful resume). +/// 2. The user's mute_track call must be queued, NOT dropped, and flushed +/// after `set_reconnected()`. We verify the subscriber observes the mute. +#[test_log::test(tokio::test)] +async fn test_v0_mute_during_reconnect_lands_on_server() -> Result<()> { + test_mute_during_reconnect_impl(SignalingMode::DualPC).await +} + +#[test_log::test(tokio::test)] +async fn test_v1_mute_during_reconnect_lands_on_server() -> Result<()> { + test_mute_during_reconnect_impl(SignalingMode::SinglePC).await +} + // ==================== Test Implementations ==================== /// Test basic connection @@ -818,6 +818,152 @@ async fn test_connect_can_subscribe_false_impl(mode: SignalingMode) -> Result<() Ok(()) } +/// Test resume on a room that has not published anything. 
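+///
+/// Rough shape of the scenario (illustrative pseudocode; `wait_for_reconnected`
+/// stands in for the event loop the real test spells out below):
+///
+/// ```ignore
+/// let (room, mut events) = connect_room(url, token, mode).await?; // never publish
+/// room.simulate_scenario(SimulateScenario::SignalReconnect).await?;
+/// wait_for_reconnected(&mut events).await?; // must resume, not full-reconnect
+/// ```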
+/// +/// In single-PC mode the publisher PC is the *only* transport. Pre-fix, the +/// engine skipped the ICE restart on resume when `has_published=false`, leaving +/// the only transport on stale ICE credentials and tripping a full reconnect. +/// This test connects with `single_peer_connection=true`, never publishes, +/// triggers `SignalReconnect`, and asserts the resume completes. +async fn test_resume_without_prior_publish_impl(mode: SignalingMode) -> Result<()> { + let (url, api_key, api_secret) = get_env_for_mode(mode); + let room_name = format!("test_{:?}_no_pub_resume_{}", mode, create_random_uuid()); + let token = create_token(&api_key, &api_secret, &room_name, "no_pub_tester")?; + + let (room, mut events) = connect_room(&url, &token, mode).await?; + assert_signaling_mode_state(&room, mode, &url); + + assert_eq!( + room.local_participant().track_publications().len(), + 0, + "precondition: no tracks published" + ); + + log::info!("[{}] Triggering SignalReconnect with no published tracks", mode.name()); + room.simulate_scenario(SimulateScenario::SignalReconnect).await?; + + let wait_reconnected = async { + loop { + let Some(event) = events.recv().await else { + return Err(anyhow!("Event channel closed")); + }; + if let RoomEvent::Reconnected = event { + return Ok(()); + } + } + }; + timeout(Duration::from_secs(30), wait_reconnected) + .await + .context("Timeout waiting for resume to complete with no published tracks")??; + assert_eq!(room.connection_state(), ConnectionState::Connected); + + // Now publish — the post-resume PC must accept negotiation. If the resume + // left the publisher PC on stale ICE (the bug), this publish would fail or + // hang because the renegotiation rides the same dead transport. + let room_arc = Arc::new(room); + let sine_params = + SineParameters { freq: 440.0, amplitude: 1.0, sample_rate: 48000, num_channels: 1 }; + let mut sine_track = SineTrack::new(room_arc.clone(), sine_params); + timeout(Duration::from_secs(10), sine_track.publish()) + .await + .context("Timeout publishing after resume-without-prior-publish")??; + + Ok(()) +} + +/// Test that a queueable mutation issued *during* a signal reconnect reaches +/// the server after the resume completes, and the subscriber observes it. +/// +/// Exercises both halves of the pass-through fix: +/// 1. Pass-through Trickles emitted internally during the reconnect must flow +/// over the new WS — implicit in any successful resume. +/// 2. The user's mute call (`Mute` is queueable) must be queued, not dropped, +/// and flushed by `set_reconnected()` after the resume completes. The +/// subscriber should see `RoomEvent::TrackMuted`. +async fn test_mute_during_reconnect_impl(mode: SignalingMode) -> Result<()> { + log::info!("[{}] Testing mute during reconnect", mode.name()); + let mut rooms = test_rooms_with_options([room_options(mode), room_options(mode)]).await?; + let (sub_room, mut sub_events) = rooms.pop().unwrap(); + let (pub_room, mut pub_events) = rooms.pop().unwrap(); + let pub_room_arc = Arc::new(pub_room); + + let sine_params = + SineParameters { freq: 440.0, amplitude: 1.0, sample_rate: 48000, num_channels: 1 }; + let mut sine_track = SineTrack::new(pub_room_arc.clone(), sine_params); + sine_track.publish().await?; + + // Wait for the subscriber to receive the track before reconnecting. + let wait_subscribed = async { + loop { + let Some(event) = sub_events.recv().await else { + return Err(anyhow!("Event channel closed")); + }; + if let RoomEvent::TrackSubscribed { .. 
} = event { + return Ok(()); + } + } + }; + timeout(Duration::from_secs(15), wait_subscribed) + .await + .context("Timeout waiting for initial track subscription")??; + + // Snapshot the publication so we can mute it during the reconnect window. + let pub_publication = pub_room_arc + .local_participant() + .track_publications() + .into_iter() + .next() + .map(|(_, p)| p) + .ok_or_else(|| anyhow!("publisher has no publications after publish"))?; + let track_sid = pub_publication.sid().to_string(); + + log::info!("[{}] Triggering reconnect, then immediately muting", mode.name()); + pub_room_arc.simulate_scenario(SimulateScenario::SignalReconnect).await?; + + // The reconnect window opens here. The mute is issued while the engine is + // mid-resume, so the underlying signal_client.send(Mute) is queued (Mute is + // a queueable signal). Without the fix, the queue was drained inside + // `SignalInner::restart` *before* SyncState; now it's drained by + // `set_reconnected()` after the resume has fully recovered. + pub_publication.mute(); + + // Wait for the subscriber to observe the mute. Use a generous timeout + // because the mute can only land after: WS reconnect + SyncState + ICE + // restart + set_reconnected → flush queue → server forwards to subscriber. + let wait_mute = async { + loop { + let Some(event) = sub_events.recv().await else { + return Err(anyhow!("Event channel closed")); + }; + if let RoomEvent::TrackMuted { publication, .. } = event { + if publication.sid().to_string() == track_sid { + return Ok(()); + } + } + } + }; + timeout(Duration::from_secs(30), wait_mute) + .await + .context("Timeout waiting for subscriber to observe mute issued during reconnect")??; + + // Sanity: publisher side should also have completed the resume. + let wait_pub_reconnected = async { + loop { + match pub_events.recv().await { + Some(RoomEvent::Reconnected) => return Ok(()), + Some(_) => {} + None => return Err(anyhow!("Event channel closed")), + } + } + }; + // The Reconnected event likely already fired before we got the mute; + // give a short fallback timeout in case the test framework drained it. + let _ = timeout(Duration::from_secs(5), wait_pub_reconnected).await; + assert_eq!(pub_room_arc.connection_state(), ConnectionState::Connected); + + Ok(()) +} + /// Test two sequential reconnect cycles on the same room connection async fn test_double_reconnect_impl(mode: SignalingMode) -> Result<()> { let (url, api_key, api_secret) = get_env_for_mode(mode); diff --git a/webrtc-sys/src/rtc_error.rs b/webrtc-sys/src/rtc_error.rs index dc254d6ad..53ee20cce 100644 --- a/webrtc-sys/src/rtc_error.rs +++ b/webrtc-sys/src/rtc_error.rs @@ -64,23 +64,51 @@ pub mod ffi { } impl ffi::RtcError { - /// # Safety - /// The value must be correctly encoded - pub unsafe fn from(value: &str) -> Self { - // Parse the hex encoded error from c++ - let error_type = u32::from_str_radix(&value[0..8], 16).unwrap(); - let error_detail = u32::from_str_radix(&value[8..16], 16).unwrap(); - let has_scp_cause_code = u8::from_str_radix(&value[16..18], 16).unwrap(); - let sctp_cause_code = u16::from_str_radix(&value[18..22], 16).unwrap(); - let message = String::from(&value[22..]); // msg isn't encoded - - Self { - error_type: std::mem::transmute(error_type), - error_detail: std::mem::transmute(error_detail), + /// Parse the hex-encoded error string the C++ side stuffs into the + /// `cxx::Exception` "what" message (see `webrtc-sys/src/rtc_error.cpp` + /// `serialize_error`). 
The format is fixed-width:
+    ///
+    /// ```text
+    /// bytes  0..8    error_type           (u32 hex)
+    /// bytes  8..16   error_detail         (u32 hex)
+    /// bytes 16..18   has_sctp_cause_code  (u8 hex, 0 or 1)
+    /// bytes 18..22   sctp_cause_code      (u16 hex)
+    /// bytes 22..     message              (raw, not encoded)
+    /// ```
+    ///
+    /// Returns `None` if the input is shorter than the fixed header or the
+    /// header bytes aren't valid hex. Discriminants outside the known
+    /// variants for `RtcErrorType` / `RtcErrorDetailType` fall back to that
+    /// enum's `None` variant for the affected field instead of being
+    /// `transmute`d into the enum (which is instant UB and what nightly's
+    /// `ptr::copy_nonoverlapping` precondition check was firing on).
+    pub fn parse(value: &str) -> Option<Self> {
+        if value.len() < 22 {
+            return None;
+        }
+        let error_type = u32::from_str_radix(&value[0..8], 16).ok()?;
+        let error_detail = u32::from_str_radix(&value[8..16], 16).ok()?;
+        let has_scp_cause_code = u8::from_str_radix(&value[16..18], 16).ok()?;
+        let sctp_cause_code = u16::from_str_radix(&value[18..22], 16).ok()?;
+        let message = String::from(&value[22..]);
+
+        Some(Self {
+            error_type: rtc_error_type_from_u32(error_type),
+            error_detail: rtc_error_detail_type_from_u32(error_detail),
             sctp_cause_code,
             has_sctp_cause_code: has_scp_cause_code == 1,
             message,
-        }
+        })
+    }
+
+    /// Backwards-compatible wrapper for callers that already trust the input
+    /// is well-formed.
+    ///
+    /// # Safety
+    /// Marked `unsafe` purely for source-compat with prior callers; the body
+    /// no longer relies on caller-upheld invariants.
+    pub unsafe fn from(value: &str) -> Self {
+        Self::parse(value).expect("malformed serialized RtcError")
     }
 
     pub fn ok(&self) -> bool {
@@ -88,6 +116,38 @@
 }
 }
 
+fn rtc_error_type_from_u32(value: u32) -> ffi::RtcErrorType {
+    match value {
+        0 => ffi::RtcErrorType::None,
+        1 => ffi::RtcErrorType::UnsupportedOperation,
+        2 => ffi::RtcErrorType::UnsupportedParameter,
+        3 => ffi::RtcErrorType::InvalidParameter,
+        4 => ffi::RtcErrorType::InvalidRange,
+        5 => ffi::RtcErrorType::SyntaxError,
+        6 => ffi::RtcErrorType::InvalidState,
+        7 => ffi::RtcErrorType::InvalidModification,
+        8 => ffi::RtcErrorType::NetworkError,
+        9 => ffi::RtcErrorType::ResourceExhausted,
+        10 => ffi::RtcErrorType::InternalError,
+        11 => ffi::RtcErrorType::OperationErrorWithData,
+        _ => ffi::RtcErrorType::None,
+    }
+}
+
+fn rtc_error_detail_type_from_u32(value: u32) -> ffi::RtcErrorDetailType {
+    match value {
+        0 => ffi::RtcErrorDetailType::None,
+        1 => ffi::RtcErrorDetailType::DataChannelFailure,
+        2 => ffi::RtcErrorDetailType::DtlsFailure,
+        3 => ffi::RtcErrorDetailType::FingerprintFailure,
+        4 => ffi::RtcErrorDetailType::SctpFailure,
+        5 => ffi::RtcErrorDetailType::SdpSyntaxError,
+        6 => ffi::RtcErrorDetailType::HardwareEncoderNotAvailable,
+        7 => ffi::RtcErrorDetailType::HardwareEncoderError,
+        _ => ffi::RtcErrorDetailType::None,
+    }
+}
+
 impl Error for ffi::RtcError {}
 
 impl Display for ffi::RtcError {