Summary
AudioStream.__init__ starts the background _run task before the fallible stream-handle creation that assigns self._ffi_handle. If the stream-handle creation raises (e.g. the track was already closed because the participant disconnected mid-setup), __init__ aborts and self._ffi_handle is never assigned. The already-running _run task then dereferences self._ffi_handle and raises AttributeError: 'AudioStream' object has no attribute '_ffi_handle'.
Because this happens inside an orphaned task, it surfaces only via the task-done callback on the livekit logger and is structurally uncatchable by application code — the caller sees the real exception from __init__, while the AttributeError leaks as an unretrieved task exception.
Environment
livekit 1.1.11 (also present on main as of filing)
- Component:
livekit-rtc/livekit/rtc/audio_stream.py
The offending code (current main)
AudioStream.__init__ creates the task before _ffi_handle / _info are assigned:
self._task = self._loop.create_task(self._run())
self._task.add_done_callback(task_done_logger)
stream: Any = None
if "participant" in kwargs:
stream = self._create_owned_stream_from_participant(
participant=kwargs["participant"], track_source=kwargs["track_source"]
)
else:
stream = self._create_owned_stream()
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info
_run immediately awaits an event filtered through _is_event, which reads self._ffi_handle:
async def _run(self) -> None:
while True:
event = await self._ffi_queue.wait_for(self._is_event)
...
def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
return e.audio_stream_event.stream_handle == self._ffi_handle.handle
Failure scenario (track gone mid-setup)
AudioStream(track=...) is constructed.
self._task = self._loop.create_task(self._run()) schedules _run.
_create_owned_stream() (or _create_owned_stream_from_participant()) raises — for example because the underlying track was already closed after the remote participant disconnected during subscription setup. (_create_owned_stream opens with assert self._track is not None and issues an FFI request, both of which can fail on a torn-down track.)
__init__ propagates that exception; self._ffi_handle is never set.
- The already-scheduled
_run task gets its turn on the loop, evaluates the _is_event predicate, and hits self._ffi_handle → AttributeError.
Observed error
Traceback (most recent call last):
File ".../livekit/rtc/audio_stream.py", line ..., in _run
event = await self._ffi_queue.wait_for(self._is_event)
File ".../livekit/rtc/audio_stream.py", line ..., in _is_event
return e.audio_stream_event.stream_handle == self._ffi_handle.handle
AttributeError: 'AudioStream' object has no attribute '_ffi_handle'
This is an unretrieved task exception, reported only through task_done_logger (the done-callback added right after task creation) on the livekit logger. Application code that wraps the AudioStream(...) constructor in try/except can catch the real error from _create_owned_stream, but it has no handle to the orphaned _run task and therefore cannot catch or suppress the AttributeError — it always leaks to the logger.
Minimal repro sketch
import asyncio
from unittest.mock import patch
from livekit import rtc
async def main():
track = make_fake_audio_track() # valid enough to enter __init__
# Force the fallible owned-stream creation to raise, simulating a track
# torn down mid-setup.
with patch.object(
rtc.AudioStream, "_create_owned_stream",
side_effect=RuntimeError("track already closed"),
):
try:
rtc.AudioStream(track)
except RuntimeError:
pass # expected: the real error
# Let the orphaned _run task get scheduled.
await asyncio.sleep(0)
await asyncio.sleep(0)
# OBSERVED: livekit logger emits
# AttributeError: 'AudioStream' object has no attribute '_ffi_handle'
# from the unretrieved _run task, with no way for the caller above to catch it.
asyncio.run(main())
Proposed fix
Preferred — create the task after the handle is assigned. __init__ runs synchronously from create_task(...) through the _ffi_handle / _info assignments; there is no await in between, so _run cannot actually execute until __init__ yields back to the event loop. There is therefore no ordering requirement for the task to start before stream creation — _run only needs self._ffi_queue (subscribed at the top of __init__) and self._ffi_handle to exist when it first runs. Creating the task last is sufficient and correct: if stream creation raises, the task is never created, there is no orphan, and the only exception the caller sees is the meaningful one from _create_owned_stream.
- self._task = self._loop.create_task(self._run())
- self._task.add_done_callback(task_done_logger)
-
stream: Any = None
if "participant" in kwargs:
stream = self._create_owned_stream_from_participant(
participant=kwargs["participant"], track_source=kwargs["track_source"]
)
else:
stream = self._create_owned_stream()
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info
+
+ self._task = self._loop.create_task(self._run())
+ self._task.add_done_callback(task_done_logger)
Alternative — defensive guard. Initialize self._ffi_handle: FfiHandle | None = None before creating the task and make _is_event return False when it's None. This prevents the AttributeError but is strictly inferior: on the failure path it leaves a live _run task spinning against _ffi_queue with a predicate that can never match (a leaked task/subscription), and it forces _ffi_handle to become Optional, propagating None-checks elsewhere.
The reorder is the smaller, clearer change and removes the orphaned-task class of bug entirely rather than masking a symptom.
Regression test sketch
import asyncio, logging, pytest
from unittest.mock import patch
from livekit import rtc
@pytest.mark.asyncio
async def test_audiostream_failed_stream_creation_does_not_orphan_run_task(caplog):
track = make_fake_audio_track()
with patch.object(rtc.AudioStream, "_create_owned_stream",
side_effect=RuntimeError("track already closed")):
with caplog.at_level(logging.ERROR, logger="livekit"):
with pytest.raises(RuntimeError, match="track already closed"):
rtc.AudioStream(track)
await asyncio.sleep(0)
await asyncio.sleep(0)
# The only error the caller should see is the real RuntimeError above.
assert "_ffi_handle" not in caplog.text
assert "AttributeError" not in caplog.text
Before the fix this fails (the orphaned task logs the AttributeError); after the reorder the task is never created.
Summary
AudioStream.__init__starts the background_runtask before the fallible stream-handle creation that assignsself._ffi_handle. If the stream-handle creation raises (e.g. the track was already closed because the participant disconnected mid-setup),__init__aborts andself._ffi_handleis never assigned. The already-running_runtask then dereferencesself._ffi_handleand raisesAttributeError: 'AudioStream' object has no attribute '_ffi_handle'.Because this happens inside an orphaned task, it surfaces only via the task-done callback on the
livekitlogger and is structurally uncatchable by application code — the caller sees the real exception from__init__, while theAttributeErrorleaks as an unretrieved task exception.Environment
livekit1.1.11 (also present onmainas of filing)livekit-rtc/livekit/rtc/audio_stream.pyThe offending code (current
main)AudioStream.__init__creates the task before_ffi_handle/_infoare assigned:_runimmediately awaits an event filtered through_is_event, which readsself._ffi_handle:Failure scenario (track gone mid-setup)
AudioStream(track=...)is constructed.self._task = self._loop.create_task(self._run())schedules_run._create_owned_stream()(or_create_owned_stream_from_participant()) raises — for example because the underlying track was already closed after the remote participant disconnected during subscription setup. (_create_owned_streamopens withassert self._track is not Noneand issues an FFI request, both of which can fail on a torn-down track.)__init__propagates that exception;self._ffi_handleis never set._runtask gets its turn on the loop, evaluates the_is_eventpredicate, and hitsself._ffi_handle→AttributeError.Observed error
This is an unretrieved task exception, reported only through
task_done_logger(the done-callback added right after task creation) on thelivekitlogger. Application code that wraps theAudioStream(...)constructor intry/exceptcan catch the real error from_create_owned_stream, but it has no handle to the orphaned_runtask and therefore cannot catch or suppress theAttributeError— it always leaks to the logger.Minimal repro sketch
Proposed fix
Preferred — create the task after the handle is assigned.
__init__runs synchronously fromcreate_task(...)through the_ffi_handle/_infoassignments; there is noawaitin between, so_runcannot actually execute until__init__yields back to the event loop. There is therefore no ordering requirement for the task to start before stream creation —_runonly needsself._ffi_queue(subscribed at the top of__init__) andself._ffi_handleto exist when it first runs. Creating the task last is sufficient and correct: if stream creation raises, the task is never created, there is no orphan, and the only exception the caller sees is the meaningful one from_create_owned_stream.Alternative — defensive guard. Initialize
self._ffi_handle: FfiHandle | None = Nonebefore creating the task and make_is_eventreturnFalsewhen it'sNone. This prevents theAttributeErrorbut is strictly inferior: on the failure path it leaves a live_runtask spinning against_ffi_queuewith a predicate that can never match (a leaked task/subscription), and it forces_ffi_handleto becomeOptional, propagatingNone-checks elsewhere.The reorder is the smaller, clearer change and removes the orphaned-task class of bug entirely rather than masking a symptom.
Regression test sketch
Before the fix this fails (the orphaned task logs the
AttributeError); after the reorder the task is never created.