-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[SpannerIO] Add low-latency configuration in Spanner Change Streams #37580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,8 +20,11 @@ | |
| import static java.util.stream.Collectors.toList; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE; | ||
| import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; | ||
|
|
@@ -537,6 +540,8 @@ public static ReadChangeStream readChangeStream() { | |
| .setRpcPriority(DEFAULT_RPC_PRIORITY) | ||
| .setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT) | ||
| .setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT) | ||
| .setRealTimeCheckpointInterval(DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL) | ||
| .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS) | ||
| .build(); | ||
| } | ||
|
|
||
|
|
@@ -1761,6 +1766,10 @@ public abstract static class ReadChangeStream | |
|
|
||
| abstract @Nullable ValueProvider<Boolean> getPlainText(); | ||
|
|
||
| abstract Duration getRealTimeCheckpointInterval(); | ||
|
|
||
| abstract Integer getHeartbeatMillis(); | ||
|
|
||
| abstract Builder toBuilder(); | ||
|
|
||
| @AutoValue.Builder | ||
|
|
@@ -1790,6 +1799,16 @@ abstract static class Builder { | |
|
|
||
| abstract Builder setPlainText(ValueProvider<Boolean> plainText); | ||
|
|
||
| /** | ||
| * When caught up to real-time, checkpoint processing of change stream this often. This sets a | ||
| * bound on latency of processing if a steady trickle of elements prevents the heartbeat | ||
| * interval from triggering. | ||
| */ | ||
| abstract Builder setRealTimeCheckpointInterval(Duration realTimeCheckpointInterval); | ||
|
|
||
| /** Heartbeat interval for all change stream queries. */ | ||
| abstract Builder setHeartbeatMillis(Integer heartbeatMillis); | ||
|
|
||
| abstract ReadChangeStream build(); | ||
| } | ||
|
|
||
|
|
@@ -1912,6 +1931,16 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) { | |
| return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText)); | ||
| } | ||
|
|
||
| /** | ||
| * Configures the change stream to checkpoint and flush output targeting low latency at the cost | ||
| * of higher rpc rate and cpu usage. | ||
| */ | ||
| public ReadChangeStream withLowLatency() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is this function gets called?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be called by the user:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so default behaviour is not changed, only users who configure IO with |
||
| return toBuilder() | ||
| .setRealTimeCheckpointInterval(DEFAULT_LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand you are lowering this to 1s to try to let the query return early so that you can do checkpoint and move forward the start ts for the next query.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tianz101 , should I update PR with 100ms heartbeat interval in other PR? Public docs state 1s is minimum. Can we update docs as well so there is no issue from support perspective.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is also issue with clock skew, see comments above. I feel heartbeat PR #37718 is better. |
||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| public PCollection<DataChangeRecord> expand(PBegin input) { | ||
| checkArgument( | ||
|
|
@@ -2018,13 +2047,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta | |
| MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE); | ||
| final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate); | ||
|
|
||
| final long heartbeatMillis = getHeartbeatMillis().longValue(); | ||
|
|
||
| final InitializeDoFn initializeDoFn = | ||
| new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp); | ||
| new InitializeDoFn( | ||
| daoFactory, mapperFactory, startTimestamp, endTimestamp, heartbeatMillis); | ||
| final DetectNewPartitionsDoFn detectNewPartitionsDoFn = | ||
| new DetectNewPartitionsDoFn( | ||
| daoFactory, mapperFactory, actionFactory, cacheFactory, metrics); | ||
| final Duration realTimeCheckpointInterval = getRealTimeCheckpointInterval(); | ||
|
|
||
| final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = | ||
| new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); | ||
| new ReadChangeStreamPartitionDoFn( | ||
| daoFactory, mapperFactory, actionFactory, metrics, realTimeCheckpointInterval); | ||
| final PostProcessingMetricsDoFn postProcessingMetricsDoFn = | ||
| new PostProcessingMetricsDoFn(metrics); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,13 @@ public class ChangeStreamsConstants { | |
| */ | ||
| public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT; | ||
|
|
||
| public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL = Duration.standardMinutes(2); | ||
|
|
||
| public static final int DEFAULT_HEARTBEAT_MILLIS = 2000; | ||
|
|
||
| public static final Duration DEFAULT_LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL = | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: format to 1 or 2 lines. |
||
| Duration.standardSeconds(1); | ||
|
|
||
| /** The default priority for a change stream query is {@link RpcPriority#HIGH}. */ | ||
| public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,7 @@ public class QueryChangeStreamAction { | |
| private final PartitionEventRecordAction partitionEventRecordAction; | ||
| private final ChangeStreamMetrics metrics; | ||
| private final boolean isMutableChangeStream; | ||
| private final Duration realTimeCheckpointInterval; | ||
|
|
||
| /** | ||
| * Constructs an action class for performing a change stream query for a given partition. | ||
|
|
@@ -109,6 +110,7 @@ public class QueryChangeStreamAction { | |
| * @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s | ||
| * @param metrics metrics gathering class | ||
| * @param isMutableChangeStream whether the change stream is mutable or not | ||
| * @param realTimeCheckpointInterval duration to add to current time | ||
| */ | ||
| QueryChangeStreamAction( | ||
| ChangeStreamDao changeStreamDao, | ||
|
|
@@ -122,7 +124,8 @@ public class QueryChangeStreamAction { | |
| PartitionEndRecordAction partitionEndRecordAction, | ||
| PartitionEventRecordAction partitionEventRecordAction, | ||
| ChangeStreamMetrics metrics, | ||
| boolean isMutableChangeStream) { | ||
| boolean isMutableChangeStream, | ||
| Duration realTimeCheckpointInterval) { | ||
| this.changeStreamDao = changeStreamDao; | ||
| this.partitionMetadataDao = partitionMetadataDao; | ||
| this.changeStreamRecordMapper = changeStreamRecordMapper; | ||
|
|
@@ -135,6 +138,7 @@ public class QueryChangeStreamAction { | |
| this.partitionEventRecordAction = partitionEventRecordAction; | ||
| this.metrics = metrics; | ||
| this.isMutableChangeStream = isMutableChangeStream; | ||
| this.realTimeCheckpointInterval = realTimeCheckpointInterval; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -387,12 +391,12 @@ private boolean isTimestampOutOfRange(SpannerException e) { | |
| && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE); | ||
| } | ||
|
|
||
| // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if | ||
| // users want to run the connector forever. If the end timestamp is reached, we will resume | ||
| // processing from that timestamp on a subsequent DoFn execution. | ||
| // Return (now + config duration) as the end timestamp for reading change streams. This is only | ||
| // used if users want to run the connector forever. If the end timestamp is reached, we | ||
| // will resume processing from that timestamp on a subsequent DoFn execution. | ||
| private Timestamp getNextReadChangeStreamEndTimestamp() { | ||
| final Timestamp current = Timestamp.now(); | ||
| return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); | ||
| return Timestamp.ofTimeMicroseconds( | ||
| Instant.now().plus(realTimeCheckpointInterval).getMillis() * 1000L); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If end_time is calculated as now() + 2m it's very unlikely that clock skew will impact pulling data.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we could detect that by capturing the now() we used before starting the query and then comparing now() at the end of the query versus that start time. If the duration is a lot less than our desired realtimecheckpointinterval, it means that our local clock is slow compared to the spanner true time. We could keep track of that and try to adjust our clock values for future calls.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you mean something like: I think I really prefer heartbeat PR as we don't have to include such heuristic :) |
||
| } | ||
|
|
||
| // For Mutable Change Stream bounded queries, update the query end timestamp to be within 2 | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.