diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 9727048e47aa..600bb9b7f209 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); + /** + * The maximum size of cached values in bytes. Values larger than this limit will not be cached by + * the windmill state cache. Defaults to {@code Long.MAX_VALUE}. + */ + @Description("The maximum size of cached values in bytes.") + @Default.Long(Long.MAX_VALUE) + Long getMaxWindmillStateCacheValueBytes(); + + void setMaxWindmillStateCacheValueBytes(Long value); + /** * Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for * backwards compatibility. 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f5e5adab1556..580a29d9d48c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig.Fetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; @@ -113,6 +114,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials; @@ -633,6 +635,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) + .setMaxCachedValueBytes(options.getMaxWindmillStateCacheValueBytes()) + .setEnableHistogram( 
!ExperimentalOptions.hasExperiment( + options, "disable_windmill_user_state_cache_histogram")) .build(); GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = @@ -651,6 +657,15 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o windmillStateCache::forComputation, ID_GENERATOR)); + Fetcher configFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher(); + configFetcher + .getGlobalConfigHandle() + .registerConfigObserver( + config -> { + windmillStateCache.setMaxCachedValueBytesOverride( + config.userWorkerJobSettings().getMaxCachedValueBytes()); + }); + ComputationStateCache computationStateCache = configFetcherComputationStateCacheAndWindmillClient.computationStateCache(); WindmillServerStub windmillServer = @@ -689,7 +704,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o return new StreamingDataflowWorker( windmillServer, clientId, - configFetcherComputationStateCacheAndWindmillClient.configFetcher(), + configFetcher, computationStateCache, windmillStateCache, workExecutor, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index 07c9599c866a..722f4d88c2c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -75,9 +75,18 @@ public class WindmillStateCache implements StatusDataProvider { private final ConcurrentMap keyIndex; private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. 
private final boolean supportMapViaMultimap; - - WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) { + private final long defaultMaxCachedValueBytes; + private final boolean enableHistogram; + private volatile long maxCachedValueBytesOverride = -1L; /* Negative means no override is set. */ + + WindmillStateCache( + long sizeMb, + boolean supportMapViaMultimap, + long maxCachedValueBytes, + boolean enableHistogram) { this.workerCacheBytes = sizeMb * MEGABYTES; + this.defaultMaxCachedValueBytes = maxCachedValueBytes; + this.enableHistogram = enableHistogram; int stateCacheConcurrencyLevel = Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); this.stateCache = @@ -99,11 +108,27 @@ public interface Builder { Builder setSupportMapViaMultimap(boolean supportMapViaMultimap); + Builder setMaxCachedValueBytes(long maxCachedValueBytes); + + Builder setEnableHistogram(boolean enableHistogram); + WindmillStateCache build(); } public static Builder builder() { - return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false); + return new AutoBuilder_WindmillStateCache_Builder() + .setSupportMapViaMultimap(false) + .setMaxCachedValueBytes(Long.MAX_VALUE) + .setEnableHistogram(true); + } + + public void setMaxCachedValueBytesOverride(long limit) { + this.maxCachedValueBytesOverride = limit; + } + + private long getMaxCachedValueBytesLimit() { + long override = maxCachedValueBytesOverride; + return override >= 0 ? 
override : defaultMaxCachedValueBytes; } private EntryStats calculateEntryStats() { @@ -111,10 +136,20 @@ private EntryStats calculateEntryStats() { BiConsumer consumer = (stateId, stateCacheEntry) -> { stats.entries++; - stats.idWeight += stateId.getWeight(); - stats.entryWeight += stateCacheEntry.getWeight(); + long idWeight = stateId.getWeight(); + stats.idWeight += idWeight; + long entryWeight = stateCacheEntry.getWeight(); + stats.entryWeight += entryWeight; stats.entryValues += stateCacheEntry.values.size(); stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size()); + if (enableHistogram) { + stats.addKeyWeight(idWeight); + stats.addEntryWeight(entryWeight); + stateCacheEntry.values.forEach( + (encodedAddress, weightedValue) -> { + stats.addValueWeight(weightedValue.weight); + }); + } }; stateCache.asMap().forEach(consumer); return stats; @@ -138,27 +173,60 @@ public ForComputation forComputation(String computation) { return new ForComputation(computation); } + private static String formatHistogram(long[] histogram) { + return String.format( + "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]", + histogram[0], + histogram[1], + histogram[2], + histogram[3], + histogram[4], + histogram[5], + histogram[6]); + } + /** Print summary statistics of the cache to the given {@link PrintWriter}. */ @Override public void appendSummaryHtml(PrintWriter response) { response.println("Cache Stats:
"); - response.println( - "" - + "" - + "" - + ""); CacheStats cacheStats = stateCache.stats(); EntryStats entryStats = calculateEntryStats(); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println("
Hit RatioEvictionsEntriesEntry ValuesMax Entry ValuesId WeightEntry WeightMax WeightKeys
" + cacheStats.hitRate() + "" + cacheStats.evictionCount() + "" + entryStats.entries + "(" + stateCache.size() + " inc. weak) " + entryStats.entryValues + "" + entryStats.maxEntryValues + "" + entryStats.idWeight / MEGABYTES + "MB" + entryStats.entryWeight / MEGABYTES + "MB" + getMaxWeight() / MEGABYTES + "MB" + keyIndex.size() + "

"); + + response.println("Hit Ratio" + cacheStats.hitRate() + ""); + response.println("Evictions" + cacheStats.evictionCount() + ""); + response.println( + "Entries" + + entryStats.entries + + " (" + + stateCache.size() + + " inc. weak)"); + response.println("Entry Values" + entryStats.entryValues + ""); + response.println( + "Max Entry Values" + entryStats.maxEntryValues + ""); + response.println( + "Id Weight" + entryStats.idWeight / MEGABYTES + "MB"); + response.println( + "Entry Weight" + entryStats.entryWeight / MEGABYTES + "MB"); + response.println("Max Weight" + getMaxWeight() / MEGABYTES + "MB"); + response.println("Keys" + keyIndex.size() + ""); + response.println( + "Value Size Limit" + getMaxCachedValueBytesLimit() + " bytes"); + if (enableHistogram) { + response.println( + "Entry Weight Dist" + + formatHistogram(entryStats.entryWeightHistogram) + + ""); + response.println( + "Value Weight Dist" + + formatHistogram(entryStats.valueWeightHistogram) + + ""); + response.println( + "Key Weight Dist" + + formatHistogram(entryStats.keyWeightHistogram) + + ""); + } + + response.println("
"); } public BaseStatusServlet statusServlet() { @@ -180,6 +248,31 @@ private static class EntryStats { long entryWeight; long entryValues; long maxEntryValues; + long[] entryWeightHistogram = new long[7]; + long[] valueWeightHistogram = new long[7]; + long[] keyWeightHistogram = new long[7]; + + void addEntryWeight(long weight) { + entryWeightHistogram[getBucket(weight)]++; + } + + void addValueWeight(long weight) { + valueWeightHistogram[getBucket(weight)]++; + } + + void addKeyWeight(long weight) { + keyWeightHistogram[getBucket(weight)]++; + } + + private int getBucket(long weight) { + if (weight < 128) return 0; + if (weight < 256) return 1; + if (weight < 512) return 2; + if (weight < 1024) return 3; + if (weight < 10 * 1024) return 4; + if (weight < 1024 * 1024) return 5; + return 6; + } } /** @@ -413,7 +506,15 @@ public void put( } public void persist() { - localCache.forEach(stateCache::put); + long limit = WindmillStateCache.this.getMaxCachedValueBytesLimit(); + localCache.forEach( + (id, entry) -> { + if (entry.getWeight() <= limit) { + stateCache.put(id, entry); + } else { + stateCache.invalidate(id); + } + }); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index bbb8e4c93c07..649206bb3777 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -270,6 +270,68 @@ public void testMaxWeight() throws Exception { assertEquals(400 * MEGABYTES, cache.getMaxWeight()); } + @Test + public void testMaxCachedValueBytes() throws Exception { + cache.setMaxCachedValueBytesOverride( 
+ 100); // Set limit to 100 bytes; the per-entry cache overhead alone is 136 bytes. + + WindmillStateCache.ForKeyAndFamily keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 1L).forFamily(STATE_FAMILY); + + TestStateTag tag1 = new TestStateTag("tag1"); + TestStateTag tag2 = new TestStateTag("tag2"); + + putInCache(keyCache, StateNamespaces.global(), tag1, new TestState("g1"), 10); + keyCache.persist(); + + // It should not be in global cache because it's too large. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY); + assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag1)); + + // Now set limit larger. + cache.setMaxCachedValueBytesOverride(1000); + + putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"), 10); + keyCache.persist(); + + // It should be in global cache. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 3L).forFamily(STATE_FAMILY); + assertEquals( + Optional.of(new TestState("g2")), getFromCache(keyCache, StateNamespaces.global(), tag2)); + + // Now update it to be larger than limit. + putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2_large"), 2000); + keyCache.persist(); + + // It should be removed from global cache. 
+ keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 4L).forFamily(STATE_FAMILY); + assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag2)); + } + + @Test + public void testDisableHistogram() throws Exception { + WindmillStateCache noHistogramCache = + WindmillStateCache.builder().setSizeMb(400).setEnableHistogram(false).build(); + WindmillStateCache.ForKeyAndFamily keyCache = + noHistogramCache + .forComputation(COMPUTATION) + .forKey(COMPUTATION_KEY, 0L, 1L) + .forFamily(STATE_FAMILY); + + putInCache( + keyCache, StateNamespaces.global(), new TestStateTag("tag1"), new TestState("g1"), 2); + keyCache.persist(); + + java.io.StringWriter writer = new java.io.StringWriter(); + noHistogramCache.appendSummaryHtml(new java.io.PrintWriter(writer)); + String summary = writer.toString(); + + org.junit.Assert.assertFalse(summary.contains("Entry Weight Dist")); + } + /** Verifies that values are cached in the appropriate namespaces. */ @Test public void testInvalidation() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index b7579cbacb8e..99e830621f9b 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -986,6 +986,8 @@ message UserWorkerRunnerV1Settings { optional ConnectivityType connectivity_type = 4 [default = CONNECTIVITY_TYPE_DEFAULT]; + optional int64 max_cached_value_bytes = 5 [default = -1]; + reserved 1, 2; }