diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 59bf2055b2..f078f334e3 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -1180,6 +1180,8 @@ class OP: COHERE_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.cohere" COHERE_EMBEDDINGS_CREATE = "ai.embeddings.create.cohere" DB = "db" + DB_CURSOR_ITERATOR = "db.cursor.iter" + DB_CURSOR_FETCH = "db.cursor.fetch" DB_REDIS = "db.redis" EVENT_DJANGO = "event.django" FUNCTION = "function" diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index a48ec7dbeb..186176d268 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -20,8 +20,12 @@ ) try: - import asyncpg # type: ignore[import-not-found] - from asyncpg.cursor import BaseCursor # type: ignore + import asyncpg # type: ignore + from asyncpg.cursor import ( # type: ignore + BaseCursor, + Cursor, + CursorIterator, + ) except ImportError: raise DidNotEnable("asyncpg not installed.") @@ -169,6 +173,13 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": return await f(*args, **kwargs) cursor = args[0] + if type(cursor) is CursorIterator: + span_op_override_value = OP.DB_CURSOR_ITERATOR + elif type(cursor) is Cursor: + span_op_override_value = OP.DB_CURSOR_FETCH + else: + span_op_override_value = None + query = _normalize_query(cursor._query) with record_sql_queries( cursor=cursor, @@ -178,6 +189,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": executemany=False, record_cursor_repr=True, span_origin=AsyncPGIntegration.origin, + span_op_override_value=span_op_override_value, ) as span: _set_db_data(span, cursor._connection) res = await f(*args, **kwargs) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 3b44bad0bb..411f9923ad 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -134,6 +134,7 @@ def record_sql_queries( executemany: bool, record_cursor_repr: bool = False, span_origin: str = "manual", + span_op_override_value: "Optional[str]" = None, ) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]": # TODO: Bring back capturing of params by default client = sentry_sdk.get_client() @@ -167,13 +168,15 @@ def record_sql_queries( name="" if query is None else query, attributes={ "sentry.origin": span_origin, - "sentry.op": OP.DB, + "sentry.op": span_op_override_value + if span_op_override_value + else OP.DB, }, ) as span: yield span else: with sentry_sdk.start_span( - op=OP.DB, + op=span_op_override_value if span_op_override_value is not None else OP.DB, name=query, origin=span_origin, ) as span: diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index fee791c338..8d719f23b8 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -21,7 +21,7 @@ import sentry_sdk from sentry_sdk import capture_message, start_transaction -from sentry_sdk.consts import SPANDATA +from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations.asyncpg import AsyncPGIntegration from sentry_sdk.tracing_utils import record_sql_queries from tests.conftest import ApproxDict @@ -1361,18 +1361,24 @@ async def test_query_source_prepare( @pytest.mark.asyncio @pytest.mark.parametrize("span_streaming", [True, False]) -async def test_cursor__bind_exec_creates_spans( +async def test_cursor_iteration_creates_db_cursor_iter_spans( sentry_init, capture_events, capture_items, span_streaming ) -> None: """ - Exercises the bind_exec patch through the iterator that's created in asyncpg when "for record in conn.cursor" is called. - See https://github.com/MagicStack/asyncpg/blob/db8ecc2a38e16fb0c090aef6f5506547c2831c24/asyncpg/cursor.py#L234 + Regression test for https://github.com/getsentry/sentry-python/issues/6576 + + When iterating a server-side cursor with a small prefetch, asyncpg fetches + rows in batches. Each batch triggers BaseCursor._bind_exec (on first query) and + BaseCursor._exec (second query onwards) through CursorIterator.__anext__, which creates a + span with the same query description. The resulting burst of identical spans + causes Sentry's N+1 query detector to raise a false positive. + + To mitigate, we set the "op"/"sentry.op" to `db.cursor.iter` instead of `db` + so that the sentry backend can exclude these spans from n+1 detection. """ sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, - enable_db_query_source=True, - db_query_source_threshold_ms=0, _experiments={ "trace_lifecycle": "stream" if span_streaming else "static", }, @@ -1380,56 +1386,32 @@ async def test_cursor__bind_exec_creates_spans( if span_streaming: items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_segment"): conn: Connection = await connect(PG_CONNECTION_URI) await conn.executemany( "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", - [ - ("Bob", "secret_pw", datetime.date(1984, 3, 1)), - ("Alice", "pw", datetime.date(1990, 12, 25)), - ], + [(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)], ) async with conn.transaction(): - async for record in conn.cursor( - "SELECT * FROM users WHERE dob > $1", - datetime.date(1970, 1, 1), - ): + async for _record in conn.cursor("SELECT * FROM users", prefetch=5): pass await conn.close() - sentry_sdk.flush() - spans = [item.payload for item in items] - - assert len(spans) == 6 - - connect_span = spans[0] - executemany_span = spans[1] - begin_span = spans[2] - bind_exec_span = spans[3] - commit_span = spans[4] - segment = spans[5] + sentry_sdk.flush() - assert connect_span["name"] == "connect" - assert ( - executemany_span["name"] - == "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)" - ) - assert begin_span["name"] == "BEGIN;" - assert bind_exec_span["name"] == "SELECT * FROM users WHERE dob > $1" - assert commit_span["name"] == "COMMIT;" - assert segment["name"] == "test_segment" + cursor_iter_spans = [ + item.payload + for item in items + if item.payload.get("name") == "SELECT * FROM users" + ] - assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg" - assert bind_exec_span["attributes"]["sentry.op"] == "db" - assert bind_exec_span["attributes"]["db.system.name"] == "postgresql" - assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg" - assert bind_exec_span["attributes"]["server.address"] == PG_HOST - assert bind_exec_span["attributes"]["server.port"] == PG_PORT - assert bind_exec_span["attributes"]["db.namespace"] == PG_NAME - assert bind_exec_span["attributes"]["db.user"] == PG_USER + assert len(cursor_iter_spans) == 5 + for span in cursor_iter_spans: + assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR else: events = capture_events() @@ -1438,57 +1420,28 @@ async def test_cursor__bind_exec_creates_spans( await conn.executemany( "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", - [ - ("Bob", "secret_pw", datetime.date(1984, 3, 1)), - ("Alice", "pw", datetime.date(1990, 12, 25)), - ], + [(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)], ) async with conn.transaction(): - async for record in conn.cursor( - "SELECT * FROM users WHERE dob > $1", - datetime.date(1970, 1, 1), - ): + async for _record in conn.cursor("SELECT * FROM users", prefetch=5): pass await conn.close() (event,) = events - assert len(event["spans"]) == 5 - - connect_span = event["spans"][0] - executemany_span = event["spans"][1] - begin_span = event["spans"][2] - bind_exec_span = event["spans"][3] - commit_span = event["spans"][4] + cursor_iter_spans = [ + s for s in event["spans"] if s.get("description") == "SELECT * FROM users" + ] - assert connect_span["description"] == "connect" - assert ( - executemany_span["description"] - == "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)" - ) - assert begin_span["description"] == "BEGIN;" - assert bind_exec_span["description"] == "SELECT * FROM users WHERE dob > $1" - assert commit_span["description"] == "COMMIT;" - - assert bind_exec_span["origin"] == "auto.db.asyncpg" - assert bind_exec_span["data"]["db.system"] == "postgresql" - assert bind_exec_span["data"]["db.driver.name"] == "asyncpg" - assert bind_exec_span["data"]["server.address"] == PG_HOST - assert bind_exec_span["data"]["server.port"] == PG_PORT - assert bind_exec_span["data"]["db.name"] == PG_NAME - assert bind_exec_span["data"]["db.user"] == PG_USER - - _assert_query_source( - bind_exec_span, - span_streaming, - "test_cursor__bind_exec_creates_spans", - ) + assert len(cursor_iter_spans) == 5 + for span in cursor_iter_spans: + assert span["op"] == OP.DB_CURSOR_ITERATOR @pytest.mark.asyncio -async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> None: +async def test_cursor_fetch_methods_create_spans(sentry_init, capture_events) -> None: sentry_init( integrations=[AsyncPGIntegration()], traces_sample_rate=1.0, @@ -1543,9 +1496,10 @@ async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> assert span["data"]["db.cursor"] is not None assert span["data"]["db.system"] == "postgresql" assert span["data"]["db.driver.name"] == "asyncpg" + assert span["op"] == OP.DB_CURSOR_FETCH assert span["origin"] == "auto.db.asyncpg" _assert_query_source( span, False, - "test_cursor__exec_methods_create_spans", + "test_cursor_fetch_methods_create_spans", )