diff --git a/.changeset/soft-paws-sink.md b/.changeset/soft-paws-sink.md new file mode 100644 index 00000000..6e6e3d1a --- /dev/null +++ b/.changeset/soft-paws-sink.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Add frame processor support for audio streams diff --git a/packages/livekit-rtc/src/audio_frame.ts b/packages/livekit-rtc/src/audio_frame.ts index 17cae161..75b18f22 100644 --- a/packages/livekit-rtc/src/audio_frame.ts +++ b/packages/livekit-rtc/src/audio_frame.ts @@ -11,6 +11,8 @@ export class AudioFrame { channels: number; samplesPerChannel: number; + private _userdata: Record = {}; + // note: if converting from Uint8Array to Int16Array, *do not* use buffer.slice! // it is marked unstable by Node and can cause undefined behaviour, such as massive chunks of // noise being added to the end. @@ -51,6 +53,11 @@ export class AudioFrame { samplesPerChannel: this.samplesPerChannel, }); } + + /** Returns the user data associated with the audio frame. */ + get userdata() { + return this._userdata; + } } /** diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index bafdddb2..16aac433 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -5,12 +5,14 @@ import type { UnderlyingSource } from 'node:stream/web'; import { AudioFrame } from './audio_frame.js'; import type { FfiEvent } from './ffi_client.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; +import { FrameProcessor } from './frame_processor.js'; +import { log } from './log.js'; import type { NewAudioStreamResponse } from './proto/audio_frame_pb.js'; import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js'; import type { Track } from './track.js'; export interface AudioStreamOptions { - noiseCancellation?: NoiseCancellationOptions; + noiseCancellation?: NoiseCancellationOptions | FrameProcessor; sampleRate?: number; numChannels?: number; frameSizeMs?: number; @@ -18,6 +20,7 @@ export interface AudioStreamOptions { export interface NoiseCancellationOptions { moduleId: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any options: Record; } @@ -26,7 +29,8 @@ class AudioStreamSource implements UnderlyingSource { private ffiHandle: FfiHandle; private sampleRate: number; private numChannels: number; - private ncOptions?: NoiseCancellationOptions; + private legacyNcOptions?: NoiseCancellationOptions; + private frameProcessor?: FrameProcessor; private frameSizeMs?: number; constructor( @@ -37,7 +41,11 @@ class AudioStreamSource implements UnderlyingSource { if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') { this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000; this.numChannels = sampleRateOrOptions.numChannels ?? 1; - this.ncOptions = sampleRateOrOptions.noiseCancellation; + if (sampleRateOrOptions.noiseCancellation instanceof FrameProcessor) { + this.frameProcessor = sampleRateOrOptions.noiseCancellation; + } else { + this.legacyNcOptions = sampleRateOrOptions.noiseCancellation; + } this.frameSizeMs = sampleRateOrOptions.frameSizeMs; } else { this.sampleRate = (sampleRateOrOptions as number) ?? 48000; @@ -50,10 +58,10 @@ class AudioStreamSource implements UnderlyingSource { sampleRate: this.sampleRate, numChannels: this.numChannels, frameSizeMs: this.frameSizeMs, - ...(this.ncOptions + ...(this.legacyNcOptions ? { - audioFilterModuleId: this.ncOptions.moduleId, - audioFilterOptions: JSON.stringify(this.ncOptions.options), + audioFilterModuleId: this.legacyNcOptions.moduleId, + audioFilterOptions: JSON.stringify(this.legacyNcOptions.options), } : {}), }); @@ -85,7 +93,14 @@ class AudioStreamSource implements UnderlyingSource { const streamEvent = ev.message.value.message; switch (streamEvent.case) { case 'frameReceived': - const frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!); + let frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!); + if (this.frameProcessor && this.frameProcessor.isEnabled()) { + try { + frame = this.frameProcessor.process(frame); + } catch (err: unknown) { + log.warn(`Frame processing failed, passing through original frame: ${err}`); + } + } this.controller.enqueue(frame); break; case 'eos': diff --git a/packages/livekit-rtc/src/data_streams/stream_reader.ts b/packages/livekit-rtc/src/data_streams/stream_reader.ts index 1265dc8b..8e0528c0 100644 --- a/packages/livekit-rtc/src/data_streams/stream_reader.ts +++ b/packages/livekit-rtc/src/data_streams/stream_reader.ts @@ -53,14 +53,14 @@ export class ByteStreamReader extends BaseStreamReader { try { const { done, value } = await reader.read(); if (done) { - return { done: true, value: undefined as any }; + return { done: true, value: undefined as unknown }; } else { this.handleChunkReceived(value); return { done: false, value: value.content! }; } - } catch (error: any) { + } catch (error: unknown) { log.error('error processing stream update: %s', error); - return { done: true, value: undefined }; + return { done: true, value: undefined as unknown }; } }, @@ -135,7 +135,7 @@ export class TextStreamReader extends BaseStreamReader { value: decoder.decode(value.content!), }; } - } catch (error: any) { + } catch (error: unknown) { log.error('error processing stream update: %s', error); return { done: true, value: undefined }; } diff --git a/packages/livekit-rtc/src/frame_processor.ts b/packages/livekit-rtc/src/frame_processor.ts new file mode 100644 index 00000000..70f6be11 --- /dev/null +++ b/packages/livekit-rtc/src/frame_processor.ts @@ -0,0 +1,29 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { type AudioFrame } from './audio_frame.js'; +import { type VideoFrame } from './video_frame.js'; + +export type FrameProcessorStreamInfo = { + roomName: string; + participantIdentity: string; + publicationSid: string; +}; + +export type FrameProcessorCredentials = { + token: string; + url: string; +}; + +export abstract class FrameProcessor { + abstract isEnabled(): boolean; + abstract setEnabled(enabled: boolean): void; + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onStreamInfoUpdated(_info: FrameProcessorStreamInfo): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onCredentialsUpdated(_credentials: FrameProcessorCredentials): void {} + + abstract process(frame: Frame): Frame; + abstract close(): void; +} diff --git a/packages/livekit-rtc/src/index.ts b/packages/livekit-rtc/src/index.ts index 7e5d6433..d60dc5a7 100644 --- a/packages/livekit-rtc/src/index.ts +++ b/packages/livekit-rtc/src/index.ts @@ -50,3 +50,8 @@ export type { ChatMessage } from './types.js'; export { VideoFrame } from './video_frame.js'; export { VideoSource } from './video_source.js'; export { VideoStream, type VideoFrameEvent } from './video_stream.js'; +export { + FrameProcessor, + type FrameProcessorStreamInfo, + type FrameProcessorCredentials, +} from './frame_processor.js'; diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index d2c318de..b68778f7 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -18,7 +18,7 @@ import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; import { log } from './log.js'; import type { Participant } from './participant.js'; import { LocalParticipant, RemoteParticipant } from './participant.js'; -import { EncryptionState, EncryptionType } from './proto/e2ee_pb.js'; +import { EncryptionState, type EncryptionType } from './proto/e2ee_pb.js'; import type { FfiEvent } from './proto/ffi_pb.js'; import type { DisconnectReason, OwnedParticipant } from './proto/participant_pb.js'; import type { DataStream_Trailer, DisconnectCallback } from './proto/room_pb.js'; @@ -179,10 +179,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter /** * Connects to a LiveKit room using the provided URL and access token. - * @param url The WebSocket URL of the LiveKit server - * @param token A valid LiveKit access token for authentication - * @param opts Optional room configuration options - * @throws ConnectError if connection fails + * @param url - The WebSocket URL of the LiveKit server + * @param token - A valid LiveKit access token for authentication + * @param opts - Optional room configuration options + * @throws ConnectError - if connection fails */ async connect(url: string, token: string, opts?: RoomOptions) { const options = { ...defaultRoomOptions, ...opts }; @@ -268,9 +268,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter /** * Registers a handler for incoming text data streams on a specific topic. * Text streams are used for receiving structured text data from other participants. - * @param topic The topic to listen for text streams on - * @param callback Function to handle incoming text stream data - * @throws Error if a handler for this topic is already registered + * @param topic - The topic to listen for text streams on + * @param callback - Function to handle incoming text stream data + * @throws Error - if a handler for this topic is already registered */ registerTextStreamHandler(topic: string, callback: TextStreamHandler) { if (this.textStreamHandlers.has(topic)) { @@ -286,9 +286,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter /** * Registers a handler for incoming byte data streams on a specific topic. * Byte streams are used for receiving binary data like files from other participants. - * @param topic The topic to listen for byte streams on - * @param callback Function to handle incoming byte stream data - * @throws Error if a handler for this topic is already registered + * @param topic - The topic to listen for byte streams on + * @param callback - Function to handle incoming byte stream data + * @throws Error - if a handler for this topic is already registered */ registerByteStreamHandler(topic: string, callback: ByteStreamHandler) { if (this.byteStreamHandlers.has(topic)) { @@ -415,8 +415,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter } this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackSubscribed: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.TrackSubscribed: ${(e as Error).message}`); } } else if (ev.case == 'trackUnsubscribed') { try { @@ -428,8 +428,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter publication.track = undefined; publication.subscribed = false; this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackUnsubscribed: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.TrackUnsubscribed: ${(e as Error).message}`); } } else if (ev.case == 'trackSubscriptionFailed') { try { @@ -440,8 +440,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant, ev.value.error, ); - } catch (e: any) { - console.warn(`RoomEvent.TrackSubscriptionFailed: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.TrackSubscriptionFailed: ${(e as Error).message}`); } } else if (ev.case == 'trackMuted') { try { @@ -454,8 +454,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter publication.track.info!.muted = true; } this.emit(RoomEvent.TrackMuted, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackMuted: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.TrackMuted: ${(e as Error).message}`); } } else if (ev.case == 'trackUnmuted') { try { @@ -468,8 +468,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter publication.track.info!.muted = false; } this.emit(RoomEvent.TrackUnmuted, publication, participant); - } catch (e: any) { - console.warn(`RoomEvent.TrackUnmuted: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.TrackUnmuted: ${(e as Error).message}`); } } else if (ev.case == 'activeSpeakersChanged') { try { @@ -477,8 +477,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.requireParticipantByIdentity(identity), ); this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers); - } catch (e: any) { - console.warn(`RoomEvent.ActiveSpeakersChanged: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.ActiveSpeakersChanged: ${(e as Error).message}`); } } else if (ev.case == 'roomMetadataChanged') { this.info.metadata = ev.value.metadata ?? ''; @@ -488,16 +488,16 @@ export class Room extends (EventEmitter as new () => TypedEmitter const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); participant.info.metadata = ev.value.metadata; this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant); - } catch (e: any) { - console.warn(`RoomEvent.ParticipantMetadataChanged: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.ParticipantMetadataChanged: ${(e as Error).message}`); } } else if (ev.case == 'participantNameChanged') { try { const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); participant.info.name = ev.value.name; this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant); - } catch (e: any) { - console.warn(`RoomEvent.ParticipantNameChanged: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.ParticipantNameChanged: ${(e as Error).message}`); } } else if (ev.case == 'participantAttributesChanged') { try { @@ -519,15 +519,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter ); this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant); } - } catch (e: any) { - console.warn(`RoomEvent.ParticipantAttributesChanged: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.ParticipantAttributesChanged: ${(e as Error).message}`); } } else if (ev.case == 'connectionQualityChanged') { try { const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!); this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant); - } catch (e: any) { - console.warn(`RoomEvent.ConnectionQualityChanged: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.ConnectionQualityChanged: ${(e as Error).message}`); } } else if (ev.case == 'chatMessage') { const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!); @@ -611,8 +611,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter !!ev.value.isEncrypted, participant, ); - } catch (e: any) { - console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${e.message}`); + } catch (e: unknown) { + console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${(e as Error).message}`); } } };