diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 3a69d1177f4a..bfc6cd6a7ffc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -20,8 +20,11 @@ import static java.util.stream.Collectors.toList; import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; @@ -537,6 +540,9 @@ public static ReadChangeStream readChangeStream() { .setRpcPriority(DEFAULT_RPC_PRIORITY) .setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT) .setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT) + .setRealTimeCheckpointInterval(DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL) + .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS) + 
.setCancelQueryOnHeartbeat(false) .build(); } @@ -1761,6 +1767,12 @@ public abstract static class ReadChangeStream abstract @Nullable ValueProvider getPlainText(); + abstract Duration getRealTimeCheckpointInterval(); + + abstract Integer getHeartbeatMillis(); + + abstract boolean getCancelQueryOnHeartbeat(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1790,6 +1802,18 @@ abstract static class Builder { abstract Builder setPlainText(ValueProvider plainText); + /** + * When caught up to real-time, checkpoint processing of change stream this often. This sets a + * bound on latency of processing if a steady trickle of elements prevents the heartbeat + * interval from triggering. + */ + abstract Builder setRealTimeCheckpointInterval(Duration realTimeCheckpointInterval); + + /** Heartbeat interval for all change stream queries. */ + abstract Builder setHeartbeatMillis(Integer heartbeatMillis); + + abstract Builder setCancelQueryOnHeartbeat(boolean cancelQueryOnHeartbeat); + abstract ReadChangeStream build(); } @@ -1912,6 +1936,17 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) { return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText)); } + /** + * Configures the change stream to checkpoint and flush output targeting low latency at the cost + * of higher rpc rate and cpu usage. 
+ */ + public ReadChangeStream withLowLatency() { + return toBuilder() + .setHeartbeatMillis(DEFAULT_LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS) + .setCancelQueryOnHeartbeat(true) + .build(); + } + @Override public PCollection expand(PBegin input) { checkArgument( @@ -2018,13 +2053,24 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE); final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate); + final long heartbeatMillis = getHeartbeatMillis().longValue(); + final InitializeDoFn initializeDoFn = - new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp); + new InitializeDoFn( + daoFactory, mapperFactory, startTimestamp, endTimestamp, heartbeatMillis); final DetectNewPartitionsDoFn detectNewPartitionsDoFn = new DetectNewPartitionsDoFn( daoFactory, mapperFactory, actionFactory, cacheFactory, metrics); + final Duration realTimeCheckpointInterval = getRealTimeCheckpointInterval(); + final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = - new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); + new ReadChangeStreamPartitionDoFn( + daoFactory, + mapperFactory, + actionFactory, + metrics, + realTimeCheckpointInterval, + getCancelQueryOnHeartbeat()); final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn(metrics); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java index db09adb0f27e..b45ae0e602be 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java @@ -49,6 +49,12 @@ public class ChangeStreamsConstants { */ public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT; + public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL = Duration.standardMinutes(2); + + public static final int DEFAULT_HEARTBEAT_MILLIS = 2000; + + public static final int DEFAULT_LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS = 100; + /** The default priority for a change stream query is {@link RpcPriority#HIGH}. */ public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index cd84168b23f7..6850d77cbf52 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -71,9 +71,10 @@ public synchronized DataChangeRecordAction dataChangeRecordAction( * @param metrics metrics gathering class * @return singleton instance of the {@link HeartbeatRecordAction} */ - public synchronized HeartbeatRecordAction heartbeatRecordAction(ChangeStreamMetrics metrics) { + public synchronized HeartbeatRecordAction heartbeatRecordAction( + ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) { if (heartbeatRecordActionInstance == null) { - heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics); + heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics, cancelQueryOnHeartbeat); } return heartbeatRecordActionInstance; } @@ -174,6 +175,7 @@ public synchronized PartitionEventRecordAction partitionEventRecordAction( 
* @param partitionEventRecordAction action class to process {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s * @param metrics metrics gathering class + * @param realTimeCheckpointInterval the duration added to current time for the end timestamp * @return single instance of the {@link QueryChangeStreamAction} */ public synchronized QueryChangeStreamAction queryChangeStreamAction( @@ -188,7 +190,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, ChangeStreamMetrics metrics, - boolean isMutableChangeStream) { + boolean isMutableChangeStream, + Duration realTimeCheckpointInterval) { if (queryChangeStreamActionInstance == null) { queryChangeStreamActionInstance = new QueryChangeStreamAction( @@ -203,7 +206,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( partitionEndRecordAction, partitionEventRecordAction, metrics, - isMutableChangeStream); + isMutableChangeStream, + realTimeCheckpointInterval); } return queryChangeStreamActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 0937e896fbf1..14fa6f96ed3d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -41,14 +41,16 @@ public class HeartbeatRecordAction { private static final Logger LOG = LoggerFactory.getLogger(HeartbeatRecordAction.class); private final ChangeStreamMetrics metrics; + private final boolean cancelQueryOnHeartbeat; /** * Constructs an action 
class for handling {@link HeartbeatRecord}s. * * @param metrics metrics gathering class */ - HeartbeatRecordAction(ChangeStreamMetrics metrics) { + HeartbeatRecordAction(ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) { this.metrics = metrics; + this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat; } /** @@ -76,7 +78,8 @@ public Optional run( HeartbeatRecord record, RestrictionTracker tracker, RestrictionInterrupter interrupter, - ManualWatermarkEstimator watermarkEstimator) { + ManualWatermarkEstimator watermarkEstimator, + Timestamp endTimestamp) { final String token = partition.getPartitionToken(); LOG.debug("[{}] Processing heartbeat record {}", token, record); @@ -96,6 +99,11 @@ public Optional run( watermarkEstimator.setWatermark(timestampInstant); LOG.debug("[{}] Heartbeat record action completed successfully", token); - return Optional.empty(); + if (!cancelQueryOnHeartbeat || timestamp.equals(endTimestamp)) { + // cancel-on-heartbeat is off, or this is probably the last element in query: let it finish + return Optional.empty(); + } + // cancel-on-heartbeat is on and there is no new data: stop reading and resume later + return Optional.of(ProcessContinuation.resume()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 69e89e74a38b..3b8ad06a4ffa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -91,6 +91,7 @@ public class QueryChangeStreamAction { private final PartitionEventRecordAction partitionEventRecordAction; private final ChangeStreamMetrics metrics; private final boolean isMutableChangeStream; + private final Duration realTimeCheckpointInterval; /** * Constructs an 
action class for performing a change stream query for a given partition. @@ -109,6 +110,7 @@ public class QueryChangeStreamAction { * @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s * @param metrics metrics gathering class * @param isMutableChangeStream whether the change stream is mutable or not + * @param realTimeCheckpointInterval duration to add to current time */ QueryChangeStreamAction( ChangeStreamDao changeStreamDao, @@ -122,7 +124,8 @@ public class QueryChangeStreamAction { PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, ChangeStreamMetrics metrics, - boolean isMutableChangeStream) { + boolean isMutableChangeStream, + Duration realTimeCheckpointInterval) { this.changeStreamDao = changeStreamDao; this.partitionMetadataDao = partitionMetadataDao; this.changeStreamRecordMapper = changeStreamRecordMapper; @@ -135,6 +138,7 @@ public class QueryChangeStreamAction { this.partitionEventRecordAction = partitionEventRecordAction; this.metrics = metrics; this.isMutableChangeStream = isMutableChangeStream; + this.realTimeCheckpointInterval = realTimeCheckpointInterval; } /** @@ -244,7 +248,8 @@ public ProcessContinuation run( (HeartbeatRecord) record, tracker, interrupter, - watermarkEstimator); + watermarkEstimator, + endTimestamp); } else if (record instanceof ChildPartitionsRecord) { maybeContinuation = childPartitionsRecordAction.run( @@ -387,12 +392,12 @@ private boolean isTimestampOutOfRange(SpannerException e) { && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE); } - // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if - // users want to run the connector forever. If the end timestamp is reached, we will resume - // processing from that timestamp on a subsequent DoFn execution. + // Return (now + config duration) as the end timestamp for reading change streams. 
This is only + // used if users want to run the connector forever. If the end timestamp is reached, we + // will resume processing from that timestamp on a subsequent DoFn execution. private Timestamp getNextReadChangeStreamEndTimestamp() { - final Timestamp current = Timestamp.now(); - return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); + return Timestamp.ofTimeMicroseconds( + Instant.now().plus(realTimeCheckpointInterval).getMillis() * 1000L); } // For Mutable Change Stream bounded queries, update the query end timestamp to be within 2 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java index 60eb96ca3387..4191f2d93594 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java @@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn implements S private static final long serialVersionUID = -8921188388649003102L; - /** Heartbeat interval for all change stream queries will be of 2 seconds. */ - // Be careful when changing this interval, as it needs to be less than the checkpointing interval - // in Dataflow. Otherwise, if there are no records within checkpoint intervals, the consuming of - // a change stream query might get stuck. 
- private static final long DEFAULT_HEARTBEAT_MILLIS = 2000; + private final long heartbeatMillis; private final DaoFactory daoFactory; private final MapperFactory mapperFactory; @@ -53,11 +49,13 @@ public InitializeDoFn( DaoFactory daoFactory, MapperFactory mapperFactory, com.google.cloud.Timestamp startTimestamp, - com.google.cloud.Timestamp endTimestamp) { + com.google.cloud.Timestamp endTimestamp, + long heartbeatMillis) { this.daoFactory = daoFactory; this.mapperFactory = mapperFactory; this.startTimestamp = startTimestamp; this.endTimestamp = endTimestamp; + this.heartbeatMillis = heartbeatMillis; } @ProcessElement @@ -88,7 +86,7 @@ private void createFakeParentPartition() { .setPartitionToken(InitialPartition.PARTITION_TOKEN) .setStartTimestamp(startTimestamp) .setEndTimestamp(endTimestamp) - .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS) + .setHeartbeatMillis(heartbeatMillis) .setState(State.CREATED) .setWatermark(startTimestamp) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index c3650b42761b..750865efbf02 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -74,12 +74,15 @@ public class ReadChangeStreamPartitionDoFn extends DoFn throughputEstimator; + private final Duration realTimeCheckpointInterval; + private transient QueryChangeStreamAction queryChangeStreamAction; /** @@ -95,17 +98,23 @@ public class ReadChangeStreamPartitionDoFn extends DoFn(); } @@ -195,7 +204,7 @@ public void setup() { final DataChangeRecordAction dataChangeRecordAction = 
actionFactory.dataChangeRecordAction(throughputEstimator); final HeartbeatRecordAction heartbeatRecordAction = - actionFactory.heartbeatRecordAction(metrics); + actionFactory.heartbeatRecordAction(metrics, cancelQueryOnHeartbeat); final ChildPartitionsRecordAction childPartitionsRecordAction = actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics); final PartitionStartRecordAction partitionStartRecordAction = @@ -218,7 +227,8 @@ public void setup() { partitionEndRecordAction, partitionEventRecordAction, metrics, - isMutableChangeStream); + isMutableChangeStream, + realTimeCheckpointInterval); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 56d1825c8a18..d068bd1fb8e9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -41,6 +41,7 @@ public class HeartbeatRecordActionTest { private HeartbeatRecordAction action; + private HeartbeatRecordAction cancellingAction; private PartitionMetadata partition; private RestrictionTracker tracker; private RestrictionInterrupter interrupter; @@ -49,7 +50,8 @@ public class HeartbeatRecordActionTest { @Before public void setUp() { final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); - action = new HeartbeatRecordAction(metrics); + action = new HeartbeatRecordAction(metrics, false); + cancellingAction = new HeartbeatRecordAction(metrics, true); partition = mock(PartitionMetadata.class); tracker = mock(RestrictionTracker.class); interrupter = mock(RestrictionInterrupter.class); @@ -60,6 +62,7 @@ public void setUp() { 
public void testRestrictionClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -70,7 +73,8 @@ public void testRestrictionClaimed() { new HeartbeatRecord(timestamp, null), tracker, interrupter, - watermarkEstimator); + watermarkEstimator, + endTimestamp); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); @@ -80,6 +84,7 @@ public void testRestrictionClaimed() { public void testRestrictionNotClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -90,7 +95,8 @@ public void testRestrictionNotClaimed() { new HeartbeatRecord(timestamp, null), tracker, interrupter, - watermarkEstimator); + watermarkEstimator, + endTimestamp); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); @@ -100,6 +106,7 @@ public void testRestrictionNotClaimed() { public void testSoftDeadlineReached() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L); when(interrupter.tryInterrupt(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); @@ -111,9 +118,54 @@ public void testSoftDeadlineReached() { new HeartbeatRecord(timestamp, null), tracker, interrupter, - watermarkEstimator); + watermarkEstimator, + endTimestamp); assertEquals(Optional.of(ProcessContinuation.resume()), 
maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); } + + @Test + public void testEndTimestampReachedOnCancellingAction() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L); + + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + cancellingAction.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator, + endTimestamp); + + assertEquals(Optional.empty(), maybeContinuation); + verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); + } + + @Test + public void testEndTimestampNotReachedOnCancellingAction() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L); + + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + cancellingAction.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator, + endTimestamp); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 26ab41dff878..7c5d6d0f1870 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -119,7 +120,8 @@ public void setUp() throws Exception { partitionEndRecordAction, partitionEventRecordAction, metrics, - false); + false, + Duration.standardMinutes(2)); final Struct row = mock(Struct.class); partition = PartitionMetadata.newBuilder() @@ -223,7 +225,7 @@ public void testQueryChangeStreamWithDataChangeRecord() { eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -257,14 +259,16 @@ public void testQueryChangeStreamWithHeartbeatRecord() { eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), - eq(watermarkEstimator))) + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP))) .thenReturn(Optional.empty()); when(heartbeatRecordAction.run( eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), - eq(watermarkEstimator))) + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP))) 
.thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -279,14 +283,80 @@ public void testQueryChangeStreamWithHeartbeatRecord() { eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), - eq(watermarkEstimator)); + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP)); verify(heartbeatRecordAction) .run( eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), - eq(watermarkEstimator)); + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP)); + verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(restrictionTracker, never()).tryClaim(any()); + } + + @Test + public void testQueryChangeStreamWithHeartbeatRecordAndCancelOnHeartbeat() { + final Struct rowAsStruct = mock(Struct.class); + final ChangeStreamResultSetMetadata resultSetMetadata = + mock(ChangeStreamResultSetMetadata.class); + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final HeartbeatRecord record1 = mock(HeartbeatRecord.class); + final HeartbeatRecord record2 = mock(HeartbeatRecord.class); + when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); + when(record2.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP); + when(changeStreamDao.changeStreamQuery( + PARTITION_TOKEN, + PARTITION_START_TIMESTAMP, + PARTITION_END_TIMESTAMP, + PARTITION_HEARTBEAT_MILLIS)) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(true); + 
when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct); + when(resultSet.getMetadata()).thenReturn(resultSetMetadata); + when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) + .thenReturn(Arrays.asList(record1, record2)); + when(heartbeatRecordAction.run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP))) + .thenReturn(Optional.of(ProcessContinuation.resume())); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + assertEquals(ProcessContinuation.resume(), result); + verify(heartbeatRecordAction) + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP)); + + // Heartbeat cancels loop and second record is not processed + verify(heartbeatRecordAction, never()) + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator), + eq(PARTITION_END_TIMESTAMP)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -356,7 +426,7 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -419,7 +489,7 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -467,7 +537,7 @@ public void testQueryChangeStreamWithPartitionStartRecord() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -517,7 +587,7 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStartForPartit verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -564,7 +634,7 @@ public void 
testQueryChangeStreamWithPartitionEndRecordBoundedRestriction() { verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -617,7 +687,7 @@ public void testQueryChangeStreamWithPartitionEndRecordUnboundedRestriction() { verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -665,7 +735,7 @@ public void testQueryChangeStreamWithPartitionEventRecord() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -694,7 +764,7 @@ 
public void testQueryChangeStreamWithStreamFinished() { verify(metrics).decActivePartitionReadCounter(); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -740,7 +810,7 @@ public void testQueryChangeStreamFinishedWithResume() { verify(metrics, never()).decActivePartitionReadCounter(); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -776,7 +846,7 @@ public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() { verify(metrics).decActivePartitionReadCounter(); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -809,7 +879,7 @@ public void testQueryChangeStreamWithOutOfRangeErrorOnBoundedPartition() 
{ verify(metrics).decActivePartitionReadCounter(); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -859,7 +929,7 @@ public void testQueryChangeStreamWithChildPartitionsRecordBoundedRestriction() { verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -912,7 +982,7 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -935,7 +1005,8 @@ public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { 
partitionEndRecordAction, partitionEventRecordAction, metrics, - true); + true, + Duration.standardMinutes(2)); // Set endTimestamp to 60 minutes in the future Timestamp now = Timestamp.now(); @@ -983,7 +1054,8 @@ public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { partitionEndRecordAction, partitionEventRecordAction, metrics, - true); + true, + Duration.standardMinutes(2)); // Set endTimestamp to only 10 seconds in the future Timestamp now = Timestamp.now(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java index 9672e23b16d7..c3bee10f8e14 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java @@ -62,7 +62,8 @@ public void setUp() { daoFactory, mapperFactory, Timestamp.ofTimeMicroseconds(1L), - Timestamp.ofTimeMicroseconds(2L)); + Timestamp.ofTimeMicroseconds(2L), + 2000L); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 9e588de77a03..ee65bd21bf00 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -103,7 +104,9 @@ public void setUp() { partitionEventRecordAction = mock(PartitionEventRecordAction.class); queryChangeStreamAction = mock(QueryChangeStreamAction.class); - doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); + doFn = + new ReadChangeStreamPartitionDoFn( + daoFactory, mapperFactory, actionFactory, metrics, Duration.standardMinutes(2), false); doFn.setThroughputEstimator(throughputEstimator); partition = @@ -131,7 +134,7 @@ public void setUp() { when(actionFactory.dataChangeRecordAction(throughputEstimator)) .thenReturn(dataChangeRecordAction); - when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(heartbeatRecordAction); + when(actionFactory.heartbeatRecordAction(metrics, false)).thenReturn(heartbeatRecordAction); when(actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics)) .thenReturn(childPartitionsRecordAction); when(actionFactory.partitionStartRecordAction(partitionMetadataDao, metrics)) @@ -152,7 +155,8 @@ public void setUp() { eq(partitionEndRecordAction), eq(partitionEventRecordAction), eq(metrics), - anyBoolean())) + anyBoolean(), + eq(Duration.standardMinutes(2)))) .thenReturn(queryChangeStreamAction); doFn.setup(); @@ -171,7 +175,7 @@ public void testQueryChangeStreamMode() { .run(partition, tracker, receiver, watermarkEstimator, bundleFinalizer); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, 
never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());