Disclaimer
While the problem itself is real and affects my production use of asyncpg, the proposed cause and solution were identified by an LLM, specifically Claude.
Problem
I'm using asyncpg 0.29.0 with Spilo for an HA Postgres 14 cluster, in which the application only ever talks to the Postgres leader. When a failover happens, I often see pending asyncio tasks that depend on Postgres queries hang indefinitely. The issue does not resolve itself without a restart of the application.
Below is an example stack trace that I observe before the tasks hang:
2025-02-10T18:37:13.272796680Z [ERROR] reconciliation:
2025-02-10T18:37:13.272812281Z Traceback (most recent call last):
2025-02-10T18:37:13.272815361Z File "/app/reconciliation.py", line XXXX, in reconcile_data
2025-02-10T18:37:13.272817871Z new_data = await self.load_new_data(conn)
2025-02-10T18:37:13.272820601Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-02-10T18:37:13.272822641Z File "/app/reconciliation.py", line XXXX, in load_new_data
2025-02-10T18:37:13.272824721Z async for row in conn.cursor(
2025-02-10T18:37:13.272826701Z File ".venv/lib/python3.11/site-packages/asyncpg/cursor.py", line 238, in __anext__
2025-02-10T18:37:13.272828831Z buffer = await self._exec(self._prefetch, self._timeout)
2025-02-10T18:37:13.272830771Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025-02-10T18:37:13.272832691Z File ".venv/lib/python3.11/site-packages/asyncpg/cursor.py", line 157, in _exec
2025-02-10T18:37:13.272834711Z buffer, _, self._exhausted = await protocol.execute(
2025-02-10T18:37:13.272854611Z ^^^^^^^^^^^^^^^^^^^^^^^
2025-02-10T18:37:13.272860981Z File "asyncpg/protocol/protocol.pyx", line 326, in execute
2025-02-10T18:37:13.272863051Z File "asyncpg/protocol/protocol.pyx", line 314, in asyncpg.protocol.protocol.BaseProtocol.execute
2025-02-10T18:37:13.272865031Z File "asyncpg/protocol/coreproto.pyx", line 1043, in asyncpg.protocol.protocol.CoreProtocol._execute
2025-02-10T18:37:13.272867081Z File "asyncpg/protocol/coreproto.pyx", line 789, in asyncpg.protocol.protocol.CoreProtocol._set_state
2025-02-10T18:37:13.272869041Z asyncpg.exceptions._base.InternalClientError: cannot switch to state 16; another operation (2) is in progress
2025-02-10T18:37:13.272871071Z
2025-02-10T18:37:13.272872851Z During handling of the above exception, another exception occurred:
2025-02-10T18:37:13.272874931Z
2025-02-10T18:37:13.272876861Z Traceback (most recent call last):
2025-02-10T18:37:13.272878841Z File "/app/reconciliation.py", line XXXX, in start
2025-02-10T18:37:13.272880801Z await self.reconcile(http_session)
2025-02-10T18:37:13.272882751Z File "/app/reconciliation.py", line XXXX, in reconcile
2025-02-10T18:37:13.272887211Z await asyncio.gather(*tasks)
2025-02-10T18:37:13.272889121Z File "/app/reconciliation.py", line XXXX, in reconcile_items
2025-02-10T18:37:13.272891091Z await asyncio.gather(*tasks)
2025-02-10T18:37:13.272893011Z File "/app/reconciliation.py", line XXXX, in reconcile_data
2025-02-10T18:37:13.272894941Z async with conn.transaction():
2025-02-10T18:37:13.272897311Z File ".venv/lib/python3.11/site-packages/asyncpg/transaction.py", line 72, in __aexit__
2025-02-10T18:37:13.272899321Z self._check_conn_validity('__aexit__')
2025-02-10T18:37:13.272901181Z File ".venv/lib/python3.11/site-packages/asyncpg/connresource.py", line 41, in _check_conn_validity
2025-02-10T18:37:13.272903171Z raise exceptions.InterfaceError(
2025-02-10T18:37:13.272905051Z asyncpg.exceptions._base.InterfaceError: cannot call Transaction.__aexit__(): the underlying connection is closed
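For reference, the application code implied by this traceback has roughly the following shape (a simplified, hypothetical reconstruction: the methods are shown without their enclosing class, and the query text is a placeholder):

async def reconcile_data(self, conn):
    async with conn.transaction():
        new_data = await self.load_new_data(conn)
        # ... reconcile new_data against existing state ...

async def load_new_data(self, conn):
    # Server-side cursor inside the transaction above; if the leader goes
    # away mid-iteration, cursor __anext__ raises InternalClientError, and
    # the transaction's __aexit__ then fails because the connection is closed.
    return [row async for row in conn.cursor("SELECT ...")]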
Findings
I used an LLM to reason about the problem, and it pointed out a potential issue in PoolConnectionHolder.release.
When asyncpg raises InternalClientError inside execute() (e.g. because the server sent a FATAL error mid-cursor during a failover, leaving the protocol in PROTOCOL_ERROR_CONSUME while the client tries to set state PROTOCOL_EXECUTE), the error handler calls _coreproto_error() → abort(). abort() sets protocol.closing = True and calls transport.abort(). When connection_lost() subsequently fires, _on_connection_lost() sees closing=True and takes the fast path: it only handles the pending waiter. The else branch, which calls con._cleanup(), is skipped entirely. con._cleanup() is the only call site for holder._release_on_close(), which is the only way a PoolConnectionHolder is returned to Pool._queue. With the holder permanently absent from the queue, every subsequent pool.acquire() blocks forever on self._queue.get().

The fix is one line in PoolConnectionHolder.release() (pool.py):

if self._con.is_closed():
    self._release()  # ensure holder is always returned to the queue
    return

The existing code returned early here on the assumption that _release_on_close() would be called via con._cleanup(), an assumption that breaks specifically when abort() is the cause of closure.
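Until such a change lands upstream, the same behavior can be restored from application code with a monkey patch applied at startup. This is only a workaround sketch (it mirrors the fixed_release wrapper in the reproducer below and relies on the private attributes _con and _release):

from asyncpg.pool import PoolConnectionHolder

_orig_release = PoolConnectionHolder.release

async def _patched_release(self, timeout):
    # If the connection was aborted, con._cleanup() (and therefore
    # _release_on_close()) may never run, so return the holder to the
    # pool queue explicitly instead of returning early.
    if self._con.is_closed():
        self._release()
        return
    return await _orig_release(self, timeout)

PoolConnectionHolder.release = _patched_release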
Proposed solution
I also used the LLM to generate a reproducer, which demonstrates the problem, monkey-patches the release method, and verifies the corrected behavior afterwards. The reproducer depends on Podman for standing up a temporary Postgres container.
import asyncio, socket, subprocess, sys

import asyncpg
from asyncpg.pool import PoolConnectionHolder

PG_IMAGE = "docker.io/library/postgres:14-alpine"


async def wait_for_postgres(port: int):
    for _ in range(30):
        try:
            c = await asyncpg.connect(host="127.0.0.1", port=port,
                                      user="x", password="x", database="x",
                                      timeout=1)
            await c.close()
            break
        except Exception:
            await asyncio.sleep(1)
    else:
        raise RuntimeError("Postgres did not become ready within 60s")


def make_pool(port: int):
    # max_size=1: one stuck holder immediately exhausts the pool.
    return asyncpg.create_pool(host="127.0.0.1", port=port,
                               user="x", password="x", database="x",
                               min_size=0, max_size=1)


async def try_acquire(pool):
    try:
        async with asyncio.timeout(2):
            async with pool.acquire() as _:
                return True
    except asyncio.TimeoutError:
        return False


async def main() -> None:
    port: int
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        port = s.getsockname()[1]

    cid = subprocess.check_output(
        ["podman", "run", "--rm", "-d", f"-p{port}:5432",
         "-ePOSTGRES_USER=x", "-ePOSTGRES_PASSWORD=x", "-ePOSTGRES_DB=x",
         PG_IMAGE], text=True,
    ).strip()
    print(f"Container {cid[:12]}, port {port}. Waiting for Postgres...")

    try:
        await wait_for_postgres(port)

        async def trigger(pool):
            async with pool.acquire() as conn:
                conn._con._protocol.abort()  # exactly what _coreproto_error() calls
                await asyncio.sleep(0)       # let connection_lost() fire

        # ── Without fix ──────────────────────────────────────────────────────
        pool = await make_pool(port)
        print("\n=== Without fix ===")
        await trigger(pool)
        h = pool._holders[0]
        print(f"  _in_use set: {h._in_use is not None} | in queue: {pool._queue.qsize() > 0}")
        print(f"  acquire succeeded: {await try_acquire(pool)}")
        pool.terminate()

        # ── With fix ─────────────────────────────────────────────────────────
        orig = PoolConnectionHolder.release

        async def fixed_release(self, timeout):
            if self._con.is_closed():
                self._release()
                return
            return await orig(self, timeout)

        PoolConnectionHolder.release = fixed_release

        pool = await make_pool(port)
        print("\n=== With fix ===")
        await trigger(pool)
        h = pool._holders[0]
        print(f"  _in_use set: {h._in_use is not None} | in queue: {pool._queue.qsize() > 0}")
        print(f"  acquire succeeded: {await try_acquire(pool)}")
        pool.terminate()

        PoolConnectionHolder.release = orig
    finally:
        subprocess.run(["podman", "stop", cid], capture_output=True)
        print("\nContainer stopped.")


asyncio.run(main())
This reproducer has been tested against both the PostgreSQL 14 and PostgreSQL 16 Docker images. In the example above, acquire() blocks indefinitely without the updated release() implementation.
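Independent of the proposed fix, callers can at least turn the indefinite hang into a detectable failure by bounding acquisition time with the timeout parameter that pool.acquire() already accepts. This is only a mitigation sketch (the helper name and timeout value are illustrative), not a fix for the lost holder:

import asyncio
import asyncpg

async def fetch_with_bounded_acquire(pool: asyncpg.Pool):
    try:
        # If the holder was never returned to the pool queue, acquire()
        # would block forever; the timeout surfaces that as an error.
        async with pool.acquire(timeout=5) as conn:
            return await conn.fetch("SELECT 1")
    except asyncio.TimeoutError:
        # Likely a stuck or exhausted pool (e.g. after a failover); let the
        # caller decide whether to recreate the pool or restart.
        raise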