[flink] add a test for ccovering the latest mode in scan.startup.mode configuration tests#2477
[flink] add a test for ccovering the latest mode in scan.startup.mode configuration tests#2477nhuantho wants to merge 2 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds test coverage for the scan.startup.mode='latest' configuration option in Flink's changelog virtual table integration tests. The change ensures that the latest mode correctly reads only new records after subscription, filling a gap in the existing test suite.
Changes:
- Added an overloaded
collectRowsWithTimeoutmethod that accepts a custommaxWaitTimeparameter - Extended
testChangelogWithScanStartupModeto include a test case forscan.startup.mode='latest'
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java | Adds overloaded method to support custom timeout duration when collecting rows |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java | Implements test logic for verifying latest scan startup mode behavior |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| break; | ||
| } | ||
| } | ||
| int id = Integer.parseInt(latestResults.getFirst().replaceAll(".*\\[(\\d+)]", "$1")); |
There was a problem hiding this comment.
The regex pattern assumes a specific row format. If latestResults.getFirst() doesn't match the expected pattern, parseInt will throw a NumberFormatException. Consider adding validation or using a more robust parsing approach to handle unexpected formats gracefully.
| for (int attempt = 0; attempt < 10; attempt++) { | ||
| // Write a new record (with id larger than 5) | ||
| int rowId = 6 + attempt; | ||
| writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); | ||
|
|
||
| // Try to fetch one record with a 5-second timeout | ||
| latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5)); |
There was a problem hiding this comment.
The hardcoded 5-second timeout per attempt combined with 10 attempts could result in up to 50 seconds of waiting. Consider reducing the timeout or number of attempts, or explaining why this duration is necessary to avoid unnecessarily long test execution times.
| for (int attempt = 0; attempt < 10; attempt++) { | |
| // Write a new record (with id larger than 5) | |
| int rowId = 6 + attempt; | |
| writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); | |
| // Try to fetch one record with a 5-second timeout | |
| latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5)); | |
| for (int attempt = 0; attempt < 5; attempt++) { | |
| // Write a new record (with id larger than 5) | |
| int rowId = 6 + attempt; | |
| writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false); | |
| // Try to fetch one record with a 1-second timeout | |
| latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(1)); |
There was a problem hiding this comment.
Can you please ellaborate your concern?
There was a problem hiding this comment.
@MehulBatra, from my side, Copilot was suggested incorrectly:
- Proposal:
- Fetch timeout: 5s -> Copilot:
latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(1)); - Repeat step 1 for 10 times -> Copilot:
for (int attempt = 0; attempt < 5; attempt++)
- Fetch timeout: 5s -> Copilot:
|
@MehulBatra, can you please review the pull request again? |
Thank you for your PR I will try to review it within this week! |

Purpose
Linked issue: close #2471
Brief change log
collectRowsWithTimeoutmethod allows passingmaxWaitTime.Tests
collectRowsWithTimeoutmethod allows passingmaxWaitTime.testChangelogWithScanStartupMode-> add logic for testingscan.startup.mode='latest'