Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/soft-paws-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Add frame processor support for audio streams
7 changes: 7 additions & 0 deletions packages/livekit-rtc/src/audio_frame.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export class AudioFrame {
channels: number;
samplesPerChannel: number;

private _userdata: Record<string, unknown> = {};

// note: if converting from Uint8Array to Int16Array, *do not* use buffer.slice!
// it is marked unstable by Node and can cause undefined behaviour, such as massive chunks of
// noise being added to the end.
Expand Down Expand Up @@ -51,6 +53,11 @@ export class AudioFrame {
samplesPerChannel: this.samplesPerChannel,
});
}

/**
 * Returns the user data associated with the audio frame.
 *
 * The backing record itself is returned (not a copy), so entries written
 * through the returned object stay attached to this frame.
 *
 * @returns The record of user data held by this frame.
 */
get userdata(): Record<string, unknown> {
  return this._userdata;
}
}

/**
Expand Down
29 changes: 22 additions & 7 deletions packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ import type { UnderlyingSource } from 'node:stream/web';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
import { FrameProcessor } from './frame_processor.js';
import { log } from './log.js';
import type { NewAudioStreamResponse } from './proto/audio_frame_pb.js';
import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js';
import type { Track } from './track.js';

export interface AudioStreamOptions {
noiseCancellation?: NoiseCancellationOptions;
noiseCancellation?: NoiseCancellationOptions | FrameProcessor<AudioFrame>;
sampleRate?: number;
numChannels?: number;
frameSizeMs?: number;
}

/**
 * Settings for the module-based noise cancellation path of an audio stream.
 *
 * When supplied to an audio stream, `moduleId` is forwarded in the stream
 * request as `audioFilterModuleId` and `options` is serialized with
 * `JSON.stringify` into `audioFilterOptions`.
 */
export interface NoiseCancellationOptions {
/** Identifier of the audio filter module; sent as `audioFilterModuleId`. */
moduleId: string;
/** Module-specific settings; passed through `JSON.stringify`, so values should be JSON-serializable. */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
options: Record<string, any>;
}

Expand All @@ -26,7 +29,8 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
private ffiHandle: FfiHandle;
private sampleRate: number;
private numChannels: number;
private ncOptions?: NoiseCancellationOptions;
private legacyNcOptions?: NoiseCancellationOptions;
private frameProcessor?: FrameProcessor<AudioFrame>;
private frameSizeMs?: number;

constructor(
Expand All @@ -37,7 +41,11 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') {
this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000;
this.numChannels = sampleRateOrOptions.numChannels ?? 1;
this.ncOptions = sampleRateOrOptions.noiseCancellation;
if (sampleRateOrOptions.noiseCancellation instanceof FrameProcessor) {
this.frameProcessor = sampleRateOrOptions.noiseCancellation;
} else {
this.legacyNcOptions = sampleRateOrOptions.noiseCancellation;
}
this.frameSizeMs = sampleRateOrOptions.frameSizeMs;
} else {
this.sampleRate = (sampleRateOrOptions as number) ?? 48000;
Expand All @@ -50,10 +58,10 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
sampleRate: this.sampleRate,
numChannels: this.numChannels,
frameSizeMs: this.frameSizeMs,
...(this.ncOptions
...(this.legacyNcOptions
? {
audioFilterModuleId: this.ncOptions.moduleId,
audioFilterOptions: JSON.stringify(this.ncOptions.options),
audioFilterModuleId: this.legacyNcOptions.moduleId,
audioFilterOptions: JSON.stringify(this.legacyNcOptions.options),
}
: {}),
});
Expand Down Expand Up @@ -85,7 +93,14 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
const streamEvent = ev.message.value.message;
switch (streamEvent.case) {
case 'frameReceived':
const frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!);
let frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!);
if (this.frameProcessor && this.frameProcessor.isEnabled()) {
try {
frame = this.frameProcessor.process(frame);
} catch (err: unknown) {
log.warn(`Frame processing failed, passing through original frame: ${err}`);
}
}
this.controller.enqueue(frame);
break;
case 'eos':
Expand Down
8 changes: 4 additions & 4 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined as any };
return { done: true, value: undefined as unknown };
} else {
this.handleChunkReceived(value);
return { done: false, value: value.content! };
}
} catch (error: any) {
} catch (error: unknown) {
log.error('error processing stream update: %s', error);
return { done: true, value: undefined };
return { done: true, value: undefined as unknown };
}
},

Expand Down Expand Up @@ -135,7 +135,7 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
value: decoder.decode(value.content!),
};
}
} catch (error: any) {
} catch (error: unknown) {
log.error('error processing stream update: %s', error);
return { done: true, value: undefined };
}
Expand Down
29 changes: 29 additions & 0 deletions packages/livekit-rtc/src/frame_processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { type AudioFrame } from './audio_frame.js';
import { type VideoFrame } from './video_frame.js';

/**
 * Stream description handed to `FrameProcessor.onStreamInfoUpdated`.
 *
 * NOTE(review): field meanings below are inferred from their names — confirm
 * against the code that populates this object.
 */
export type FrameProcessorStreamInfo = {
/** Presumably the name of the room the stream belongs to. */
roomName: string;
/** Presumably the identity of the participant that owns the stream. */
participantIdentity: string;
/** Presumably the SID of the track publication backing the stream. */
publicationSid: string;
};

/**
 * Credentials handed to `FrameProcessor.onCredentialsUpdated`.
 *
 * NOTE(review): presumably a LiveKit access token and server URL — confirm
 * against the code that populates this object.
 */
export type FrameProcessorCredentials = {
/** Token string supplied to the processor. */
token: string;
/** URL string supplied to the processor. */
url: string;
};

/**
 * Base class for components that transform media frames one at a time.
 *
 * `Frame` is the frame type the processor handles: `VideoFrame` or
 * `AudioFrame`. Subclasses must implement the enable switch, the per-frame
 * transform and the teardown; the two `on…Updated` hooks are optional and do
 * nothing unless overridden.
 */
export abstract class FrameProcessor<Frame extends VideoFrame | AudioFrame> {
  /**
   * Transforms a single frame.
   *
   * @param frame - The frame to process.
   * @returns The frame to use in place of `frame`.
   */
  abstract process(frame: Frame): Frame;

  /** Reports whether the processor is currently enabled. */
  abstract isEnabled(): boolean;

  /**
   * Turns the processor on or off.
   *
   * @param enabled - `true` to enable processing, `false` to disable it.
   */
  abstract setEnabled(enabled: boolean): void;

  /** Tears the processor down. Implementations define what is released. */
  abstract close(): void;

  /**
   * Hook for receiving updated stream information. The default
   * implementation ignores the update.
   *
   * @param _info - The latest stream information.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  onStreamInfoUpdated(_info: FrameProcessorStreamInfo): void {}

  /**
   * Hook for receiving updated credentials. The default implementation
   * ignores the update.
   *
   * @param _credentials - The latest credentials.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  onCredentialsUpdated(_credentials: FrameProcessorCredentials): void {}
}
5 changes: 5 additions & 0 deletions packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ export type { ChatMessage } from './types.js';
export { VideoFrame } from './video_frame.js';
export { VideoSource } from './video_source.js';
export { VideoStream, type VideoFrameEvent } from './video_stream.js';
export {
FrameProcessor,
type FrameProcessorStreamInfo,
type FrameProcessorCredentials,
} from './frame_processor.js';
66 changes: 33 additions & 33 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
import { log } from './log.js';
import type { Participant } from './participant.js';
import { LocalParticipant, RemoteParticipant } from './participant.js';
import { EncryptionState, EncryptionType } from './proto/e2ee_pb.js';
import { EncryptionState, type EncryptionType } from './proto/e2ee_pb.js';
import type { FfiEvent } from './proto/ffi_pb.js';
import type { DisconnectReason, OwnedParticipant } from './proto/participant_pb.js';
import type { DataStream_Trailer, DisconnectCallback } from './proto/room_pb.js';
Expand Down Expand Up @@ -179,10 +179,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>

/**
* Connects to a LiveKit room using the provided URL and access token.
* @param url The WebSocket URL of the LiveKit server
* @param token A valid LiveKit access token for authentication
* @param opts Optional room configuration options
* @throws ConnectError if connection fails
* @param url - The WebSocket URL of the LiveKit server
* @param token - A valid LiveKit access token for authentication
* @param opts - Optional room configuration options
* @throws ConnectError - if connection fails
*/
async connect(url: string, token: string, opts?: RoomOptions) {
const options = { ...defaultRoomOptions, ...opts };
Expand Down Expand Up @@ -268,9 +268,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
/**
* Registers a handler for incoming text data streams on a specific topic.
* Text streams are used for receiving structured text data from other participants.
* @param topic The topic to listen for text streams on
* @param callback Function to handle incoming text stream data
* @throws Error if a handler for this topic is already registered
* @param topic - The topic to listen for text streams on
* @param callback - Function to handle incoming text stream data
* @throws Error - if a handler for this topic is already registered
*/
registerTextStreamHandler(topic: string, callback: TextStreamHandler) {
if (this.textStreamHandlers.has(topic)) {
Expand All @@ -286,9 +286,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
/**
* Registers a handler for incoming byte data streams on a specific topic.
* Byte streams are used for receiving binary data like files from other participants.
* @param topic The topic to listen for byte streams on
* @param callback Function to handle incoming byte stream data
* @throws Error if a handler for this topic is already registered
* @param topic - The topic to listen for byte streams on
* @param callback - Function to handle incoming byte stream data
* @throws Error - if a handler for this topic is already registered
*/
registerByteStreamHandler(topic: string, callback: ByteStreamHandler) {
if (this.byteStreamHandlers.has(topic)) {
Expand Down Expand Up @@ -415,8 +415,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}

this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant);
} catch (e: any) {
console.warn(`RoomEvent.TrackSubscribed: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackSubscribed: ${(e as Error).message}`);
}
} else if (ev.case == 'trackUnsubscribed') {
try {
Expand All @@ -428,8 +428,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
publication.track = undefined;
publication.subscribed = false;
this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant);
} catch (e: any) {
console.warn(`RoomEvent.TrackUnsubscribed: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackUnsubscribed: ${(e as Error).message}`);
}
} else if (ev.case == 'trackSubscriptionFailed') {
try {
Expand All @@ -440,8 +440,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
participant,
ev.value.error,
);
} catch (e: any) {
console.warn(`RoomEvent.TrackSubscriptionFailed: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackSubscriptionFailed: ${(e as Error).message}`);
}
} else if (ev.case == 'trackMuted') {
try {
Expand All @@ -454,8 +454,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
publication.track.info!.muted = true;
}
this.emit(RoomEvent.TrackMuted, publication, participant);
} catch (e: any) {
console.warn(`RoomEvent.TrackMuted: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackMuted: ${(e as Error).message}`);
}
} else if (ev.case == 'trackUnmuted') {
try {
Expand All @@ -468,17 +468,17 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
publication.track.info!.muted = false;
}
this.emit(RoomEvent.TrackUnmuted, publication, participant);
} catch (e: any) {
console.warn(`RoomEvent.TrackUnmuted: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.TrackUnmuted: ${(e as Error).message}`);
}
} else if (ev.case == 'activeSpeakersChanged') {
try {
const activeSpeakers = ev.value.participantIdentities.map((identity) =>
this.requireParticipantByIdentity(identity),
);
this.emit(RoomEvent.ActiveSpeakersChanged, activeSpeakers);
} catch (e: any) {
console.warn(`RoomEvent.ActiveSpeakersChanged: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.ActiveSpeakersChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'roomMetadataChanged') {
this.info.metadata = ev.value.metadata ?? '';
Expand All @@ -488,16 +488,16 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
participant.info.metadata = ev.value.metadata;
this.emit(RoomEvent.ParticipantMetadataChanged, participant.metadata, participant);
} catch (e: any) {
console.warn(`RoomEvent.ParticipantMetadataChanged: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantMetadataChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'participantNameChanged') {
try {
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
participant.info.name = ev.value.name;
this.emit(RoomEvent.ParticipantNameChanged, participant.name!, participant);
} catch (e: any) {
console.warn(`RoomEvent.ParticipantNameChanged: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantNameChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'participantAttributesChanged') {
try {
Expand All @@ -519,15 +519,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
);
this.emit(RoomEvent.ParticipantAttributesChanged, changedAttributes, participant);
}
} catch (e: any) {
console.warn(`RoomEvent.ParticipantAttributesChanged: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantAttributesChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'connectionQualityChanged') {
try {
const participant = this.requireParticipantByIdentity(ev.value.participantIdentity!);
this.emit(RoomEvent.ConnectionQualityChanged, ev.value.quality!, participant);
} catch (e: any) {
console.warn(`RoomEvent.ConnectionQualityChanged: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.ConnectionQualityChanged: ${(e as Error).message}`);
}
} else if (ev.case == 'chatMessage') {
const participant = this.retrieveParticipantByIdentity(ev.value.participantIdentity!);
Expand Down Expand Up @@ -611,8 +611,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
!!ev.value.isEncrypted,
participant,
);
} catch (e: any) {
console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${e.message}`);
} catch (e: unknown) {
console.warn(`RoomEvent.ParticipantEncryptionStatusChanged: ${(e as Error).message}`);
}
}
};
Expand Down
Loading