[Dataflow Streaming] Add a job setting to limit value size in windmill state cache#38458

Open
arunpandianp wants to merge 4 commits into
apache:master from arunpandianp:maxWindmillStateCacheValueBytes

Conversation

@arunpandianp
Contributor

@arunpandianp arunpandianp commented May 12, 2026

Also updated the cache status page to show key and value size distributions for entries in the cache.

@arunpandianp
Contributor Author

R: @scwhittle

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a mechanism to control the maximum size of individual entries within the Windmill state cache. By adding a configurable limit, the system can prevent excessively large values from consuming cache space, improving overall memory efficiency and stability. The changes include updates to the pipeline options, the worker's cache logic, and the underlying protocol definitions to support this new setting.

Highlights

  • New Job Setting: Added a new configuration option max_cached_value_bytes to limit the size of individual values stored in the Windmill state cache.
  • Dynamic Cache Management: Implemented logic to dynamically observe and apply cache size limits from job settings, allowing for runtime adjustments.
  • Cache Filtering: Updated the cache persistence layer to invalidate or skip caching for entries that exceed the defined size threshold.
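The three highlights above can be sketched together as one minimal, self-contained cache. This is a hypothetical illustration, not Beam's actual WindmillStateCache API: entries larger than the limit are skipped (and any stale cached copy invalidated), and the limit lives in a volatile field so a job-settings observer can override it at runtime.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the behavior described in this PR (all names are
 * illustrative, not Beam's real API): oversized values are never cached,
 * and the size limit can be overridden dynamically from job settings.
 */
public class SizeLimitedStateCache {
  private final Map<String, byte[]> cache = new HashMap<>();
  // volatile so an override applied by a config-observer thread is
  // visible to worker threads without extra locking.
  private volatile long maxCachedValueBytes;

  public SizeLimitedStateCache(long defaultLimitBytes) {
    this.maxCachedValueBytes = defaultLimitBytes;
  }

  /** Applied whenever a new job-settings snapshot is observed. */
  public void setMaxCachedValueBytesOverride(long limitBytes) {
    this.maxCachedValueBytes = limitBytes;
  }

  /** Returns true if the value was cached, false if it exceeded the limit. */
  public boolean put(String key, byte[] value) {
    if (value.length > maxCachedValueBytes) {
      cache.remove(key); // invalidate any stale cached copy of this key
      return false;
    }
    cache.put(key, value);
    return true;
  }

  public boolean contains(String key) {
    return cache.containsKey(key);
  }
}
```

The volatile field mirrors the "dynamic cache management" highlight: the observer thread only writes the limit, while the persistence path reads it on every put.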

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a configurable limit on the size of values stored in the Windmill state cache to prevent large entries from consuming excessive memory. Key changes include adding a new pipeline option, implementing enforcement logic in WindmillStateCache that invalidates oversized entries during persistence, and enabling dynamic overrides via worker job settings. Feedback focused on clarifying that the byte limit includes cache overhead in the documentation, correcting a variable name typo, and ensuring the dynamic override resets to the default value if the configuration is removed.

@arunpandianp
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a maximum size limit for cached values in the Windmill state cache, configurable via pipeline options or dynamic job settings. It also adds weight distribution histograms for keys, entries, and values to the worker's status page, with an experiment flag provided to disable this feature if needed. Feedback on the changes includes a potential NullPointerException when accessing job settings, a discrepancy between the documented value size limit and the implementation (which includes object overhead), and performance concerns regarding the complexity of calculating histograms for large states.

Comment on lines +664 to +667
config -> {
  windmillStateCache.setMaxCachedValueBytesOverride(
      config.userWorkerJobSettings().getMaxCachedValueBytes());
});
Contributor


high

The config.userWorkerJobSettings() call might return null if the settings are not present in the configuration response. Accessing getMaxCachedValueBytes() on a null object will cause a NullPointerException, which could crash the configuration observer thread. It is safer to check for null before accessing the settings.

Suggested change
config -> {
  windmillStateCache.setMaxCachedValueBytesOverride(
      config.userWorkerJobSettings().getMaxCachedValueBytes());
});
config -> {
  if (config.userWorkerJobSettings() != null) {
    windmillStateCache.setMaxCachedValueBytesOverride(
        config.userWorkerJobSettings().getMaxCachedValueBytes());
  }
});

Comment on lines +148 to +151
stateCacheEntry.values.forEach(
    (encodedAddress, weightedValue) -> {
      stats.addValueWeight(weightedValue.weight);
    });
Contributor


medium

Iterating over all values in every cache entry to calculate the weight distribution increases the complexity of calculateEntryStats from $O(\text{entries})$ to $O(\text{total values})$. For large caches with many values per entry (e.g., large BagState or MapState), this could cause noticeable delays when accessing the worker's status page. Since this is for a debug page, it might be acceptable, but consider if the performance impact has been evaluated for very large states.
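The scaling concern above can be illustrated with a rough, self-contained sketch of an on-demand histogram pass (all names hypothetical). The cost is one visit per value across the whole cache, so it grows with the total number of cached values, not with the number of entries:

```java
import java.util.Map;

/** Illustrative sketch of an on-demand value-weight histogram pass. */
public class CacheStatsSketch {
  /**
   * entries maps each cache entry to its values (encoded address -> weight).
   * bucketUpperBounds must be ascending; weights above the last bound are
   * dropped in this simplified sketch.
   */
  public static long[] valueWeightHistogram(
      Map<String, Map<String, Long>> entries, long[] bucketUpperBounds) {
    long[] counts = new long[bucketUpperBounds.length];
    // O(total values), not O(entries): every value of every entry is visited.
    for (Map<String, Long> values : entries.values()) {
      for (long weight : values.values()) {
        for (int i = 0; i < bucketUpperBounds.length; i++) {
          if (weight <= bucketUpperBounds[i]) {
            counts[i]++;
            break;
          }
        }
      }
    }
    return counts;
  }
}
```

For an entry holding a large BagState, the inner loop runs once per bag element's cached value, which is exactly the cost the review comment flags for the status page.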

Comment on lines +512 to +513
if (entry.getWeight() <= limit) {
  stateCache.put(id, entry);
Contributor


medium

The limit is compared against entry.getWeight(), which includes the overhead of the StateCacheEntry and StateId (approximately 136 bytes). This means that if a user sets a small limit (e.g., 100 bytes), no values will ever be cached. The documentation in DataflowStreamingPipelineOptions says "maximum size of cached values", which might lead users to believe it only applies to the payload. Consider clarifying the documentation or adjusting the logic to only account for the value size if that was the intent.
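A tiny sketch makes the overhead-inclusive comparison concrete (the 136-byte constant and method names are assumptions for illustration, not the actual Beam code): because the weight is payload plus fixed per-entry overhead, a limit at or below that overhead caches nothing.

```java
/** Illustrative check of how an overhead-inclusive weight meets the limit. */
public class WeightCheckSketch {
  // Approximate fixed cost of the StateCacheEntry and StateId objects
  // (assumed value, per the review comment above).
  static final long ENTRY_OVERHEAD_BYTES = 136;

  /** Mirrors entry.getWeight() <= limit with weight = payload + overhead. */
  static boolean wouldCache(long payloadBytes, long limitBytes) {
    long weight = payloadBytes + ENTRY_OVERHEAD_BYTES;
    return weight <= limitBytes;
  }
}
```

Under this model a user-facing limit of 100 bytes rejects even an empty payload, which is the surprise the documentation wording should call out.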

Comment on lines +146 to +149
stats.addKeyWeight(idWeight);
stats.addEntryWeight(entryWeight);
stateCacheEntry.values.forEach(
    (encodedAddress, weightedValue) -> {
Contributor Author

@arunpandianp arunpandianp May 12, 2026


This is executed once per debug capture report (every 10 minutes). I think computing the histograms here is better than the overhead of keeping them in sync during processing. Added an experiment to disable the histogram stats if needed. Any other options?
