Skip to content

Add MessageBufferConfig to allow custom back-pressure config for message.new events#6472

Open
VelikovPetar wants to merge 2 commits into
developfrom
port/v6-to-develop/message-buffer-config
Open

Add MessageBufferConfig to allow custom back-pressure config for message.new events#6472
VelikovPetar wants to merge 2 commits into
developfrom
port/v6-to-develop/message-buffer-config

Conversation

@VelikovPetar
Copy link
Copy Markdown
Contributor

@VelikovPetar VelikovPetar commented May 25, 2026

Goal

Ports V6 PR #6406 (commit 3b6c6546418d010ba4dc29847459bece845bd878) to develop.

Linear issue: AND-1166

Manual port (not a clean cherry-pick) — v7 restructured stream-chat-android-state into stream-chat-android-client and merged StatePluginConfig fields into ChatClientConfig. As a consequence the file paths, packages, and the MessageLimitConfig location all moved:

V6 develop (v7)
stream-chat-android-state module stream-chat-android-client module
io.getstream.chat.android.state.plugin.config.MessageLimitConfig io.getstream.chat.android.client.api.MessageLimitConfig
StatePluginConfig.messageLimitConfig ChatClientConfig.messageLimitConfig

Same intent as the V6 PR — high-traffic channel types (e.g. livestreams) can produce a flood of message.new events that arrive faster than the sequential event-handling pipeline can process them. The current implementation funnels every socket event through a single MutableSharedFlow with extraBufferCapacity = Int.MAX_VALUE, which means there is no back-pressure: a burst of new-message events queues up unbounded memory and starves more important signals (reads, bans, member updates) of timely processing.

This PR introduces a MessageBufferConfig (under MessageLimitConfig.messageBufferConfig) that lets integrators opt specific channel types into a bounded buffer for NewMessageEvents, with a configurable overflow strategy (DROP_OLDEST / DROP_LATEST). Signal-critical events and events for non-opted-in channel types are unaffected. Default is a no-op.

Implementation

  • Added MessageBufferConfig (under MessageLimitConfig.messageBufferConfig) exposing:
    • channelTypes: Set<String> — channel types whose NewMessageEvents go through the bounded buffer (empty by default → feature is a no-op).
    • capacity: Int — buffer capacity (defaults to Int.MAX_VALUE).
    • overflow: MessageBufferOverflow — overflow strategy (DROP_OLDEST / DROP_LATEST, defaults to DROP_OLDEST).
  • EventHandlerSequential now allocates a secondary MutableSharedFlow (bufferedNewMessageEvents) lazily, only when buffering is enabled, so the default configuration pays no cost for it.
  • Two listener variants:
    • defaultSocketEventListener — the existing unbuffered path; used when no channel types are opted in.
    • bufferedSocketEventListener — routes NewMessageEvents for opted-in channel types to the bounded flow, and everything else (including non-opted-in NewMessageEvents and all other event types) to the unbuffered flow.
  • startListening() picks the listener based on bufferConfig.channelTypes.isNotEmpty() and only collects from bufferedNewMessageEvents when buffering is enabled.
  • StreamStatePluginFactory wires the config from ChatClientConfig.messageLimitConfig.messageBufferConfig into EventHandlerSequential.

The bounded flow shares the same downstream pipeline (socketEventCollectorhandleBatchEvent) as the unbuffered flow, so ordering inside each flow is preserved and back-pressure is applied independently per flow.

Files changed (7)

  • stream-chat-android-client/api/stream-chat-android-client.api — auto-regenerated via apiDump
  • stream-chat-android-client/.../api/ChatClientConfig.kt
  • stream-chat-android-client/.../internal/state/event/handler/internal/EventHandlerSequential.kt
  • stream-chat-android-client/.../internal/state/plugin/factory/StreamStatePluginFactory.kt
  • stream-chat-android-client/.../test/.../internal/state/event/TotalUnreadCountTest.kt
  • stream-chat-android-client/.../test/.../internal/state/event/handler/internal/EventHandlerSequentialTest.kt
  • stream-chat-android-client/.../test/.../internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt

Stats match V6 exactly: +392 / −26.

UI Changes

No UI changes.

Testing

  • Added unit tests in EventHandlerSequentialTest covering:
    • NewMessageEvents for opted-in channel types are routed through the bounded buffer.
    • NewMessageEvents for non-opted-in channel types and all non-NewMessageEvent events keep using the unbuffered path.
    • DROP_OLDEST / DROP_LATEST overflow strategies behave as expected when the buffer is full.
    • Bombarding the listener with 5,000 events at capacity 100 drops oldest events under load.
  • Existing tests (TotalUnreadCountTest, EventHandlerSequentialUserMessagesDeletedTest) updated to pass the new bufferConfig argument.

Verification on develop:

  • detekt ✅
  • spotless ✅ (no changes)
  • apiDump ✅
  • unit tests ✅ (EventHandlerSequentialTest, EventHandlerSequentialUserMessagesDeletedTest, TotalUnreadCountTest, StateRegistryTest)

Summary by CodeRabbit

  • New Features
    • Added configurable message buffering for inbound message events by channel type with customizable capacity limits
    • Introduced message overflow strategies to handle capacity limits: drop oldest or drop latest messages when buffer is full
    • Extended message limit configuration to support bounded buffering with flexible overflow handling

Review Change Stack

…nfig for message.new events

Co-Authored-By: Claude <noreply@anthropic.com>
@VelikovPetar VelikovPetar added the pr:new-feature New feature label May 25, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 25, 2026

PR checklist ✅

All required conditions are satisfied:

  • Title length is OK (or ignored by label).
  • At least one pr: label exists.
  • Sections ### Goal, ### Implementation, and ### Testing are filled, or the PR is bot-authored.
  • An issue is linked (Linear ticket or GitHub issue), or the PR is bot-authored.

🎉 Great job! This PR is ready for review.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 25, 2026

SDK Size Comparison 📏

SDK Before After Difference Status
stream-chat-android-client 5.83 MB 5.84 MB 0.00 MB 🟢
stream-chat-android-ui-components 11.07 MB 11.07 MB 0.00 MB 🟢
stream-chat-android-compose 12.46 MB 12.47 MB 0.00 MB 🟢

@VelikovPetar VelikovPetar marked this pull request as ready for review May 25, 2026 12:42
@VelikovPetar VelikovPetar requested a review from a team as a code owner May 25, 2026 12:42
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 25, 2026

Walkthrough

This PR introduces configurable bounded buffering for socket NewMessageEvents in the Stream chat client. It adds public configuration types (MessageBufferConfig, MessageBufferOverflow), implements buffering logic with overflow strategy selection in EventHandlerSequential, wires configuration through the plugin factory, and validates overflow behavior with new test coverage.

Changes

Message Buffering and Overflow Handling

Layer / File(s) Summary
Public API contract for buffering configuration
stream-chat-android-client/api/stream-chat-android-client.api, stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt
MessageBufferConfig data class exposes channel types, integer capacity, and MessageBufferOverflow strategy. MessageBufferOverflow enum defines DROP_OLDEST and DROP_LATEST overflow strategies. MessageLimitConfig is updated to include messageBufferConfig as a component.
EventHandlerSequential buffering implementation
stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt
Constructor accepts bufferConfig parameter. Introduces lazily-created bounded bufferedNewMessageEvents flow that maps MessageBufferOverflow to MutableSharedFlow overflow behavior. Routes socket events conditionally through buffered or default listeners based on channel type configuration. Centralizes emit outcome logging and refactors startListening() to support optional buffering.
Plugin factory configuration wiring
stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/plugin/factory/StreamStatePluginFactory.kt
Extracts messageBufferConfig from ChatClientConfig.messageLimitConfig and passes it through createEventHandler into EventHandlerSequential constructor.
Test infrastructure and buffering behavior validation
stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/TotalUnreadCountTest.kt, stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialTest.kt, stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt
Test fixtures updated to pass bufferConfig = MessageBufferConfig() into EventHandlerSequential. EventHandlerSequentialTest enhanced with listener capture via AtomicReference, side-effect gating using CompletableDeferred, and new overflow behavior tests validating DROP_OLDEST, DROP_LATEST, non-buffered passthrough, and stress testing with thousands of concurrent NewMessageEvents.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • GetStream/stream-chat-android#6406: Both PRs add the same MessageBufferConfig/overflow wiring to EventHandlerSequential so NewMessageEvent handling uses a lazily-created bounded buffer (with DROP_OLDEST/DROP_LATEST) for configured channel types, and the buffer config is threaded through the respective plugin/factory wiring.

Suggested reviewers

  • gpunto

Poem

🐰 Bounded buffers now keep messages in line,
Drop the oldest or newest by your design,
Channel types buffered with care and with grace,
Events flow swiftly through the right place,
Configuration complete, overflow at rest! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 4.35% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description check ✅ Passed The PR description comprehensively covers Goal, Implementation, Testing, and Files changed sections, thoroughly documenting the feature port, configuration details, and test coverage.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed The title directly and accurately describes the main change: introducing MessageBufferConfig for back-pressure configuration of message.new events.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch port/v6-to-develop/message-buffer-config

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`:
- Around line 133-136: Revert MessageLimitConfig to keep the original
primary-constructor shape (single parameter channelMessageLimits) and make
messageBufferConfig an additive property declared in the class body (public val
messageBufferConfig: MessageBufferConfig = MessageBufferConfig()) instead of
adding it to the primary constructor; this preserves the existing generated
one-arg constructor, copy and componentN signatures for binary compatibility
while still exposing the new messageBufferConfig field.
- Around line 218-221: Add an API-level guard to reject non-positive buffer
sizes: in the MessageBufferConfig data class (symbol MessageBufferConfig)
validate the capacity property on construction and throw an
IllegalArgumentException if capacity <= 0 (except when using sentinel values you
intentionally allow), since capacity is used as extraBufferCapacity for
MutableSharedFlow with onBufferOverflow (MessageBufferOverflow) and
MutableSharedFlow requires extraBufferCapacity > 0 for DROP_* policies; ensure
the error message clearly names capacity and MessageBufferConfig so callers see
what's wrong.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c56d6018-28d0-4d38-a129-e44964a672b9

📥 Commits

Reviewing files that changed from the base of the PR and between 4f2d20b and a9972f1.

📒 Files selected for processing (7)
  • stream-chat-android-client/api/stream-chat-android-client.api
  • stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt
  • stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt
  • stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/plugin/factory/StreamStatePluginFactory.kt
  • stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/TotalUnreadCountTest.kt
  • stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialTest.kt
  • stream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt

Comment on lines 133 to 136
public data class MessageLimitConfig(
public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(),
public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(),
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Keep MessageLimitConfig binary compatible.

Adding messageBufferConfig to the primary constructor changes the generated public data-class ABI: the existing one-arg constructor disappears and the copy / componentN signatures change as well. That will break already-compiled SDK consumers on upgrade, so this needs an additive shape instead of a primary-constructor change.

As per coding guidelines, "Favour additive API changes and mark deprecations with clear migration paths; validate public APIs and maintain binary compatibility".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`
around lines 133 - 136, Revert MessageLimitConfig to keep the original
primary-constructor shape (single parameter channelMessageLimits) and make
messageBufferConfig an additive property declared in the class body (public val
messageBufferConfig: MessageBufferConfig = MessageBufferConfig()) instead of
adding it to the primary constructor; this preserves the existing generated
one-arg constructor, copy and componentN signatures for binary compatibility
while still exposing the new messageBufferConfig field.

Comment on lines +218 to +221
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -nC3 'MessageBufferConfig|capacity|MutableSharedFlow|extraBufferCapacity|onBufferOverflow|require\s*\(' stream-chat-android-client/src/main

Repository: GetStream/stream-chat-android

Length of output: 27801


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig + its KDoc
nl -ba "$FILE" | sed -n '150,270p'

FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
nl -ba "$FILE2" | sed -n '140,230p'

# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 116


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig KDoc and data class
cat -n "$FILE" | sed -n '150,260p'

FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
cat -n "$FILE2" | sed -n '150,230p'

# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 9460


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
cat -n "$FILE" | sed -n '150,260p'

FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
cat -n "$FILE2" | sed -n '150,230p'

rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 9460


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Look for any require/validation mentioning MessageBufferConfig or capacity
rg -n --hidden --no-ignore-vcs \
  "MessageBufferConfig|bufferConfig\.capacity|capacity must|capacity must be|require\s*\(.*capacity|capacity\s*[><=]\s*\d" \
  stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

# 2) Find where MessageBufferConfig is instantiated/propagated
rg -n "MessageBufferConfig\(" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 2527


🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n --hidden --no-ignore-vcs \
  "(MessageBufferConfig\s*\.?\s*capacity|bufferConfig\.capacity|capacity must be|require\s*\(.*capacity|capacity\s*[><]=?\s*[0-9]+)" \
  . -S

Repository: GetStream/stream-chat-android

Length of output: 274


🌐 Web query:

Kotlinx coroutines MutableSharedFlow extraBufferCapacity negative value behavior

💡 Result:

In kotlinx.coroutines, MutableSharedFlow(extraBufferCapacity=…) does not have any runtime “negative value” behavior: it validates inputs and throws IllegalArgumentException if extraBufferCapacity is negative. Specifically, the MutableSharedFlow(replay, extraBufferCapacity, …) constructor function contains a check require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }, so passing a negative extraBufferCapacity will fail immediately with IllegalArgumentException (before any flow emissions). [1] Related notes: - The documentation for MutableSharedFlow also states that extraBufferCapacity “cannot be negative”. [2] - There is additional validation logic for the combination of (replay, extraBufferCapacity, onBufferOverflow): if you choose a non-default onBufferOverflow strategy other than BufferOverflow.SUSPEND, then replay or extraBufferCapacity must be > 0; otherwise it will also throw. [1][2] So for your query “MutableSharedFlow extraBufferCapacity negative value behavior”: the behavior is “constructor rejects the value and throws IllegalArgumentException,” not a special buffering/overflow semantics. [1][2]

Citations:


Reject non-positive capacity values at the API boundary.

MessageBufferConfig.capacity is passed directly as extraBufferCapacity when constructing the buffered MutableSharedFlow with onBufferOverflow = DROP_OLDEST/DROP_LATEST. MutableSharedFlow rejects negative values and also requires extraBufferCapacity > 0 for DROP_* (replay defaults to 0), so capacity <= 0 will surface as an IllegalArgumentException at runtime when buffering becomes active.

Suggested guard
 public data class MessageBufferConfig(
     public val channelTypes: Set<String> = emptySet(),
     public val capacity: Int = Int.MAX_VALUE,
     public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
-) 
+) {
+    init {
+        require(capacity > 0) { "capacity must be > 0" }
+    }
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
) {
init {
require(capacity > 0) { "capacity must be > 0" }
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`
around lines 218 - 221, Add an API-level guard to reject non-positive buffer
sizes: in the MessageBufferConfig data class (symbol MessageBufferConfig)
validate the capacity property on construction and throw an
IllegalArgumentException if capacity <= 0 (except when using sentinel values you
intentionally allow), since capacity is used as extraBufferCapacity for
MutableSharedFlow with onBufferOverflow (MessageBufferOverflow) and
MutableSharedFlow requires extraBufferCapacity > 0 for DROP_* policies; ensure
the error message clearly names capacity and MessageBufferConfig so callers see
what's wrong.

@VelikovPetar VelikovPetar changed the title Add MessageBufferConfig to allow custom back-pressure config for message.new events Add MessageBufferConfig to allow custom back-pressure config for message.new events May 25, 2026
@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
78.6% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr:new-feature New feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants