diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 2f16a64b0d76..face2ef5841a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -153,6 +153,13 @@ public interface BigQueryOptions void setStorageWriteMaxInflightBytes(Long value); + @Description( + "Maximum time in seconds a Storage Write API append request is allowed to wait in the " + + "request callback queue before timing out. Overrides Storage Write API default (5 min)") + Integer getStorageWriteApiMaxRequestCallbackWaitTimeSec(); + + void setStorageWriteApiMaxRequestCallbackWaitTimeSec(Integer value); + @Description( "Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE" + " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning " diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index aa9a5fd310b0..14765a65ff0b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1496,6 +1496,7 @@ public static class WriteStreamServiceImpl implements WriteStreamService { private final BigQueryWriteClient newWriteClient; private final long storageWriteMaxInflightRequests; private final long storageWriteMaxInflightBytes; + private final @Nullable Integer storageWriteApiMaxRequestCallbackWaitTimeSec; private final BigQueryIOMetadata bqIOMetadata; private final PipelineOptions options; @@ -1506,6 +1507,8 @@ public static class WriteStreamServiceImpl implements WriteStreamService { this.options = options; this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); + this.storageWriteApiMaxRequestCallbackWaitTimeSec = + bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec(); this.bqIOMetadata = BigQueryIOMetadata.create(); } @@ -1514,6 +1517,8 @@ public WriteStreamServiceImpl(BigQueryOptions bqOptions) { this.options = bqOptions; this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); + this.storageWriteApiMaxRequestCallbackWaitTimeSec = + bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec(); this.bqIOMetadata = BigQueryIOMetadata.create(); } @@ -1578,6 +1583,11 @@ public StreamAppendClient getStreamAppendClient( options.as(BigQueryOptions.class).getMaxConnectionPoolConnections()) .build()); + if (storageWriteApiMaxRequestCallbackWaitTimeSec != null) { + StreamWriter.setMaxRequestCallbackWaitTime( + java.time.Duration.ofSeconds(storageWriteApiMaxRequestCallbackWaitTimeSec)); + } + StreamWriter streamWriter = StreamWriter.newBuilder(streamName, newWriteClient) .setExecutorProvider(