Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ public interface BigQueryOptions

void setStorageWriteMaxInflightBytes(Long value);

@Description(
"Maximum time in seconds a Storage Write API append request is allowed to wait in the "
+ "request callback queue before timing out. Overrides Storage Write API default (5 min)")
Comment thread
claudevdm marked this conversation as resolved.
Integer getStorageWriteApiMaxRequestCallbackWaitTimeSec();

void setStorageWriteApiMaxRequestCallbackWaitTimeSec(Integer value);
Comment thread
claudevdm marked this conversation as resolved.
Comment thread
claudevdm marked this conversation as resolved.

@Description(
"Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+ " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,7 @@ public static class WriteStreamServiceImpl implements WriteStreamService {
private final BigQueryWriteClient newWriteClient;
private final long storageWriteMaxInflightRequests;
private final long storageWriteMaxInflightBytes;
private final @Nullable Integer storageWriteApiMaxRequestCallbackWaitTimeSec;
Comment thread
claudevdm marked this conversation as resolved.
Comment thread
claudevdm marked this conversation as resolved.
private final BigQueryIOMetadata bqIOMetadata;
private final PipelineOptions options;

Expand All @@ -1506,6 +1507,8 @@ public static class WriteStreamServiceImpl implements WriteStreamService {
this.options = options;
this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
this.storageWriteApiMaxRequestCallbackWaitTimeSec =
bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
this.bqIOMetadata = BigQueryIOMetadata.create();
}

Expand All @@ -1514,6 +1517,8 @@ public WriteStreamServiceImpl(BigQueryOptions bqOptions) {
this.options = bqOptions;
this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
this.storageWriteApiMaxRequestCallbackWaitTimeSec =
bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
this.bqIOMetadata = BigQueryIOMetadata.create();
}

Expand Down Expand Up @@ -1578,6 +1583,11 @@ public StreamAppendClient getStreamAppendClient(
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
.build());

if (storageWriteApiMaxRequestCallbackWaitTimeSec != null) {
StreamWriter.setMaxRequestCallbackWaitTime(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a little bit strange that setMaxRequestCallbackWaitTime is a public static method.

This means users have a workaround that is to set it directly using a JvmInitializer or other mechanism before this gets released

java.time.Duration.ofSeconds(storageWriteApiMaxRequestCallbackWaitTimeSec));
}
Comment thread
claudevdm marked this conversation as resolved.
Comment thread
claudevdm marked this conversation as resolved.

StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(
Expand Down
Loading