AudioStream: orphaned _run task raises AttributeError: '_ffi_handle' when owned-stream creation fails in __init__ #729

Description

@AlexChalakov

Summary

AudioStream.__init__ starts the background _run task before the fallible stream-handle creation that assigns self._ffi_handle. If the stream-handle creation raises (e.g. the track was already closed because the participant disconnected mid-setup), __init__ aborts and self._ffi_handle is never assigned. The already-running _run task then dereferences self._ffi_handle and raises AttributeError: 'AudioStream' object has no attribute '_ffi_handle'.

Because this happens inside an orphaned task, it surfaces only via the task-done callback on the livekit logger and is structurally uncatchable by application code — the caller sees the real exception from __init__, while the AttributeError leaks as an unretrieved task exception.

Environment

  • livekit 1.1.11 (also present on main as of filing)
  • Component: livekit-rtc/livekit/rtc/audio_stream.py

The offending code (current main)

AudioStream.__init__ creates the task before _ffi_handle / _info are assigned:

self._task = self._loop.create_task(self._run())
self._task.add_done_callback(task_done_logger)

stream: Any = None
if "participant" in kwargs:
    stream = self._create_owned_stream_from_participant(
        participant=kwargs["participant"], track_source=kwargs["track_source"]
    )
else:
    stream = self._create_owned_stream()
self._ffi_handle = FfiHandle(stream.handle.id)
self._info = stream.info

_run immediately awaits an event filtered through _is_event, which reads self._ffi_handle:

async def _run(self) -> None:
    while True:
        event = await self._ffi_queue.wait_for(self._is_event)
        ...

def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
    return e.audio_stream_event.stream_handle == self._ffi_handle.handle

Failure scenario (track gone mid-setup)

  1. AudioStream(track=...) is constructed.
  2. self._task = self._loop.create_task(self._run()) schedules _run.
  3. _create_owned_stream() (or _create_owned_stream_from_participant()) raises — for example because the underlying track was already closed after the remote participant disconnected during subscription setup. (_create_owned_stream opens with assert self._track is not None and issues an FFI request, both of which can fail on a torn-down track.)
  4. __init__ propagates that exception; self._ffi_handle is never set.
  5. The already-scheduled _run task gets its turn on the loop, evaluates the _is_event predicate, and hits self._ffi_handle → AttributeError.
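
The five steps above can be modelled without livekit at all. The following is a minimal sketch using plain asyncio; ToyStream, created_tasks and the fail flag are hypothetical names introduced only for illustration, and the class mirrors just the ordering of the constructor, not the real AudioStream.

```python
import asyncio

# Demo-only registry so the orphaned task can be inspected afterwards;
# in the scenario above the caller has no such reference.
created_tasks: list = []


class ToyStream:
    """Toy stand-in for the ordering described above; not the livekit class."""

    def __init__(self, fail: bool) -> None:
        loop = asyncio.get_running_loop()
        # Step 2: the task is scheduled before the handle exists.
        self._task = loop.create_task(self._run())
        created_tasks.append(self._task)
        if fail:
            # Step 3: stands in for the fallible owned-stream creation.
            raise RuntimeError("track already closed")
        self._ffi_handle = 42

    async def _run(self) -> int:
        # Step 5: the attribute is first read when the loop runs the task.
        return self._ffi_handle


async def demo():
    caller_error = None
    try:
        ToyStream(fail=True)
    except RuntimeError as exc:
        # Step 4: the caller only ever sees this error.
        caller_error = exc
    orphan = created_tasks[-1]
    await asyncio.wait([orphan])  # let the orphaned task run to completion
    return caller_error, orphan.exception()


caller_error, orphan_error = asyncio.run(demo())
print(type(caller_error).__name__, "|", type(orphan_error).__name__)
```

The constructor's RuntimeError and the task's AttributeError are two separate exceptions on two separate paths, which is why a try/except around the constructor cannot reach the second one.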

Observed error

Traceback (most recent call last):
  File ".../livekit/rtc/audio_stream.py", line ..., in _run
    event = await self._ffi_queue.wait_for(self._is_event)
  File ".../livekit/rtc/audio_stream.py", line ..., in _is_event
    return e.audio_stream_event.stream_handle == self._ffi_handle.handle
AttributeError: 'AudioStream' object has no attribute '_ffi_handle'

This is an unretrieved task exception, reported only through task_done_logger (the done-callback added right after task creation) on the livekit logger. Application code that wraps the AudioStream(...) constructor in try/except can catch the real error from _create_owned_stream, but it has no handle to the orphaned _run task and therefore cannot catch or suppress the AttributeError — it always leaks to the logger.

Minimal repro sketch

import asyncio
from unittest.mock import patch
from livekit import rtc

async def main():
    # make_fake_audio_track is a placeholder helper, not part of livekit;
    # it must return a track valid enough to enter __init__.
    track = make_fake_audio_track()

    # Force the fallible owned-stream creation to raise, simulating a track
    # torn down mid-setup.
    with patch.object(
        rtc.AudioStream, "_create_owned_stream",
        side_effect=RuntimeError("track already closed"),
    ):
        try:
            rtc.AudioStream(track)
        except RuntimeError:
            pass  # expected: the real error

    # Let the orphaned _run task get scheduled.
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    # OBSERVED: livekit logger emits
    #   AttributeError: 'AudioStream' object has no attribute '_ffi_handle'
    # from the unretrieved _run task, with no way for the caller above to catch it.

asyncio.run(main())

Proposed fix

Preferred: create the task after the handle is assigned. __init__ runs synchronously from create_task(...) through the _ffi_handle / _info assignments; there is no await in between (and __init__ cannot await), so _run cannot actually execute until the code that called the constructor next yields to the event loop. There is therefore no ordering requirement for the task to start before stream creation: _run only needs self._ffi_queue (subscribed at the top of __init__) and self._ffi_handle to exist when it first runs. Creating the task last is sufficient and correct: if stream creation raises, the task is never created, there is no orphan, and the only exception the caller sees is the meaningful one from _create_owned_stream (or _create_owned_stream_from_participant).

-        self._task = self._loop.create_task(self._run())
-        self._task.add_done_callback(task_done_logger)
-
         stream: Any = None
         if "participant" in kwargs:
             stream = self._create_owned_stream_from_participant(
                 participant=kwargs["participant"], track_source=kwargs["track_source"]
             )
         else:
             stream = self._create_owned_stream()
         self._ffi_handle = FfiHandle(stream.handle.id)
         self._info = stream.info
+
+        self._task = self._loop.create_task(self._run())
+        self._task.add_done_callback(task_done_logger)
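
As a rough illustration of the reordered constructor, here is a toy asyncio model (ReorderedStream and the fail flag are hypothetical, not livekit names). It assumes only what the diff shows: the fallible step and the handle assignment come first, and the task is created last.

```python
import asyncio


class ReorderedStream:
    """Toy model of the reordered __init__; not the livekit class."""

    def __init__(self, fail: bool) -> None:
        loop = asyncio.get_running_loop()
        if fail:
            # Stands in for the fallible owned-stream creation.
            raise RuntimeError("track already closed")
        self._ffi_handle = 42
        # The task is created only after the handle is assigned.
        self._task = loop.create_task(self._run())

    async def _run(self) -> int:
        return self._ffi_handle


async def demo():
    try:
        ReorderedStream(fail=True)
    except RuntimeError:
        pass  # the only error on the failure path
    # Count tasks other than this coroutine's own task.
    leftover = len(asyncio.all_tasks() - {asyncio.current_task()})

    ok = ReorderedStream(fail=False)
    value = await ok._task  # success path still works
    return leftover, value


leftover_tasks, first_value = asyncio.run(demo())
print(leftover_tasks, first_value)
```

On the failure path no task exists to be orphaned, and on the success path the task still sees the handle on its first run.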

Alternative: defensive guard. Initialize self._ffi_handle: FfiHandle | None = None before creating the task and make _is_event return False when it is None. This prevents the AttributeError but is strictly inferior: on the failure path it leaves a live _run task waiting on _ffi_queue with a predicate that can never match (a leaked task/subscription), and it forces _ffi_handle to become Optional, propagating None-checks elsewhere.
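
To make the leak concrete, here is a toy sketch of the guarded variant. It uses an asyncio.Queue as a stand-in for _ffi_queue, which is an assumption about shape only and not how the real queue is implemented; GuardedStream and leaked_tasks are hypothetical names.

```python
import asyncio
from typing import Optional

# Demo-only registry so the leaked task can be inspected and cleaned up.
leaked_tasks: list = []


class GuardedStream:
    """Toy model of the defensive-guard alternative; not the livekit class."""

    def __init__(self, queue: asyncio.Queue, fail: bool) -> None:
        self._queue = queue
        self._ffi_handle: Optional[int] = None  # guard value
        self._task = asyncio.get_running_loop().create_task(self._run())
        leaked_tasks.append(self._task)
        if fail:
            raise RuntimeError("track already closed")
        self._ffi_handle = 42

    def _is_event(self, handle: int) -> bool:
        if self._ffi_handle is None:
            return False  # no AttributeError, but also never a match
        return handle == self._ffi_handle

    async def _run(self) -> int:
        while True:
            handle = await self._queue.get()
            if self._is_event(handle):
                return handle


async def demo() -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    try:
        GuardedStream(queue, fail=True)
    except RuntimeError:
        pass
    task = leaked_tasks[-1]
    queue.put_nowait(42)    # an event that would match a live stream
    await asyncio.sleep(0)  # give the task a turn
    still_pending = not task.done()
    task.cancel()           # demo cleanup; real callers have no handle to do this
    await asyncio.gather(task, return_exceptions=True)
    return still_pending


still_pending = asyncio.run(demo())
print(still_pending)
```

The guard removes the AttributeError, but the task outlives the failed constructor and keeps consuming events it can never accept.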

The reorder is the smaller, clearer change and removes the orphaned-task class of bug entirely rather than masking a symptom.

Regression test sketch

import asyncio
import logging

import pytest
from unittest.mock import patch
from livekit import rtc

@pytest.mark.asyncio
async def test_audiostream_failed_stream_creation_does_not_orphan_run_task(caplog):
    track = make_fake_audio_track()  # placeholder helper, not part of livekit
    with patch.object(rtc.AudioStream, "_create_owned_stream",
                      side_effect=RuntimeError("track already closed")):
        with caplog.at_level(logging.ERROR, logger="livekit"):
            with pytest.raises(RuntimeError, match="track already closed"):
                rtc.AudioStream(track)
            await asyncio.sleep(0)
            await asyncio.sleep(0)
    # The only error the caller should see is the real RuntimeError above.
    assert "_ffi_handle" not in caplog.text
    assert "AttributeError" not in caplog.text

Before the fix this fails (the orphaned task logs the AttributeError); after the reorder the task is never created.
