[Dataflow Streaming] Add a job setting to limit value size in windmill state cache #38458
arunpandianp wants to merge 4 commits into
Conversation
R: @scwhittle
Summary of Changes (Gemini Code Assist): This pull request introduces a mechanism to control the maximum size of individual entries within the Windmill state cache. By adding a configurable limit, the system can prevent excessively large values from consuming cache space, improving overall memory efficiency and stability. The changes include updates to the pipeline options, the worker's cache logic, and the underlying protocol definitions to support this new setting.
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
Code Review
This pull request introduces a configurable limit on the size of values stored in the Windmill state cache to prevent large entries from consuming excessive memory. Key changes include adding a new pipeline option, implementing enforcement logic in WindmillStateCache that invalidates oversized entries during persistence, and enabling dynamic overrides via worker job settings. Feedback focused on clarifying that the byte limit includes cache overhead in the documentation, correcting a variable name typo, and ensuring the dynamic override resets to the default value if the configuration is removed.
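The review's last point, the override resetting to the default when the job setting is removed, can be sketched as follows. This is a minimal illustration, not Beam's actual implementation; the class name `CachedValueLimit` is hypothetical, and it assumes an absent setting arrives as `0`:

```java
// Hypothetical sketch: a dynamic limit that falls back to the configured
// default when the job-settings override is removed (reported as 0).
final class CachedValueLimit {
  private final long defaultLimitBytes;
  private volatile long limitBytes;

  CachedValueLimit(long defaultLimitBytes) {
    this.defaultLimitBytes = defaultLimitBytes;
    this.limitBytes = defaultLimitBytes;
  }

  /** Applies a job-settings override; 0 (setting unset/removed) restores the default. */
  void setMaxCachedValueBytesOverride(long overrideBytes) {
    limitBytes = overrideBytes > 0 ? overrideBytes : defaultLimitBytes;
  }

  long get() {
    return limitBytes;
  }
}
```

Without the fallback branch, a removed setting would silently pin the cache to the last override value.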
/gemini review
Code Review
This pull request introduces a maximum size limit for cached values in the Windmill state cache, configurable via pipeline options or dynamic job settings. It also adds weight distribution histograms for keys, entries, and values to the worker's status page, with an experiment flag provided to disable this feature if needed. Feedback on the changes includes a potential NullPointerException when accessing job settings, a discrepancy between the documented value size limit and the implementation (which includes object overhead), and performance concerns regarding the complexity of calculating histograms for large states.
```java
config -> {
  windmillStateCache.setMaxCachedValueBytesOverride(
      config.userWorkerJobSettings().getMaxCachedValueBytes());
});
```
The config.userWorkerJobSettings() call might return null if the settings are not present in the configuration response. Accessing getMaxCachedValueBytes() on a null object will cause a NullPointerException, which could crash the configuration observer thread. It is safer to check for null before accessing the settings.
```java
// current
config -> {
  windmillStateCache.setMaxCachedValueBytesOverride(
      config.userWorkerJobSettings().getMaxCachedValueBytes());
});

// suggested
config -> {
  if (config.userWorkerJobSettings() != null) {
    windmillStateCache.setMaxCachedValueBytesOverride(
        config.userWorkerJobSettings().getMaxCachedValueBytes());
  }
});
```
```java
stateCacheEntry.values.forEach(
    (encodedAddress, weightedValue) -> {
      stats.addValueWeight(weightedValue.weight);
    });
```
Iterating over all values in every cache entry increases the complexity of calculateEntryStats from per-entry to per-value work. For entries holding many values (for example, from BagState or MapState), this could cause noticeable delays when accessing the worker's status page. Since this is for a debug page, it might be acceptable, but consider whether the performance impact has been evaluated for very large states.
```java
if (entry.getWeight() <= limit) {
  stateCache.put(id, entry);
```
The limit is compared against entry.getWeight(), which includes the overhead of the StateCacheEntry and StateId (approximately 136 bytes). This means that if a user sets a small limit (e.g., 100 bytes), no values will ever be cached. The documentation in DataflowStreamingPipelineOptions says "maximum size of cached values", which might lead users to believe it only applies to the payload. Consider clarifying the documentation or adjusting the logic to only account for the value size if that was the intent.
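The effect described above can be illustrated with a small sketch. This is not Beam's code; the class name `SizeLimitedCache` and the fixed 136-byte overhead constant are assumptions taken from the review comment, used only to show how an overhead-inclusive weight interacts with a small limit:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the cached weight counts a fixed per-entry overhead
// (approximating StateCacheEntry + StateId) plus the payload, so any limit
// below the overhead caches nothing at all.
final class SizeLimitedCache {
  static final long ENTRY_OVERHEAD_BYTES = 136; // approximation from the review

  private final long maxCachedValueBytes;
  private final Map<String, byte[]> cache = new HashMap<>();

  SizeLimitedCache(long maxCachedValueBytes) {
    this.maxCachedValueBytes = maxCachedValueBytes;
  }

  /** Returns true if the value was cached; oversized values are dropped. */
  boolean put(String id, byte[] value) {
    long weight = ENTRY_OVERHEAD_BYTES + value.length;
    if (weight > maxCachedValueBytes) {
      cache.remove(id); // invalidate any stale entry rather than caching
      return false;
    }
    cache.put(id, value);
    return true;
  }

  byte[] get(String id) {
    return cache.get(id);
  }
}
```

With a 200-byte limit, a 10-byte payload (weight 146) is cached while a 100-byte payload (weight 236) is dropped, which is why a user-facing limit of, say, 100 bytes would cache nothing.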
```java
stats.addKeyWeight(idWeight);
stats.addEntryWeight(entryWeight);
stateCacheEntry.values.forEach(
    (encodedAddress, weightedValue) -> {
```
This is executed once per debug capture report (every 10 minutes). I think computing the histograms here is better than paying the overhead of keeping them in sync during processing. Added an experiment to disable the histogram stats if needed. Any other options?
Also updated the cache status page to show key and value size distributions of entries in the cache.
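The on-demand approach amounts to a single pass over cache weights that fills a small bucketed histogram. A minimal sketch, with hypothetical bucket bounds and class name (Beam's actual status-page code may differ):

```java
import java.util.Arrays;

// Hypothetical sketch: bucketed weight histogram computed on demand during
// a debug capture, avoiding any bookkeeping on the processing hot path.
final class WeightHistogram {
  // power-of-two byte buckets: <1KiB, <4KiB, <16KiB, <64KiB, >=64KiB
  private static final long[] BOUNDS = {1 << 10, 4 << 10, 16 << 10, 64 << 10};
  private final long[] counts = new long[BOUNDS.length + 1];

  void add(long weightBytes) {
    int i = 0;
    while (i < BOUNDS.length && weightBytes >= BOUNDS[i]) {
      i++;
    }
    counts[i]++;
  }

  long[] counts() {
    return Arrays.copyOf(counts, counts.length);
  }
}
```

Filling such a histogram is O(values) per report, which is the complexity concern raised in the review; the experiment flag lets operators opt out if that pass proves too slow for very large states.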