Skip to content

Commit de7c40b

Browse files
committed
Introduce and use constants for Spanner Change Streams CDC time increment and heartbeat milliseconds.
1 parent b996930 commit de7c40b

2 files changed

Lines changed: 16 additions & 4 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
import static java.util.stream.Collectors.toList;
2121
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
22+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CDC_TIME_INCREMENT;
2223
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
24+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS;
2325
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
2426
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
27+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT;
28+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS;
2529
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
2630
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
2731
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
@@ -537,8 +541,8 @@ public static ReadChangeStream readChangeStream() {
537541
.setRpcPriority(DEFAULT_RPC_PRIORITY)
538542
.setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT)
539543
.setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT)
540-
.setCdcTimeIncrement(Duration.standardMinutes(2))
541-
.setHeartbeatMillis(2000)
544+
.setCdcTimeIncrement(DEFAULT_CDC_TIME_INCREMENT)
545+
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
542546
.build();
543547
}
544548

@@ -1931,8 +1935,8 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
19311935

19321936
public ReadChangeStream withLowLatency() {
19331937
return toBuilder()
1934-
.setCdcTimeIncrement(Duration.standardSeconds(1))
1935-
.setHeartbeatMillis(100)
1938+
.setCdcTimeIncrement(DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT)
1939+
.setHeartbeatMillis(DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS)
19361940
.build();
19371941
}
19381942

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ public class ChangeStreamsConstants {
4949
*/
5050
public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT;
5151

52+
public static final Duration DEFAULT_CDC_TIME_INCREMENT = Duration.standardMinutes(2);
53+
54+
public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;
55+
56+
public static final Duration DEFAULT_LOW_LATENCY_CDC_TIME_INCREMENT = Duration.standardSeconds(1);
57+
58+
public static final int DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS = 100;
59+
5260
/** The default priority for a change stream query is {@link RpcPriority#HIGH}. */
5361
public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;
5462

0 commit comments

Comments
 (0)