
[improve](streaming-job) async chunk splitting for StreamingInsertJob #63079

Draft
JNSimba wants to merge 1 commit into apache:master from JNSimba:improve-streamingjob-split

Conversation

@JNSimba (Member) commented May 8, 2026

Summary

StreamingInsertJob (CDC FROM-TO and cdc_stream TVF paths) used to call splitChunks() synchronously inside CREATE STREAMING JOB, asking cdc_client to cut every chunk of every table before returning. On large/non-uniform PK tables this can take 30+ minutes — far beyond the BE→cdc_client BRPC 60s timeout, and the SQL client blocks the whole time.

This PR makes splitting tick-driven by the FE scheduler:

  • CREATE returns immediately; no more synchronous splitChunks().
  • Each scheduler tick, advanceSplits() issues one short fetchSplits RPC (default batchSize=100) and pushes that batch into remainingSplits. Tasks dispatch as soon as the first batch lands, so end-to-end first-byte latency stays close to flink-cdc's.
  • cdc_client is stateless — every RPC reconstructs ChunkSplitter from the (currentSplittingTable, nextSplitStart, nextSplitId) triple supplied by FE; flink-cdc internals are untouched (uses the public ChunkSplitter API only).
  • Crash recovery uses three sources of truth:
    • editlog persists committedSplitProgress (3-field SplitProgress) + existing chunkHighWatermarkMap / binlogOffsetPersist
    • streaming_job_meta system table holds the full chunk_list JSON per table (UPSERTed on each advanceSplits() call)
    • cdc_client memory holds nothing
  • Both FROM-TO (multi-table) and TVF (single-table) paths share the same SourceOffsetProvider#initSplitProgress / noMoreSplits / advanceSplits interface; StreamingJobSchedulerTask.handlePendingState pre-advances one batch so the first task doesn't wait a full max_interval.
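
For orientation, here is a minimal sketch of the shared hooks named in the last bullet; the signatures are assumed from this description, not copied from the patch, and the real SourceOffsetProvider in fe-core carries more methods:

    import java.util.List;

    // Hypothetical, simplified shape of the three default hooks added by this PR.
    public interface SourceOffsetProvider {
        /** CREATE time: remember which source tables this job syncs. */
        default void initSplitProgress(List<String> syncTables) {
        }

        /** Scheduler tick: fetch at most one small batch of splits via /api/fetchSplits. */
        default void advanceSplits() {
            // the real implementation may throw the FE's JobException and pause the job
        }

        /** True once every table of this job has been fully split. */
        default boolean noMoreSplits() {
            return true;
        }
    }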

Detailed design lives in the linked plan.

Changes

  • fe-common: FetchTableSplitsRequest adds nextSplitStart (Object[]) / nextSplitId / batchSize.
  • fe-core:
    • SourceOffsetProvider adds 3 default methods: initSplitProgress / advanceSplits / noMoreSplits.
    • JdbcSourceOffsetProvider implements the async state machine (committed/cdc SplitProgress, advanceSplits, dedup, system-table UPSERT, replay path).
    • JdbcTvfSourceOffsetProvider.initOnCreate no longer pre-splits; relies on the same scheduler tick path.
    • StreamingInsertJob carries syncTables (@SerializedName("st")); initSourceJob / initInsertJob initialize SplitProgress; advanceSplitsIfNeed() mirrors fetchMeta error handling (PAUSE on failure).
    • StreamingJobSchedulerTask.handlePendingState / handleRunningState call advanceSplitsIfNeed() each tick; PENDING handler pre-advances and short-circuits if PAUSED.
    • StreamingJobUtils.upsertChunkList handles id allocation via a MAX(id)+1 lookup.
  • cdc_client/JdbcIncrementalSourceReader: getSourceSplits() rebuilt around the public ChunkSplitter API (no more in-memory loop / reflection hack).
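
For reference, the resumable state that FE persists in the editlog and hands to cdc_client on each fetchSplits RPC is the 3-field SplitProgress triple mentioned above. A minimal sketch of its shape, with field semantics taken from this description and everything else assumed:

    // Hypothetical, simplified sketch; the real SplitProgress in fe-core also carries the
    // copy/null-field semantics covered by SplitProgressTest.
    public class SplitProgress {
        // Table currently being split; null means "pick the next remaining table".
        private String currentSplittingTable;
        // PK values where the next split starts; null/empty means START_BOUND (start at min(pk)).
        private Object[] nextSplitStart;
        // Id of the next split to generate; null means 0 (fresh start).
        private Integer nextSplitId;
        // getters/setters omitted
    }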

Tests

  • SplitProgressTest — copy/null-field semantics.
  • JdbcSourceOffsetProviderAsyncSplitTest — covers advanceSplits (first call / continue same table / cross-table switch / dedup / empty batch), noMoreSplits, updateOffset committed-progress advancement (mid-chunk vs last chunk vs replay missing-split path), and computeCdcRemainingTables.
  • Regression case (separate commit, not in this PR yet): test_streaming_postgres_job_async_split.groovy — 100 rows × snapshot_split_size=5 → 20 splits across multiple ticks; asserts CREATE returns < 30s, full snapshot count + DISTINCT id, then INSERT/UPDATE/DELETE in binlog phase.

Test plan

  • mvn test -pl fe/fe-core -Dtest=JdbcSourceOffsetProviderAsyncSplitTest,SplitProgressTest
  • Run test_streaming_postgres_job_async_split regression locally
  • PG/MySQL non-uniform PK large-table manual test: confirm CREATE returns in seconds, SHOW STREAMING JOB immediately reflects the new job, snapshot completes, binlog phase healthy
  • FE restart mid-snapshot: confirm cdc-side resumes from system-table position, no duplicate / lost rows
  • cdc_client kill mid-snapshot: confirm FE retries on next tick, no duplicate / lost rows
  • cdc_stream TVF + StreamingInsertJob path: confirm CREATE no longer blocks

@hello-stephen (Contributor) commented:

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@JNSimba JNSimba changed the title [feature](streaming-job) async chunk splitting for StreamingInsertJob [improve](streaming-job) async chunk splitting for StreamingInsertJob May 8, 2026
@JNSimba JNSimba requested a review from Copilot May 11, 2026 03:56
Copilot AI (Contributor) left a comment

Pull request overview

This PR moves StreamingInsertJob (CDC FROM-TO and cdc_stream TVF) snapshot chunk splitting from a synchronous CREATE STREAMING JOB path to an incremental, scheduler-tick-driven flow. The goal is to avoid long blocking CREATE times and BRPC timeouts on large / skewed PK tables by fetching snapshot splits in small batches and persisting progress for recovery.

Changes:

  • Adds split-progress APIs to SourceOffsetProvider and implements an async split state machine in JdbcSourceOffsetProvider (plus new FE tests).
  • Introduces FetchTableSplitsRequest fields to drive stateless, resumable split generation (nextSplitStart/nextSplitId/batchSize) and rebuilds cdc_client split fetching around flink-cdc ChunkSplitter.
  • Persists per-table chunk lists incrementally via StreamingJobUtils.upsertChunkList, and advances splits each scheduler tick (including a pre-advance in PENDING).
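
Roughly, the tick-driven flow looks like the sketch below. Method names follow the PR description; the status check and dispatch helpers are placeholders, not the real StreamingJobSchedulerTask code:

    // Hypothetical sketch of one scheduler tick; control flow simplified.
    void handlePendingState(StreamingInsertJob job) throws JobException {
        // Pre-advance one batch so the first task does not wait a full max_interval.
        job.advanceSplitsIfNeed();
        if (isPaused(job)) {
            // advanceSplitsIfNeed() pauses the job when the fetchSplits RPC fails.
            return;
        }
        dispatchFirstTask(job);        // placeholder for the real dispatch path
    }

    void handleRunningState(StreamingInsertJob job) throws JobException {
        job.advanceSplitsIfNeed();     // at most one short fetchSplits RPC per tick
        dispatchNextTaskIfReady(job);  // placeholder
    }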

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.

Summary per file:

  • fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java: Reworks /api/fetchSplits handling to drive flink-cdc ChunkSplitter directly (stateless batch split generation).
  • fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java: Unit tests for SplitProgress default state and deep-copy semantics.
  • fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java: Unit tests covering async split advancement, dedup, noMoreSplits, and committed-progress advancement.
  • fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java: Adds per-table chunk_list UPSERT support with id reuse / allocation.
  • fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java: Adds default split-progress hooks (initSplitProgress, advanceSplits, noMoreSplits).
  • fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java: Removes create-time pre-splitting; re-inits split progress on replay; relies on scheduler-driven split fetching.
  • fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java: Implements async split progress, scheduler-driven split fetching, persistence to the system table, and restart replay logic.
  • fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java: Calls advanceSplitsIfNeed() each tick and pre-advances once in PENDING before dispatch.
  • fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java: Persists syncTables, initializes split progress on CREATE, and adds advanceSplitsIfNeed() that pauses the job on failure.
  • fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java: Adds nextSplitStart, nextSplitId, and batchSize fields to support resumable batched split fetching.
Comments suppressed due to low confidence (1)

fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:320

  • replayIfNeed() comment still says snapshot splits in the meta table are "written by initOnCreate", but initOnCreate() is now an intentional no-op and meta writes come from scheduler-driven advanceSplits()/upsertChunkList. Updating this comment will avoid confusion when debugging TVF recovery behavior.
        // Re-init transient split progress fields lost across FE restart.
        // syncTables itself is persisted on StreamingInsertJob; cdcSplitProgress is rebuilt empty
        // here and advanceSplits will resume from the system table on next tick.
        if (cdcSplitProgress == null) {
            initSplitProgress(job.getSyncTables());
        }
        if (currentOffset == null) {
            // No committed txn yet. If snapshot splits exist in the meta table (written by
            // initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits
            // instead of a BinlogSplit (which would incorrectly skip the snapshot phase).


Comment on lines 252 to 257

    StreamingJobUtils.resolveAndValidateSource(
            dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
    this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
    JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
    rdsOffsetProvider.splitChunks(createTbls);
    // Initialize split progress; advanceSplits is driven later by the scheduler each tick.
    this.offsetProvider.initSplitProgress(this.syncTables);
} catch (Exception ex) {
Comment on lines +623 to +631

    /** Initialize at CREATE time. syncTables is the list of source tables this job syncs. */
    @Override
    public void initSplitProgress(List<String> syncTables) {
        synchronized (splitsLock) {
            this.cachedSyncTables = syncTables;
            this.committedSplitProgress = new SplitProgress();
            this.cdcSplitProgress = new SplitProgress();
        }
    }
Comment on lines 144 to +169

    public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq) {
        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId());
        LOG.info("Get table {} splits for job {} (nextSplitId={}, hasNextSplitStart={})",
                ftsReq.getSnapshotTable(), ftsReq.getJobId(),
                ftsReq.getNextSplitId(),
                ftsReq.getNextSplitStart() != null);
        JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
        List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
                remainingSnapshotSplits = new ArrayList<>();
        StreamSplit remainingStreamSplit = null;

        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
        String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
                || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
            remainingSnapshotSplits =
                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
        } else {
            // For non-initial mode, create a stream split
            Offset startingOffset = createInitialOffset();
            remainingStreamSplit =
                    new StreamSplit(
                            STREAM_SPLIT_ID,
                            startingOffset,
                            createNoStoppingOffset(),
                            new ArrayList<>(),
                            new HashMap<>(),
                            0);
        }
        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());

        // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
        // null / empty -> START_BOUND (first time splitting this table, start from min(pk))
        // non-empty -> middleOf(value[0]) (continue from previous split's splitEnd)
        // END_BOUND never appears in transit: when a table is fully split, FE clears
        // committedSplitProgress.currentSplittingTable to null, so the next RPC will be for
        // a different table or no RPC at all.
        Object[] pkValues = ftsReq.getNextSplitStart();
        ChunkBound bound = (pkValues == null || pkValues.length == 0)
                ? ChunkBound.START_BOUND
                : ChunkBound.middleOf(pkValues[0]);

        int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId();
        int batchSize = ftsReq.getBatchSize() == null ? 100 : ftsReq.getBatchSize();

        ChunkSplitterState state = new ChunkSplitterState(tableId, bound, splitId);
        ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
        splitter.open();
Comment on lines +667 to +717

        synchronized (splitsLock) {
            // 1. Pick next table if not currently splitting one.
            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
                List<String> remaining = computeCdcRemainingTables();
                if (remaining.isEmpty()) {
                    return;
                }
                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
                cdcSplitProgress.setNextSplitStart(null);
                cdcSplitProgress.setNextSplitId(null);
            }
            String tbl = cdcSplitProgress.getCurrentSplittingTable();
            Object[] startVal = cdcSplitProgress.getNextSplitStart();
            Integer splitId = cdcSplitProgress.getNextSplitId();

            // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
            // task commit waits briefly (max_interval >> RPC time anyway).
            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
            if (batch == null || batch.isEmpty()) {
                return;
            }

            // 3. mergeBySplitId (defensive dedup).
            Set<String> existingIds = new HashSet<>();
            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
            List<SnapshotSplit> newSplits = new ArrayList<>();
            for (SnapshotSplit s : batch) {
                if (!existingIds.contains(s.getSplitId())) {
                    newSplits.add(s);
                }
            }
            remainingSplits.addAll(newSplits);

            // 4. UPSERT this table's full chunk_list to system table.
            List<SnapshotSplit> splitsOfTbl = Stream.concat(
                            finishedSplits.stream(), remainingSplits.stream())
                    .filter(s -> tbl.equals(s.getTableId()))
                    .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
                    .collect(Collectors.toList());
            try {
                StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
            } catch (Exception e) {
                throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage());
            }

            // 5. Advance cdcSplitProgress to the last split in the batch.
            applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1));
        }
    }

Comment on lines +718 to +722

    /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
    private static int splitIdOf(String splitId) {
        int colon = splitId.lastIndexOf(':');
        return Integer.parseInt(splitId.substring(colon + 1));
    }
@JNSimba JNSimba requested a review from Copilot May 11, 2026 06:06
@JNSimba (Member, Author) commented May 11, 2026

/review

@github-actions bot (Contributor) left a comment

I found one additional blocking issue beyond the existing review threads.

Critical checkpoint conclusions:

  • Goal/test coverage: the PR moves CDC chunk splitting from CREATE-time blocking to scheduler-driven async fetching and adds focused unit tests, but the TVF FE-restart path is not covered and can stop fetching the remaining chunks.
  • Scope: the change is focused on async split progress, though it touches both FROM-TO and TVF paths.
  • Concurrency: existing threads already cover the broad splitsLock/RPC concern; I did not add a duplicate. The new issue is lifecycle/replay state, not a new lock-order finding.
  • Lifecycle/replay: blocking issue found for cdc_stream TVF restart because transient cachedSyncTables is not restored.
  • Configuration/compatibility: no new config or wire-incompatible persistent format issue found beyond the added optional request fields.
  • Parallel paths: FROM-TO replay initializes split progress, but the TVF override does not do the equivalent unconditionally.
  • Testing: unit tests cover the base provider state machine, but miss TVF replay after FE restart. No end-to-end regression result is included in this PR.
  • Observability/performance/transactions: no additional distinct issue found beyond existing review threads.
  • User focus: no additional user-provided review focus was supplied.

        // syncTables itself is persisted on StreamingInsertJob; cdcSplitProgress is rebuilt empty
        // here and advanceSplits will resume from the system table on next tick.
        if (cdcSplitProgress == null) {
            initSplitProgress(job.getSyncTables());

This restart guard only runs when cdcSplitProgress is null, but the no-arg constructor already initializes it in the base class. After an FE restart for a cdc_stream TVF job, replayIfNeed() runs before ensureInitialized(), so cachedSyncTables remains null. The restored remainingSplits from streaming_job_meta can still be consumed, but once those are committed, noMoreSplits() treats the null table cache as empty and advanceSplitsIfNeed() never fetches the next batch. A mid-snapshot restart therefore truncates the snapshot to only the chunks already present in the meta table. Please always restore the transient split progress/table cache here, or at least call initSplitProgress(job.getSyncTables()) when cachedSyncTables is null as well, and add a TVF restart test that consumes restored chunks then verifies another batch is fetched.
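
A minimal sketch of the suggested guard, reusing only the field and method names quoted above (cdcSplitProgress, cachedSyncTables, initSplitProgress, job.getSyncTables()); this is a fragment of replayIfNeed(), not the actual patch:

        // Also restore the transient table cache, not just the progress object, so that
        // noMoreSplits()/advanceSplits() keep fetching batches after the chunks restored
        // from streaming_job_meta have been consumed.
        if (cdcSplitProgress == null || cachedSyncTables == null) {
            initSplitProgress(job.getSyncTables());
        }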

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.

Comment on lines 156 to +166

    public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
        JdbcOffset nextOffset = new JdbcOffset();
        if (!remainingSplits.isEmpty()) {
            int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
            List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
            nextOffset.setSplits(snapshotSplits);
            return nextOffset;
        } else if (currentOffset != null && currentOffset.snapshotSplit()) {
            // initial mode: snapshot to binlog
            // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here
        } else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) {
            // initial mode: snapshot to binlog. noMoreSplits() guards against switching while
            // splitting is still in progress (remainingSplits empty doesn't mean fully cut).
            // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here.
Comment on lines 304 to +320

    @Override
    public boolean hasMoreDataToConsume() {
        if (currentOffset == null) {
            return true;
        }

        if (currentOffset.snapshotSplit()) {
            if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) {
            if (!remainingSplits.isEmpty()) {
                return true;
            }
            // remainingSplits empty: if splitting is still in progress, scheduler should delay
            // task dispatch (advanceSplits will push more splits next tick).
            if (!noMoreSplits()) {
                return false;
            }
            return true;
            // Splitting fully done: snapshot-only completes here; initial mode falls through to binlog.
            return !isSnapshotOnlyMode();
Comment on lines +746 to +757

    /** RPC fetchSplits with (table, nextSplitStart, nextSplitId, batchSize). protected for UT subclass. */
    protected List<SnapshotSplit> rpcFetchSplitsBatch(String table, Object[] nextSplitStart, Integer nextSplitId)
            throws JobException {
        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
        FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest(
                getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), table);
        requestParams.setNextSplitStart(nextSplitStart);
        requestParams.setNextSplitId(nextSplitId);
        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
                .setApi("/api/fetchSplits")
                .setParams(new Gson().toJson(requestParams))
                .build();
Comment on lines +193 to +206

    public static void upsertChunkList(Long jobId, String tableName, List<SnapshotSplit> chunks) throws Exception {
        createMetaTableIfNotExist();
        Integer id = querySingleTableId(jobId, tableName);
        if (id == null) {
            id = queryNextAvailableId(jobId);
        }
        Map<String, String> params = new HashMap<>();
        params.put("id", String.valueOf(id));
        params.put("job_id", String.valueOf(jobId));
        params.put("table_name", tableName);
        params.put("chunk_list", objectMapper.writeValueAsString(chunks));
        StringSubstitutor sub = new StringSubstitutor(params);
        String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE);
        batchInsert(Collections.singletonList(sql));
Comment on lines +167 to +175

        ChunkSplitterState state = new ChunkSplitterState(tableId, bound, splitId);
        ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
        splitter.open();

        List<AbstractSourceSplit> splits = new ArrayList<>();
        if (!remainingSnapshotSplits.isEmpty()) {
            for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
                    snapshotSplit : remainingSnapshotSplits) {
                String splitId = snapshotSplit.splitId();
                String tableId = snapshotSplit.getTableId().identifier();
                Object[] splitStart = snapshotSplit.getSplitStart();
                Object[] splitEnd = snapshotSplit.getSplitEnd();
                List<String> splitKey = snapshotSplit.getSplitKeyType().getFieldNames();
                SnapshotSplit split =
                        new SnapshotSplit(splitId, tableId, splitKey, splitStart, splitEnd, null);
                splits.add(split);
        try {
            List<AbstractSourceSplit> result = new ArrayList<>();
            while (result.size() < batchSize) {
                Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
                        chunks = splitter.generateSplits(tableId);
Comment on lines +36 to +46

    /**
     * PK values of the next split start (inclusive lower bound). null = fresh start (START_BOUND).
     * Same shape as SnapshotSplit.splitStart/splitEnd; cdc_client takes [0] to construct ChunkBound.
     */
    private Object[] nextSplitStart;

    /** Next split id; null = 0 (fresh start). */
    private Integer nextSplitId;

    /** Max splits to fetch in this RPC; null = default 100. */
    private Integer batchSize;