diff --git a/src/google/adk/sessions/in_memory_session_service.py b/src/google/adk/sessions/in_memory_session_service.py index 934e712036..eda7826a67 100644 --- a/src/google/adk/sessions/in_memory_session_service.py +++ b/src/google/adk/sessions/in_memory_session_service.py @@ -336,12 +336,20 @@ def _warning(message: str) -> None: _warning(f'session_id {session_id} not in sessions[app_name][user_id]') return event + # Fetch the canonical storage session early so we can check for duplicate + # event IDs before modifying any state. The same event can be delivered + # more than once when the orchestrator broadcasts a shared-state delta to + # several concurrent session references; deduplicating here prevents + # double-application of state updates and duplicate entries in event lists. + storage_session = self.sessions[app_name][user_id].get(session_id) + if any(e.id == event.id for e in storage_session.events): + return event + # Update the in-memory session. await super().append_event(session=session, event=event) session.last_update_time = event.timestamp - # Update the storage session - storage_session = self.sessions[app_name][user_id].get(session_id) + # Update the storage session if the caller holds a stale copy. if storage_session is not session: storage_session.events.append(event) storage_session.last_update_time = event.timestamp diff --git a/tests/unittests/sessions/test_session_service.py b/tests/unittests/sessions/test_session_service.py index 02f5159a45..eddb5c884c 100644 --- a/tests/unittests/sessions/test_session_service.py +++ b/tests/unittests/sessions/test_session_service.py @@ -1650,3 +1650,75 @@ async def tracking_fn(**kwargs): finally: database_session_service._select_required_state = original_fn await service.close() + + +# --------------------------------------------------------------------------- +# Regression tests for duplicate-event deduplication (issue #5723) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'session_service', + [InMemorySessionService()], + ids=['in_memory'], +) +async def test_append_event_is_idempotent_for_same_event_id(session_service): + """Appending the same event ID twice must not duplicate entries or state.""" + app_name = 'test_app' + user_id = 'user_dup' + session = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='session_dup' + ) + + event = Event( + invocation_id='inv_dup', + author='user', + actions=EventActions(state_delta={'session:counter': 1}), + ) + + # Append the same event object twice (simulates a duplicate broadcast). + await session_service.append_event(session=session, event=event) + await session_service.append_event(session=session, event=event) + + # The storage session must contain the event exactly once. + retrieved = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id='session_dup' + ) + matching = [e for e in retrieved.events if e.id == event.id] + assert len(matching) == 1, ( + f'Expected 1 occurrence of event {event.id!r}, got {len(matching)}' + ) + + # State must not be double-applied. + assert retrieved.state.get('session:counter') == 1, ( + 'State was applied more than once — duplicate event caused double-apply' + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'session_service', + [InMemorySessionService()], + ids=['in_memory'], +) +async def test_append_different_events_not_deduplicated(session_service): + """Events with distinct IDs must both be stored.""" + app_name = 'test_app' + user_id = 'user_multi' + session = await session_service.create_session( + app_name=app_name, user_id=user_id, session_id='session_multi' + ) + + e1 = Event(invocation_id='inv_a', author='user') + e2 = Event(invocation_id='inv_b', author='agent') + + await session_service.append_event(session=session, event=e1) + await session_service.append_event(session=session, event=e2) + + retrieved = await session_service.get_session( + app_name=app_name, user_id=user_id, session_id='session_multi' + ) + assert len(retrieved.events) == 2, ( + f'Expected 2 distinct events, got {len(retrieved.events)}' + )