From 2ec6939b74d0fd76c0db0d8fa13fd2e83e458582 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 08:22:20 +0000 Subject: [PATCH 1/6] [Dataflow Streaming] Add a job setting to limit value size in windmill state cache --- .../DataflowStreamingPipelineOptions.java | 10 +++++ .../worker/StreamingDataflowWorker.java | 15 ++++++- .../windmill/state/WindmillStateCache.java | 26 ++++++++++-- .../state/WindmillStateCacheTest.java | 41 +++++++++++++++++++ .../windmill/src/main/proto/windmill.proto | 2 + 5 files changed, 90 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 9727048e47aa..600bb9b7f209 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); + /** + * The maximum size of cached values in bytes. Values larger than this limit will not be cached by + * the windmill state cache + */ + @Description("The maximum size of cached values in bytes.") + @Default.Long(Long.MAX_VALUE) + Long getMaxWindmillStateCacheValueBytes(); + + void setMaxWindmillStateCacheValueBytes(Long value); + /** * Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for * backwards compatibility. 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f5e5adab1556..acce4c76f350 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig.Fetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; @@ -633,6 +634,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) + .setMaxCachedValueBytes(options.getMaxWindmillStateCacheValueBytes()) .build(); GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = @@ -651,6 +653,17 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o windmillStateCache::forComputation, ID_GENERATOR)); + Fetcher configedFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher(); + configedFetcher + .getGlobalConfigHandle() + .registerConfigObserver( + config -> { + if 
(config.userWorkerJobSettings().hasMaxCachedValueBytes()) { + windmillStateCache.setMaxCachedValueBytesOverride( + config.userWorkerJobSettings().getMaxCachedValueBytes()); + } + }); + ComputationStateCache computationStateCache = configFetcherComputationStateCacheAndWindmillClient.computationStateCache(); WindmillServerStub windmillServer = @@ -689,7 +702,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o return new StreamingDataflowWorker( windmillServer, clientId, - configFetcherComputationStateCacheAndWindmillClient.configFetcher(), + configedFetcher, computationStateCache, windmillStateCache, workExecutor, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index 07c9599c866a..ba4e01df1a85 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -75,9 +75,12 @@ public class WindmillStateCache implements StatusDataProvider { private final ConcurrentMap keyIndex; private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. 
private final boolean supportMapViaMultimap; + private final long defaultMaxCachedValueBytes; + private volatile long maxCachedValueBytesOverride = -1L; - WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) { + WindmillStateCache(long sizeMb, boolean supportMapViaMultimap, long maxCachedValueBytes) { this.workerCacheBytes = sizeMb * MEGABYTES; + this.defaultMaxCachedValueBytes = maxCachedValueBytes; int stateCacheConcurrencyLevel = Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); this.stateCache = @@ -99,11 +102,19 @@ public interface Builder { Builder setSupportMapViaMultimap(boolean supportMapViaMultimap); + Builder setMaxCachedValueBytes(long maxCachedValueBytes); + WindmillStateCache build(); } public static Builder builder() { - return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false); + return new AutoBuilder_WindmillStateCache_Builder() + .setSupportMapViaMultimap(false) + .setMaxCachedValueBytes(Long.MAX_VALUE); + } + + public void setMaxCachedValueBytesOverride(long limit) { + this.maxCachedValueBytesOverride = limit; } private EntryStats calculateEntryStats() { @@ -413,7 +424,16 @@ public void put( } public void persist() { - localCache.forEach(stateCache::put); + long override = WindmillStateCache.this.maxCachedValueBytesOverride; + long limit = override >= 0 ? 
override : WindmillStateCache.this.defaultMaxCachedValueBytes; + localCache.forEach( + (id, entry) -> { + if (entry.getWeight() <= limit) { + stateCache.put(id, entry); + } else { + stateCache.invalidate(id); + } + }); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index bbb8e4c93c07..fb50b7892974 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -270,6 +270,47 @@ public void testMaxWeight() throws Exception { assertEquals(400 * MEGABYTES, cache.getMaxWeight()); } + @Test + public void testMaxCachedValueBytes() throws Exception { + cache.setMaxCachedValueBytesOverride( + 100); // Set limit to 100 bytes, per cache entry overhead is 136. + + WindmillStateCache.ForKeyAndFamily keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 1L).forFamily(STATE_FAMILY); + + TestStateTag tag1 = new TestStateTag("tag1"); + TestStateTag tag2 = new TestStateTag("tag2"); + + putInCache(keyCache, StateNamespaces.global(), tag1, new TestState("g1"), 10); + keyCache.persist(); + + // It should not be in global cache because it's too large. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY); + assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag1)); + + // Now set limit larger. + cache.setMaxCachedValueBytesOverride(1000); + + putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"), 10); + keyCache.persist(); + + // It should be in global cache. 
+ keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 3L).forFamily(STATE_FAMILY); + assertEquals( + Optional.of(new TestState("g2")), getFromCache(keyCache, StateNamespaces.global(), tag2)); + + // Now update it to be larger than limit. + putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2_large"), 2000); + keyCache.persist(); + + // It should be removed from global cache. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 4L).forFamily(STATE_FAMILY); + assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag2)); + } + /** Verifies that values are cached in the appropriate namespaces. */ @Test public void testInvalidation() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index b7579cbacb8e..a3779de790a0 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -986,6 +986,8 @@ message UserWorkerRunnerV1Settings { optional ConnectivityType connectivity_type = 4 [default = CONNECTIVITY_TYPE_DEFAULT]; + optional int64 max_cached_value_bytes = 5; + reserved 1, 2; } From 59a7c3d8161b0f30345f771ca4db443ff8de72da Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 08:42:16 +0000 Subject: [PATCH 2/6] Add key, value size distribution to state cache stats --- .../worker/StreamingDataflowWorker.java | 4 + .../windmill/state/WindmillStateCache.java | 113 +++++++++++++++--- .../state/WindmillStateCacheTest.java | 21 ++++ 3 files changed, 119 insertions(+), 19 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index acce4c76f350..39480a0ab749 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -114,6 +114,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials; @@ -635,6 +636,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) .setMaxCachedValueBytes(options.getMaxWindmillStateCacheValueBytes()) + .setEnableHistogram( + !ExperimentalOptions.hasExperiment( + options, "disable_windmill_user_state_cache_histogram")) .build(); GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index ba4e01df1a85..ba452a0cc01b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -76,11 +76,17 @@ public class WindmillStateCache implements 
StatusDataProvider { private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. private final boolean supportMapViaMultimap; private final long defaultMaxCachedValueBytes; + private final boolean enableHistogram; private volatile long maxCachedValueBytesOverride = -1L; - WindmillStateCache(long sizeMb, boolean supportMapViaMultimap, long maxCachedValueBytes) { + WindmillStateCache( + long sizeMb, + boolean supportMapViaMultimap, + long maxCachedValueBytes, + boolean enableHistogram) { this.workerCacheBytes = sizeMb * MEGABYTES; this.defaultMaxCachedValueBytes = maxCachedValueBytes; + this.enableHistogram = enableHistogram; int stateCacheConcurrencyLevel = Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); this.stateCache = @@ -104,13 +110,16 @@ public interface Builder { Builder setMaxCachedValueBytes(long maxCachedValueBytes); + Builder setEnableHistogram(boolean enableHistogram); + WindmillStateCache build(); } public static Builder builder() { return new AutoBuilder_WindmillStateCache_Builder() .setSupportMapViaMultimap(false) - .setMaxCachedValueBytes(Long.MAX_VALUE); + .setMaxCachedValueBytes(Long.MAX_VALUE) + .setEnableHistogram(true); } public void setMaxCachedValueBytesOverride(long limit) { @@ -122,10 +131,20 @@ private EntryStats calculateEntryStats() { BiConsumer consumer = (stateId, stateCacheEntry) -> { stats.entries++; - stats.idWeight += stateId.getWeight(); - stats.entryWeight += stateCacheEntry.getWeight(); + long idWeight = stateId.getWeight(); + stats.idWeight += idWeight; + long entryWeight = stateCacheEntry.getWeight(); + stats.entryWeight += entryWeight; stats.entryValues += stateCacheEntry.values.size(); stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size()); + if (enableHistogram) { + stats.addKeyWeight(idWeight); + stats.addEntryWeight(entryWeight); + stateCacheEntry.values.forEach( + (encodedAddress, weightedValue) -> { + 
stats.addValueWeight(weightedValue.weight); + }); + } }; stateCache.asMap().forEach(consumer); return stats; @@ -149,27 +168,58 @@ public ForComputation forComputation(String computation) { return new ForComputation(computation); } + private static String formatHistogram(long[] histogram) { + return String.format( + "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]", + histogram[0], + histogram[1], + histogram[2], + histogram[3], + histogram[4], + histogram[5], + histogram[6]); + } + /** Print summary statistics of the cache to the given {@link PrintWriter}. */ @Override public void appendSummaryHtml(PrintWriter response) { response.println("Cache Stats:
"); - response.println( - "" - + "" - + "" - + ""); CacheStats cacheStats = stateCache.stats(); EntryStats entryStats = calculateEntryStats(); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println("
Hit RatioEvictionsEntriesEntry ValuesMax Entry ValuesId WeightEntry WeightMax WeightKeys
" + cacheStats.hitRate() + "" + cacheStats.evictionCount() + "" + entryStats.entries + "(" + stateCache.size() + " inc. weak) " + entryStats.entryValues + "" + entryStats.maxEntryValues + "" + entryStats.idWeight / MEGABYTES + "MB" + entryStats.entryWeight / MEGABYTES + "MB" + getMaxWeight() / MEGABYTES + "MB" + keyIndex.size() + "

"); + + response.println("Hit Ratio" + cacheStats.hitRate() + ""); + response.println("Evictions" + cacheStats.evictionCount() + ""); + response.println( + "Entries" + + entryStats.entries + + " (" + + stateCache.size() + + " inc. weak)"); + response.println("Entry Values" + entryStats.entryValues + ""); + response.println( + "Max Entry Values" + entryStats.maxEntryValues + ""); + response.println( + "Id Weight" + entryStats.idWeight / MEGABYTES + "MB"); + response.println( + "Entry Weight" + entryStats.entryWeight / MEGABYTES + "MB"); + response.println("Max Weight" + getMaxWeight() / MEGABYTES + "MB"); + response.println("Keys" + keyIndex.size() + ""); + if (enableHistogram) { + response.println( + "Entry Weight Dist" + + formatHistogram(entryStats.entryWeightHistogram) + + ""); + response.println( + "Value Weight Dist" + + formatHistogram(entryStats.valueWeightHistogram) + + ""); + response.println( + "Key Weight Dist" + + formatHistogram(entryStats.keyWeightHistogram) + + ""); + } + + response.println("
"); } public BaseStatusServlet statusServlet() { @@ -191,6 +241,31 @@ private static class EntryStats { long entryWeight; long entryValues; long maxEntryValues; + long[] entryWeightHistogram = new long[7]; + long[] valueWeightHistogram = new long[7]; + long[] keyWeightHistogram = new long[7]; + + void addEntryWeight(long weight) { + entryWeightHistogram[getBucket(weight)]++; + } + + void addValueWeight(long weight) { + valueWeightHistogram[getBucket(weight)]++; + } + + void addKeyWeight(long weight) { + keyWeightHistogram[getBucket(weight)]++; + } + + private int getBucket(long weight) { + if (weight < 128) return 0; + if (weight < 256) return 1; + if (weight < 512) return 2; + if (weight < 1024) return 3; + if (weight < 10 * 1024) return 4; + if (weight < 1024 * 1024) return 5; + return 6; + } } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index fb50b7892974..649206bb3777 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -311,6 +311,27 @@ public void testMaxCachedValueBytes() throws Exception { assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag2)); } + @Test + public void testDisableHistogram() throws Exception { + WindmillStateCache noHistogramCache = + WindmillStateCache.builder().setSizeMb(400).setEnableHistogram(false).build(); + WindmillStateCache.ForKeyAndFamily keyCache = + noHistogramCache + .forComputation(COMPUTATION) + .forKey(COMPUTATION_KEY, 0L, 1L) + .forFamily(STATE_FAMILY); + + putInCache( + keyCache, 
StateNamespaces.global(), new TestStateTag("tag1"), new TestState("g1"), 2); + keyCache.persist(); + + java.io.StringWriter writer = new java.io.StringWriter(); + noHistogramCache.appendSummaryHtml(new java.io.PrintWriter(writer)); + String summary = writer.toString(); + + org.junit.Assert.assertFalse(summary.contains("Entry Weight Dist")); + } + /** Verifies that values are cached in the appropriate namespaces. */ @Test public void testInvalidation() throws Exception { From af26fb1f6d2f899bfe5217bcc8987a0272651e41 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 08:52:39 +0000 Subject: [PATCH 3/6] Address comments --- .../dataflow/worker/StreamingDataflowWorker.java | 12 +++++------- .../worker/windmill/src/main/proto/windmill.proto | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 39480a0ab749..580a29d9d48c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -657,15 +657,13 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o windmillStateCache::forComputation, ID_GENERATOR)); - Fetcher configedFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher(); - configedFetcher + Fetcher configFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher(); + configFetcher .getGlobalConfigHandle() .registerConfigObserver( config -> { - if (config.userWorkerJobSettings().hasMaxCachedValueBytes()) { - windmillStateCache.setMaxCachedValueBytesOverride( - 
config.userWorkerJobSettings().getMaxCachedValueBytes()); - } + windmillStateCache.setMaxCachedValueBytesOverride( + config.userWorkerJobSettings().getMaxCachedValueBytes()); }); ComputationStateCache computationStateCache = @@ -706,7 +704,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o return new StreamingDataflowWorker( windmillServer, clientId, - configedFetcher, + configFetcher, computationStateCache, windmillStateCache, workExecutor, diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index a3779de790a0..99e830621f9b 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -986,7 +986,7 @@ message UserWorkerRunnerV1Settings { optional ConnectivityType connectivity_type = 4 [default = CONNECTIVITY_TYPE_DEFAULT]; - optional int64 max_cached_value_bytes = 5; + optional int64 max_cached_value_bytes = 5 [default = -1]; reserved 1, 2; } From e55179080a9b002c43d2b30e53446ae394853800 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 08:58:38 +0000 Subject: [PATCH 4/6] Add max value size limit to status page --- .../worker/windmill/state/WindmillStateCache.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index ba452a0cc01b..722f4d88c2c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -126,6 +126,11 @@ public void setMaxCachedValueBytesOverride(long limit) { this.maxCachedValueBytesOverride = limit; } + private long getMaxCachedValueBytesLimit() { + long override = maxCachedValueBytesOverride; + return override >= 0 ? override : defaultMaxCachedValueBytes; + } + private EntryStats calculateEntryStats() { EntryStats stats = new EntryStats(); BiConsumer consumer = @@ -204,6 +209,8 @@ public void appendSummaryHtml(PrintWriter response) { "Entry Weight" + entryStats.entryWeight / MEGABYTES + "MB"); response.println("Max Weight" + getMaxWeight() / MEGABYTES + "MB"); response.println("Keys" + keyIndex.size() + ""); + response.println( + "Value Size Limit" + getMaxCachedValueBytesLimit() + " bytes"); if (enableHistogram) { response.println( "Entry Weight Dist" @@ -499,8 +506,7 @@ public void put( } public void persist() { - long override = WindmillStateCache.this.maxCachedValueBytesOverride; - long limit = override >= 0 ? 
override : WindmillStateCache.this.defaultMaxCachedValueBytes; + long limit = WindmillStateCache.this.getMaxCachedValueBytesLimit(); localCache.forEach( (id, entry) -> { if (entry.getWeight() <= limit) { From 10e004243a6615f747ee0c0b17c6f2bd1da1d366 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 14 May 2026 08:25:58 +0000 Subject: [PATCH 5/6] Address comments --- .../DataflowStreamingPipelineOptions.java | 10 +-- .../worker/StreamingDataflowWorker.java | 6 +- .../windmill/state/WindmillStateCache.java | 67 +++++++------------ .../state/WindmillStateCacheTest.java | 6 +- .../windmill/src/main/proto/windmill.proto | 2 +- 5 files changed, 35 insertions(+), 56 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 600bb9b7f209..90375ad445ae 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -250,14 +250,14 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); /** - * The maximum size of cached values in bytes. Values larger than this limit will not be cached by - * the windmill state cache + * The maximum size of cached entries in bytes. 
Entries (e.g., values, bags) larger than this limit + * will not be cached by the windmill state cache. */ + @Description("The maximum size of cached entries in bytes.") @Default.Long(Long.MAX_VALUE) - Long getMaxWindmillStateCacheValueBytes(); + Long getMaxWindmillStateCacheEntryBytes(); - void setMaxWindmillStateCacheValueBytes(Long value); + void setMaxWindmillStateCacheEntryBytes(Long value); /** * Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 580a29d9d48c..4d070da995b3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -635,7 +635,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) - .setMaxCachedValueBytes(options.getMaxWindmillStateCacheValueBytes()) + .setMaxCachedEntryBytes(options.getMaxWindmillStateCacheEntryBytes()) .setEnableHistogram( !ExperimentalOptions.hasExperiment( options, "disable_windmill_user_state_cache_histogram")) @@ -662,8 +662,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o .getGlobalConfigHandle() .registerConfigObserver( config -> { - windmillStateCache.setMaxCachedValueBytesOverride( - config.userWorkerJobSettings().getMaxCachedValueBytes()); + windmillStateCache.setMaxCachedEntryBytesOverride( +
config.userWorkerJobSettings().getMaxCachedEntryBytes()); }); ComputationStateCache computationStateCache = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index 722f4d88c2c5..7515db000852 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.util.SimpleByteHistogram; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.util.Weighted; @@ -75,17 +76,17 @@ public class WindmillStateCache implements StatusDataProvider { private final ConcurrentMap keyIndex; private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. 
private final boolean supportMapViaMultimap; - private final long defaultMaxCachedValueBytes; + private final long defaultMaxCachedEntryBytes; private final boolean enableHistogram; - private volatile long maxCachedValueBytesOverride = -1L; + private volatile long maxCachedEntryBytesOverride = -1L; WindmillStateCache( long sizeMb, boolean supportMapViaMultimap, - long maxCachedValueBytes, + long maxCachedEntryBytes, boolean enableHistogram) { this.workerCacheBytes = sizeMb * MEGABYTES; - this.defaultMaxCachedValueBytes = maxCachedValueBytes; + this.defaultMaxCachedEntryBytes = maxCachedEntryBytes; this.enableHistogram = enableHistogram; int stateCacheConcurrencyLevel = Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); @@ -108,7 +109,7 @@ public interface Builder { Builder setSupportMapViaMultimap(boolean supportMapViaMultimap); - Builder setMaxCachedValueBytes(long maxCachedValueBytes); + Builder setMaxCachedEntryBytes(long maxCachedEntryBytes); Builder setEnableHistogram(boolean enableHistogram); @@ -118,17 +119,17 @@ public interface Builder { public static Builder builder() { return new AutoBuilder_WindmillStateCache_Builder() .setSupportMapViaMultimap(false) - .setMaxCachedValueBytes(Long.MAX_VALUE) + .setMaxCachedEntryBytes(Long.MAX_VALUE) .setEnableHistogram(true); } - public void setMaxCachedValueBytesOverride(long limit) { - this.maxCachedValueBytesOverride = limit; + public void setMaxCachedEntryBytesOverride(long limit) { + this.maxCachedEntryBytesOverride = limit; } - private long getMaxCachedValueBytesLimit() { - long override = maxCachedValueBytesOverride; - return override >= 0 ? override : defaultMaxCachedValueBytes; + private long getMaxCachedEntryBytesLimit() { + long override = maxCachedEntryBytesOverride; + return override >= 0 ? 
override : defaultMaxCachedEntryBytes; } private EntryStats calculateEntryStats() { @@ -173,18 +174,6 @@ public ForComputation forComputation(String computation) { return new ForComputation(computation); } - private static String formatHistogram(long[] histogram) { - return String.format( - "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]", - histogram[0], - histogram[1], - histogram[2], - histogram[3], - histogram[4], - histogram[5], - histogram[6]); - } - /** Print summary statistics of the cache to the given {@link PrintWriter}. */ @Override public void appendSummaryHtml(PrintWriter response) { @@ -210,19 +199,19 @@ public void appendSummaryHtml(PrintWriter response) { response.println("Max Weight" + getMaxWeight() / MEGABYTES + "MB"); response.println("Keys" + keyIndex.size() + ""); response.println( - "Value Size Limit" + getMaxCachedValueBytesLimit() + " bytes"); + "Entry Size Limit" + getMaxCachedEntryBytesLimit() + " bytes"); if (enableHistogram) { response.println( "Entry Weight Dist" - + formatHistogram(entryStats.entryWeightHistogram) + + entryStats.entryWeightHistogram.format() + ""); response.println( "Value Weight Dist" - + formatHistogram(entryStats.valueWeightHistogram) + + entryStats.valueWeightHistogram.format() + ""); response.println( "Key Weight Dist" - + formatHistogram(entryStats.keyWeightHistogram) + + entryStats.keyWeightHistogram.format() + ""); } @@ -248,30 +237,20 @@ private static class EntryStats { long entryWeight; long entryValues; long maxEntryValues; - long[] entryWeightHistogram = new long[7]; - long[] valueWeightHistogram = new long[7]; - long[] keyWeightHistogram = new long[7]; + SimpleByteHistogram entryWeightHistogram = new SimpleByteHistogram(); + SimpleByteHistogram valueWeightHistogram = new SimpleByteHistogram(); + SimpleByteHistogram keyWeightHistogram = new SimpleByteHistogram(); void addEntryWeight(long weight) { - entryWeightHistogram[getBucket(weight)]++; + entryWeightHistogram.add(weight); } void 
addValueWeight(long weight) { - valueWeightHistogram[getBucket(weight)]++; + valueWeightHistogram.add(weight); } void addKeyWeight(long weight) { - keyWeightHistogram[getBucket(weight)]++; - } - - private int getBucket(long weight) { - if (weight < 128) return 0; - if (weight < 256) return 1; - if (weight < 512) return 2; - if (weight < 1024) return 3; - if (weight < 10 * 1024) return 4; - if (weight < 1024 * 1024) return 5; - return 6; + keyWeightHistogram.add(weight); } } @@ -506,7 +485,7 @@ public void put( } public void persist() { - long limit = WindmillStateCache.this.getMaxCachedValueBytesLimit(); + long limit = WindmillStateCache.this.getMaxCachedEntryBytesLimit(); localCache.forEach( (id, entry) -> { if (entry.getWeight() <= limit) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index 649206bb3777..2d3d9b5ccff2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -271,8 +271,8 @@ public void testMaxWeight() throws Exception { } @Test - public void testMaxCachedValueBytes() throws Exception { - cache.setMaxCachedValueBytesOverride( + public void testMaxCachedEntryBytes() throws Exception { + cache.setMaxCachedEntryBytesOverride( 100); // Set limit to 100 bytes, per cache entry overhead is 136. WindmillStateCache.ForKeyAndFamily keyCache = @@ -290,7 +290,7 @@ public void testMaxCachedValueBytes() throws Exception { assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag1)); // Now set limit larger. 
- cache.setMaxCachedValueBytesOverride(1000); + cache.setMaxCachedEntryBytesOverride(1000); putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"), 10); keyCache.persist(); diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index 99e830621f9b..58e4f7df3c34 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -986,7 +986,7 @@ message UserWorkerRunnerV1Settings { optional ConnectivityType connectivity_type = 4 [default = CONNECTIVITY_TYPE_DEFAULT]; - optional int64 max_cached_value_bytes = 5 [default = -1]; + optional int64 max_cached_entry_bytes = 5 [default = -1]; reserved 1, 2; } From 5097df1b87a1470ee0406cea6139fabc6b4da498 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 14 May 2026 08:44:49 +0000 Subject: [PATCH 6/6] Address comments --- .../worker/util/SimpleByteHistogram.java | 43 ++++++++++++++++ .../worker/util/SimpleByteHistogramTest.java | 51 +++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java new file mode 100644 index 000000000000..6b90ca8df9ef --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java @@ -0,0 +1,43 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +/** A simple histogram to track byte sizes. */ +public class SimpleByteHistogram { + private final long[] buckets = new long[7]; + + public void add(long weight) { + buckets[getBucket(weight)]++; + } + + private int getBucket(long weight) { + if (weight < 128) return 0; + if (weight < 256) return 1; + if (weight < 512) return 2; + if (weight < 1024) return 3; + if (weight < 10 * 1024) return 4; + if (weight < 1024 * 1024) return 5; + return 6; + } + + public String format() { + return String.format( + "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]", + buckets[0], buckets[1], buckets[2], buckets[3], buckets[4], buckets[5], buckets[6]); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java new file mode 100644 index 000000000000..252300a19550 --- /dev/null +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link SimpleByteHistogram}. */ +@RunWith(JUnit4.class) +public class SimpleByteHistogramTest { + + @Test + public void testHistogram() { + SimpleByteHistogram histogram = new SimpleByteHistogram(); + histogram.add(10); // <128B + histogram.add(127); // <128B + histogram.add(128); // <256B + histogram.add(255); // <256B + histogram.add(256); // <512B + histogram.add(511); // <512B + histogram.add(512); // <1KB + histogram.add(1023); // <1KB + histogram.add(1024); // <10KB + histogram.add(10240 - 1); // <10KB + histogram.add(10240); // <1MB + histogram.add(1048576 - 1); // <1MB + histogram.add(1048576); // >=1MB + histogram.add(2000000); // >=1MB + + String expected = "[<128B:2, <256B:2, <512B:2, <1KB:2, <10KB:2, <1MB:2, >=1MB:2]"; + assertEquals(expected, histogram.format()); + } +}