diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 63cec327c207b8..b991874cb82884 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1182,6 +1182,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int streaming_cdc_heavy_rpc_timeout_sec = 600; + @ConfField(mutable = true, masterOnly = true) + public static int streaming_cdc_fetch_splits_batch_size = 100; + /** * the max timeout of get kafka meta. */ diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java index a54b263073415f..83b0ec6c248391 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java @@ -33,6 +33,18 @@ public class FetchTableSplitsRequest extends JobBaseConfig { private String snapshotTable; + /** + * The next split start, null = fresh start (START_BOUND). + * cdc_client takes to construct ChunkSplitterState. + */ + private Object[] nextSplitStart; + + /** Next split id; null = 0 (fresh start). */ + private Integer nextSplitId; + + /** Splits to fetch in this RPC; null lets cdc_client use its default. 
*/ + private Integer batchSize; + public FetchTableSplitsRequest(Long jobId, String name, Map sourceProperties, String frontendAddress, String snapshotTable) { super(jobId.toString(), name, sourceProperties, frontendAddress); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index f0fd31623fa8bb..b2aa058cc6553c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -94,6 +94,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.LinkedHashMap; @@ -174,6 +175,11 @@ public class StreamingInsertJob extends AbstractJob syncTables; + // The sampling window starts at the beginning of the sampling window. // If the error rate exceeds `max_filter_ratio` within the window, the sampling fails. 
@Setter @@ -238,6 +244,7 @@ private void initSourceJob() { init(); checkRequiredSourceProperties(); List createTbls = createTableIfNotExists(); + this.syncTables = createTbls; if (sourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES) == null) { // cdc need the final includeTables String includeTables = String.join(",", createTbls); @@ -246,8 +253,7 @@ private void initSourceJob() { StreamingJobUtils.resolveAndValidateSource( dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls); this.offsetProvider = createOffsetProvider(getConvertedSourceProperties()); - JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider; - rdsOffsetProvider.splitChunks(createTbls); + this.offsetProvider.initOnCreate(this.syncTables); } catch (Exception ex) { log.warn("init streaming job for {} failed", dataSourceType, ex); throw new RuntimeException(ex.getMessage()); @@ -382,10 +388,13 @@ private void initInsertJob() { this.originTvfProps = currentTvf.getProperties().getMap(); this.offsetProvider = createOffsetProvider(sourceProperties); this.offsetProvider.ensureInitialized(getJobId(), originTvfProps); + // cdc_stream TVF has TABLE; S3 TVF doesn't, so syncTables stays null there. + String tvfTable = originTvfProps.get(DataSourceConfigKeys.TABLE); + this.syncTables = tvfTable == null ? null : Collections.singletonList(tvfTable); // Validate source-side resources (e.g. PG slot/publication ownership) once at job // creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job). 
StreamingJobUtils.validateTvfSource(tvfType, originTvfProps, String.valueOf(getJobId())); - this.offsetProvider.initOnCreate(); + this.offsetProvider.initOnCreate(this.syncTables); // validate offset props, only for s3 cause s3 tvf no offset prop if (jobProperties.getOffsetProperty() != null && S3TableValuedFunction.NAME.equalsIgnoreCase(tvfType)) { @@ -704,6 +713,28 @@ protected void fetchMeta() throws JobException { } } + /** + * Advance one batch of split fetching if there are still tables to split. + * Called by scheduler each tick (PENDING/RUNNING). Mirrors fetchMeta error handling. + */ + public void advanceSplitsIfNeed() throws JobException { + if (offsetProvider.noMoreSplits()) { + return; + } + try { + offsetProvider.advanceSplits(); + } catch (Exception ex) { + log.warn("advance splits failed, job id: {}", getJobId(), ex); + if (this.getFailureReason() == null + || !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) { + this.setFailureReason(new FailureReason( + InternalErrorCode.GET_REMOTE_DATA_ERROR, + "Failed to advance splits, " + ex.getMessage())); + this.updateJobStatus(JobStatus.PAUSED); + } + } + } + public boolean needScheduleTask() { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index b1948dd4bd51e5..0f6bcba892b85d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -66,6 +66,12 @@ private void handlePendingState() throws JobException { } } streamingInsertJob.replayOffsetProviderIfNeed(); + // Pre-advance one batch so the first task has splits to consume + streamingInsertJob.advanceSplitsIfNeed(); + if (streamingInsertJob.getJobStatus() == 
JobStatus.PAUSED) { + // advanceSplits failed and paused the job; skip task dispatch this tick. + return; + } if (streamingInsertJob.hasReachedEnd()) { // Source already fully consumed (e.g. snapshot-only mode recovered after FE restart). // Transition directly to FINISHED without creating a new task. @@ -80,6 +86,7 @@ private void handlePendingState() throws JobException { private void handleRunningState() throws JobException { streamingInsertJob.processTimeoutTasks(); streamingInsertJob.fetchMeta(); + streamingInsertJob.advanceSplitsIfNeed(); } private void autoResumeHandler() throws JobException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java index 87eca253c46577..a6f9e582bff91d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java @@ -45,11 +45,11 @@ public interface SourceOffsetProvider { default void ensureInitialized(Long jobId, Map originTvfProps) throws JobException {} /** - * Performs one-time initialization that must run only on fresh job creation, not on FE restart. - * For example, fetching and persisting snapshot splits to the meta table. + * One-time initialization on fresh job creation (not on FE restart). Subclasses may + * initialize split progress, fetch initial splits, or open remote readers. * Default: no-op (most providers need no extra setup). */ - default void initOnCreate() throws JobException {} + default void initOnCreate(List syncTables) throws JobException {} /** * Get next offset to consume @@ -180,6 +180,24 @@ default boolean hasReachedEnd() { return false; } + /** + * Advance one batch of split fetching, called by scheduler each tick during PENDING/RUNNING. + * For providers without async splitting work (e.g. S3, Kafka), default is no-op. 
+ * Aligned with flink-cdc SnapshotSplitAssigner naming. + * + * @throws JobException if fetching splits fails fatally + */ + default void advanceSplits() throws JobException {} + + /** + * Returns true if no more splits will be produced. + * For providers without splitting concept, always returns true. + * Aligned with flink-cdc SnapshotSplitAssigner.noMoreSplits() naming. + */ + default boolean noMoreSplits() { + return true; + } + /** * Get the lag of the data source in seconds. * For CDC sources, lag = (now - last consumed event timestamp) in seconds. diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index bada3a1d7a77d2..d5642cb277ff43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -58,16 +58,20 @@ import org.apache.commons.collections4.MapUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; -import java.util.LinkedHashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +import java.util.stream.Stream; @Getter @Setter @@ -95,10 +99,23 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider { @SerializedName("ts") String tableSchemas; + /** Split progress (task-commit view). */ + @SerializedName("csp") + SplitProgress committedSplitProgress; + volatile boolean hasMoreData = true; transient volatile String cloudCluster; + /** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on restart. 
*/ + transient SplitProgress cdcSplitProgress = new SplitProgress(); + + /** Cache of Job.syncTables, set by initSplitProgress / replayIfNeed. */ + transient List cachedSyncTables; + + /** Guards cdcSplitProgress/committedSplitProgress/remainingSplits/finishedSplits. */ + protected final transient Object splitsLock = new Object(); + /** * No-arg constructor for subclass use. */ @@ -135,22 +152,26 @@ public String getSourceType() { @Override public Offset getNextOffset(StreamingJobProperties jobProps, Map properties) { - JdbcOffset nextOffset = new JdbcOffset(); - if (!remainingSplits.isEmpty()) { - int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism); - List snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum)); - nextOffset.setSplits(snapshotSplits); - return nextOffset; - } else if (currentOffset != null && currentOffset.snapshotSplit()) { - // initial mode: snapshot to binlog - // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here - BinlogSplit binlogSplit = new BinlogSplit(); - binlogSplit.setFinishedSplits(finishedSplits); - nextOffset.setSplits(Collections.singletonList(binlogSplit)); - return nextOffset; - } else { - // only binlog - return currentOffset == null ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset; + synchronized (splitsLock) { + JdbcOffset nextOffset = new JdbcOffset(); + if (!remainingSplits.isEmpty()) { + int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism); + List snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum)); + nextOffset.setSplits(snapshotSplits); + return nextOffset; + } else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) { + // initial mode: snapshot to binlog. noMoreSplits() guards against switching while + // splitting is still in progress (remainingSplits empty doesn't mean fully cut). + // snapshot-only mode is intercepted by hasReachedEnd() before reaching here. 
+ BinlogSplit binlogSplit = new BinlogSplit(); + binlogSplit.setFinishedSplits(new ArrayList<>(finishedSplits)); + nextOffset.setSplits(Collections.singletonList(binlogSplit)); + return nextOffset; + } else { + // only binlog + return currentOffset == null + ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset; + } } } @@ -194,26 +215,34 @@ public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originComm public void updateOffset(Offset offset) { this.currentOffset = (JdbcOffset) offset; if (currentOffset.snapshotSplit()) { - List splits = currentOffset.getSplits(); - for (AbstractSourceSplit split : splits) { - SnapshotSplit snapshotSplit = (SnapshotSplit) split; - String splitId = split.getSplitId(); - boolean remove = remainingSplits.removeIf(v -> { - if (v.getSplitId().equals(splitId)) { - snapshotSplit.setTableId(v.getTableId()); - snapshotSplit.setSplitKey(v.getSplitKey()); - snapshotSplit.setSplitStart(v.getSplitStart()); - snapshotSplit.setSplitEnd(v.getSplitEnd()); - return true; + synchronized (splitsLock) { + List splits = currentOffset.getSplits(); + for (AbstractSourceSplit split : splits) { + SnapshotSplit snapshotSplit = (SnapshotSplit) split; + String splitId = split.getSplitId(); + boolean remove = remainingSplits.removeIf(v -> { + if (v.getSplitId().equals(splitId)) { + snapshotSplit.setTableId(v.getTableId()); + snapshotSplit.setSplitKey(v.getSplitKey()); + snapshotSplit.setSplitStart(v.getSplitStart()); + snapshotSplit.setSplitEnd(v.getSplitEnd()); + return true; + } + return false; + }); + if (remove) { + finishedSplits.add(snapshotSplit); + chunkHighWatermarkMap.computeIfAbsent(snapshotSplit.getTableId(), k -> new HashMap<>()) + .put(snapshotSplit.getSplitId(), snapshotSplit.getHighWatermark()); + + // Advance committedSplitProgress to this committed chunk. 
+ if (committedSplitProgress != null) { + applySplitToProgress(committedSplitProgress, snapshotSplit); + } + } else { + // Replay before remainingSplits is restored, or a duplicate commit. + log.warn("Cannot find snapshot split {} in remainingSplits for job {}", splitId, getJobId()); } - return false; - }); - if (remove) { - finishedSplits.add(snapshotSplit); - chunkHighWatermarkMap.computeIfAbsent(snapshotSplit.getTableId(), k -> new HashMap<>()) - .put(snapshotSplit.getSplitId(), snapshotSplit.getHighWatermark()); - } else { - log.warn("Cannot find snapshot split {} in remainingSplits for job {}", splitId, getJobId()); } } } else { @@ -278,19 +307,26 @@ public boolean hasMoreDataToConsume() { return true; } - if (currentOffset.snapshotSplit()) { - if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) { - return false; + synchronized (splitsLock) { + if (currentOffset.snapshotSplit()) { + if (!remainingSplits.isEmpty()) { + return true; + } + // Splitting still in progress: wait for next tick. + if (!noMoreSplits()) { + return false; + } + // Splitting done: snapshot-only completes; initial mode falls through to binlog. 
+ return !isSnapshotOnlyMode(); } - return true; - } - if (!hasMoreData) { - return false; - } + if (!hasMoreData) { + return false; + } - if (CollectionUtils.isNotEmpty(remainingSplits)) { - return true; + if (CollectionUtils.isNotEmpty(remainingSplits)) { + return true; + } } if (MapUtils.isEmpty(endBinlogOffset)) { return false; @@ -420,6 +456,10 @@ public void validateAlterOffset(String offset) throws Exception { */ @Override public void replayIfNeed(StreamingInsertJob job) throws JobException { + synchronized (splitsLock) { + this.cachedSyncTables = job.getSyncTables(); + } + String offsetProviderPersist = job.getOffsetProviderPersist(); if (offsetProviderPersist != null) { JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist, @@ -427,6 +467,11 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException { this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist(); this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap(); this.tableSchemas = replayFromPersist.getTableSchemas(); + synchronized (splitsLock) { + this.committedSplitProgress = replayFromPersist.getCommittedSplitProgress() != null + ? replayFromPersist.getCommittedSplitProgress() : new SplitProgress(); + this.cdcSplitProgress = this.committedSplitProgress.copy(); + } log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}", getJobId(), binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(), @@ -473,6 +518,58 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException { } else { log.info("No need to replay offset provider for job {}", getJobId()); } + + // Resume cdcSplitProgress from the at-most-one table cut to mid so the next + // advanceSplits() RPC won't re-cut already-fetched splits. 
+ synchronized (splitsLock) { + if (cachedSyncTables == null || cachedSyncTables.isEmpty()) { + return; + } + SnapshotSplit mid = findResumeMidSplit(cachedSyncTables, finishedSplits, remainingSplits); + if (mid != null) { + applySplitToProgress(cdcSplitProgress, mid); + } else { + clearProgress(cdcSplitProgress); + } + log.info("Replay summary for job {}: finishedSplits={}, remainingSplits={}, " + + "committedSplitProgress=(table={}, nextStart={}, nextSplitId={}), " + + "cdcSplitProgress=(table={}, nextStart={}, nextSplitId={})", + getJobId(), finishedSplits.size(), remainingSplits.size(), + committedSplitProgress == null ? null : committedSplitProgress.getCurrentSplittingTable(), + committedSplitProgress == null ? null : Arrays.toString(committedSplitProgress.getNextSplitStart()), + committedSplitProgress == null ? null : committedSplitProgress.getNextSplitId(), + cdcSplitProgress.getCurrentSplittingTable(), + Arrays.toString(cdcSplitProgress.getNextSplitStart()), + cdcSplitProgress.getNextSplitId()); + } + } + + /** + * Find the at-most-one table cut to mid (its largest-id split has non-null splitEnd). + * Returns null when every table in {@code syncTables} is either untouched or fully cut. 
+ */ + static SnapshotSplit findResumeMidSplit(List syncTables, + List finishedSplits, + List remainingSplits) { + Map lastPerTable = new HashMap<>(); + pickLastById(finishedSplits, lastPerTable); + pickLastById(remainingSplits, lastPerTable); + for (String tbl : syncTables) { + SnapshotSplit last = lastPerTable.get(tbl); + if (last != null && last.getSplitEnd() != null && last.getSplitEnd().length > 0) { + return last; + } + } + return null; + } + + private static void pickLastById(List splits, Map out) { + for (SnapshotSplit s : splits) { + SnapshotSplit prev = out.get(s.getTableId()); + if (prev == null || splitIdOf(s.getSplitId()) > splitIdOf(prev.getSplitId())) { + out.put(s.getTableId(), s); + } + } } /** @@ -533,78 +630,193 @@ public String getPersistInfo() { return GsonUtils.GSON.toJson(this); } - public void splitChunks(List createTbls) throws JobException { - // todo: When splitting takes a long time, it needs to be changed to asynchronous. - if (checkNeedSplitChunks(sourceProperties)) { - Map> tableSplits = new LinkedHashMap<>(); - for (String tbl : createTbls) { - List snapshotSplits = requestTableSplits(tbl); - tableSplits.put(tbl, snapshotSplits); + // ============ Async split progress (driven by scheduler each tick) ============ + + /** + * One-time setup at CREATE. + * - initial/snapshot mode: init split progress; scheduler will drive advanceSplits() each tick. + * - latest mode (and other non-splitting modes): open the remote reader (e.g. PG slot) so the + * binlog phase can start immediately; no snapshot splitting will happen. 
+ */ + @Override + public void initOnCreate(List syncTables) throws JobException { + if (!checkNeedSplitChunks(sourceProperties)) { + initSourceReader(); + return; + } + synchronized (splitsLock) { + this.cachedSyncTables = syncTables; + this.committedSplitProgress = new SplitProgress(); + this.cdcSplitProgress = new SplitProgress(); + } + } + + @Override + public boolean noMoreSplits() { + if (!checkNeedSplitChunks(sourceProperties)) { + return true; + } + synchronized (splitsLock) { + return cdcSplitProgress.getCurrentSplittingTable() == null + && computeCdcRemainingTables().isEmpty(); + } + } + + /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */ + private List computeCdcRemainingTables() { + if (cachedSyncTables == null || cachedSyncTables.isEmpty()) { + return Collections.emptyList(); + } + Set touched = new HashSet<>(); + for (SnapshotSplit s : finishedSplits) { + touched.add(s.getTableId()); + } + for (SnapshotSplit s : remainingSplits) { + touched.add(s.getTableId()); + } + if (cdcSplitProgress.getCurrentSplittingTable() != null) { + touched.add(cdcSplitProgress.getCurrentSplittingTable()); + } + List result = new ArrayList<>(cachedSyncTables.size()); + for (String t : cachedSyncTables) { + if (!touched.contains(t)) { + result.add(t); + } + } + return result; + } + + @Override + public void advanceSplits() throws JobException { + synchronized (splitsLock) { + // 1. Pick next table if not currently splitting one. + if (cdcSplitProgress.getCurrentSplittingTable() == null) { + List remaining = computeCdcRemainingTables(); + if (remaining.isEmpty()) { + return; + } + cdcSplitProgress.setCurrentSplittingTable(remaining.get(0)); + cdcSplitProgress.setNextSplitStart(null); + cdcSplitProgress.setNextSplitId(null); + } + String tbl = cdcSplitProgress.getCurrentSplittingTable(); + Object[] startVal = cdcSplitProgress.getNextSplitStart(); + Integer splitId = cdcSplitProgress.getNextSplitId(); + + // 2. 
RPC under lock — updateOffset may wait briefly + List batch = rpcFetchSplitsBatch(tbl, startVal, splitId); + if (batch == null || batch.isEmpty()) { + return; + } + + // 3. mergeBySplitId (defensive dedup). + Set existingIds = new HashSet<>(); + finishedSplits.forEach(s -> existingIds.add(s.getSplitId())); + remainingSplits.forEach(s -> existingIds.add(s.getSplitId())); + List newSplits = new ArrayList<>(); + for (SnapshotSplit s : batch) { + if (!existingIds.contains(s.getSplitId())) { + newSplits.add(s); + } } - // save chunk list to system table - saveChunkMeta(tableSplits); - this.remainingSplits = tableSplits.values().stream() - .flatMap(List::stream) + if (newSplits.size() < batch.size()) { + log.info("advanceSplits dedup'd {} duplicate splits (batch={}, new={}) for job {} table {}", + batch.size() - newSplits.size(), batch.size(), newSplits.size(), getJobId(), tbl); + } + remainingSplits.addAll(newSplits); + + // 4. UPSERT this table's full chunk_list to system table. + List splitsOfTbl = Stream.concat( + finishedSplits.stream(), remainingSplits.stream()) + .filter(s -> tbl.equals(s.getTableId())) + .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId()))) .collect(Collectors.toList()); - } else { - // The source reader is automatically initialized when the split is obtained. - // In latest mode, a separate init is required.init source reader - initSourceReader(); + try { + StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl); + } catch (Exception e) { + throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage()); + } + + // 5. Advance cdcSplitProgress to the last split in the batch. 
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1)); + log.info("advanceSplits jobId={} table={} request(nextStart={}, nextSplitId={}) " + + "got {} new splits, cdcSplitProgress -> (table={}, nextStart={}, nextSplitId={})", + getJobId(), tbl, Arrays.toString(startVal), splitId, newSplits.size(), + cdcSplitProgress.getCurrentSplittingTable(), + Arrays.toString(cdcSplitProgress.getNextSplitStart()), + cdcSplitProgress.getNextSplitId()); } } - private void saveChunkMeta(Map> tableSplits) throws JobException { + /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */ + static int splitIdOf(String splitId) { + if (splitId == null) { + throw new IllegalArgumentException("splitId is null"); + } + int colon = splitId.lastIndexOf(':'); + if (colon < 0 || colon == splitId.length() - 1) { + throw new IllegalArgumentException("malformed splitId, expected 'tableId:id': " + splitId); + } try { - StreamingJobUtils.createMetaTableIfNotExist(); - StreamingJobUtils.insertSplitsToMeta(getJobId(), tableSplits); - } catch (Exception e) { - log.warn("save chunk meta error: ", e); - throw new JobException(e.getMessage()); + return Integer.parseInt(splitId.substring(colon + 1)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("malformed splitId, expected 'tableId:id': " + splitId, e); } } - private List requestTableSplits(String table) throws JobException { + /** Reset progress to "no table being split" state. */ + private static void clearProgress(SplitProgress progress) { + progress.setCurrentSplittingTable(null); + progress.setNextSplitStart(null); + progress.setNextSplitId(null); + } + + /** + * Apply a split's position to a progress object. + * - splitEnd null/empty (final split of table) → clear all fields. + * - splitEnd non-empty → set currentSplittingTable to split.tableId, advance start/id. 
+ */ + private static void applySplitToProgress(SplitProgress progress, SnapshotSplit split) { + if (split.getSplitEnd() == null || split.getSplitEnd().length == 0) { + clearProgress(progress); + } else { + progress.setCurrentSplittingTable(split.getTableId()); + progress.setNextSplitStart(split.getSplitEnd()); + progress.setNextSplitId(splitIdOf(split.getSplitId()) + 1); + } + } + + /** RPC fetchSplits with (table, nextSplitStart, nextSplitId, batchSize). protected for UT subclass. */ + protected List rpcFetchSplitsBatch(String table, Object[] nextSplitStart, Integer nextSplitId) + throws JobException { Backend backend = StreamingJobUtils.selectBackend(cloudCluster); - FetchTableSplitsRequest requestParams = - new FetchTableSplitsRequest(getJobId(), sourceType.name(), - sourceProperties, getFrontendAddress(), table); + FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest( + getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), table); + requestParams.setNextSplitStart(nextSplitStart); + requestParams.setNextSplitId(nextSplitId); + requestParams.setBatchSize(Config.streaming_cdc_fetch_splits_batch_size); InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder() .setApi("/api/fetchSplits") - .setParams(new Gson().toJson(requestParams)).build(); + .setParams(new Gson().toJson(requestParams)) + .build(); TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - InternalService.PRequestCdcClientResult result = null; try { Future future = BackendServiceProxy.getInstance() .requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec); - result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS); + PRequestCdcClientResult result = future.get( + Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { - 
log.warn("Failed to get split from backend, {}", result.getStatus().getErrorMsgs(0)); - throw new JobException( - "Failed to get split from backend," + result.getStatus().getErrorMsgs(0) + ", response: " - + result.getResponse()); - } - String response = result.getResponse(); - try { - ResponseBody> responseObj = objectMapper.readValue( - response, - new TypeReference>>() { - } - ); - List splits = responseObj.getData(); - return splits; - } catch (JsonProcessingException e) { - log.warn("Failed to parse split response: {}", response); - throw new JobException("Failed to parse split response: " + response); + throw new JobException("fetchSplits backend error: " + result.getStatus().getErrorMsgs(0)); } + ResponseBody> resp = objectMapper.readValue( + result.getResponse(), + new TypeReference>>() {}); + return resp.getData(); } catch (TimeoutException te) { - log.warn("cdc_client RPC timeout api=/api/fetchSplits jobId={} backend={}:{} table={} timeout_sec={}", - getJobId(), backend.getHost(), backend.getBrpcPort(), table, - Config.streaming_cdc_heavy_rpc_timeout_sec); - throw new JobException("cdc_client RPC timeout: /api/fetchSplits jobId=" + getJobId() + " table=" + table); - } catch (ExecutionException | InterruptedException ex) { - log.warn("Get splits error: ", ex); - throw new JobException(ex); + throw new JobException("fetchSplits RPC timeout: jobId=" + getJobId() + " table=" + table); + } catch (Exception ex) { + throw new JobException("fetchSplits failed: " + ex.getMessage()); } } @@ -673,9 +885,14 @@ public void onTaskCommitted(long scannedRows, long loadBytes) { @Override public boolean hasReachedEnd() { - return isSnapshotOnlyMode() - && CollectionUtils.isNotEmpty(finishedSplits) - && remainingSplits.isEmpty(); + if (!isSnapshotOnlyMode()) { + return false; + } + synchronized (splitsLock) { + return CollectionUtils.isNotEmpty(finishedSplits) + && remainingSplits.isEmpty() + && noMoreSplits(); + } } /** @@ -765,4 +982,31 @@ public void cleanMeta(Long 
jobId) throws JobException { private String getFrontendAddress() { return Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort(); } + + + /** Mirrors flink-cdc ChunkSplitterState. */ + @Getter + @Setter + public static class SplitProgress { + @SerializedName("ct") + private String currentSplittingTable; + + @SerializedName("ns") + private Object[] nextSplitStart; + + @SerializedName("ni") + private Integer nextSplitId; + + public SplitProgress() {} + + /** Deep copy for transferring committed -> cdc view on restart. */ + public SplitProgress copy() { + SplitProgress c = new SplitProgress(); + c.currentSplittingTable = this.currentSplittingTable; + c.nextSplitStart = this.nextSplitStart == null + ? null : Arrays.copyOf(this.nextSplitStart, this.nextSplitStart.length); + c.nextSplitId = this.nextSplitId; + return c; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java index 749b592c775832..3565deb0a153db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java @@ -91,11 +91,7 @@ public JdbcTvfSourceOffsetProvider() { super(); } - /** - * Initializes provider state and fetches snapshot splits from BE. - * splitChunks is called here (rather than in StreamingInsertJob) to keep - * all cdc_stream-specific init logic inside the provider. - */ + /** Initializes provider state from TVF properties; called every schedule tick. 
*/ @Override public void ensureInitialized(Long jobId, Map originTvfProps) throws JobException { String type = originTvfProps.get(DataSourceConfigKeys.TYPE); @@ -124,16 +120,6 @@ public void ensureInitialized(Long jobId, Map originTvfProps) th Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF"); } - /** - * Called once on fresh job creation (not on FE restart). - * Fetches snapshot splits from BE and persists them to the meta table. - */ - @Override - public void initOnCreate() throws JobException { - String table = sourceProperties.get(DataSourceConfigKeys.TABLE); - splitChunks(Collections.singletonList(table)); - } - /** * Rewrites the cdc_stream TVF SQL with current offset meta and taskId, * so the BE knows where to start reading and can report @@ -279,23 +265,25 @@ private List> buildCumulativeSnapshotOffset( public void updateOffset(Offset offset) { this.currentOffset = (JdbcOffset) offset; if (currentOffset.snapshotSplit()) { - for (AbstractSourceSplit split : currentOffset.getSplits()) { - SnapshotSplit ss = (SnapshotSplit) split; - boolean removed = remainingSplits.removeIf(v -> { - if (v.getSplitId().equals(ss.getSplitId())) { - ss.setTableId(v.getTableId()); - ss.setSplitKey(v.getSplitKey()); - ss.setSplitStart(v.getSplitStart()); - ss.setSplitEnd(v.getSplitEnd()); - return true; + synchronized (splitsLock) { + for (AbstractSourceSplit split : currentOffset.getSplits()) { + SnapshotSplit ss = (SnapshotSplit) split; + boolean removed = remainingSplits.removeIf(v -> { + if (v.getSplitId().equals(ss.getSplitId())) { + ss.setTableId(v.getTableId()); + ss.setSplitKey(v.getSplitKey()); + ss.setSplitStart(v.getSplitStart()); + ss.setSplitEnd(v.getSplitEnd()); + return true; + } + return false; + }); + if (removed) { + finishedSplits.add(ss); } - return false; - }); - if (removed) { - finishedSplits.add(ss); + chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> new HashMap<>()) + .put(ss.getSplitId(), ss.getHighWatermark()); 
} - chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> new HashMap<>()) - .put(ss.getSplitId(), ss.getHighWatermark()); } } // Binlog: currentOffset is already set; no binlogOffsetPersist needed for TVF path. @@ -312,9 +300,12 @@ public void updateOffset(Offset offset) { */ @Override public void replayIfNeed(StreamingInsertJob job) throws JobException { + synchronized (splitsLock) { + this.cachedSyncTables = job.getSyncTables(); + } if (currentOffset == null) { // No committed txn yet. If snapshot splits exist in the meta table (written by - // initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits + // advanceSplits), restore remainingSplits so getNextOffset() returns snapshot splits // instead of a BinlogSplit (which would incorrectly skip the snapshot phase). Map> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId()); if (MapUtils.isNotEmpty(snapshotSplits)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java index 56c9559699195d..560251b83800bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java @@ -101,6 +101,12 @@ public class StreamingJobUtils { private static final String SELECT_SPLITS_TABLE_TEMPLATE = "SELECT table_name, chunk_list FROM " + FULL_QUALIFIED_META_TBL_NAME + " WHERE job_id='%s' ORDER BY id ASC"; + private static final String SELECT_TABLE_ID_TEMPLATE = + "SELECT id FROM " + FULL_QUALIFIED_META_TBL_NAME + " WHERE job_id='%s' AND table_name='%s'"; + + private static final String SELECT_MAX_ID_TEMPLATE = + "SELECT IFNULL(MAX(id), 0) FROM " + FULL_QUALIFIED_META_TBL_NAME + " WHERE job_id='%s'"; + private static final String DELETE_JOB_META_TEMPLATE = "DELETE FROM " + FULL_QUALIFIED_META_TBL_NAME + " WHERE job_id='%s'"; @@ -163,21 +169,52 @@ public static void 
deleteJobMeta(Long jobId) { } } - public static void insertSplitsToMeta(Long jobId, Map> tableSplits) throws Exception { - List values = new ArrayList<>(); - int index = 1; - for (Map.Entry> entry : tableSplits.entrySet()) { - Map params = new HashMap<>(); - params.put("id", index + ""); - params.put("job_id", jobId + ""); - params.put("table_name", entry.getKey()); - params.put("chunk_list", objectMapper.writeValueAsString(entry.getValue())); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(INSERT_INTO_META_TABLE_TEMPLATE); - values.add(sql); - index++; + /** + * UPSERT a single table's chunk_list. id is reused if the table already has a row, + * otherwise allocated as MAX(id)+1. Relies on UNIQUE KEY (id, job_id) for in-place override. + */ + public static void upsertChunkList(Long jobId, String tableName, List chunks) throws Exception { + createMetaTableIfNotExist(); + Integer id = querySingleTableId(jobId, tableName); + if (id == null) { + id = queryNextAvailableId(jobId); + } + Map params = new HashMap<>(); + params.put("id", String.valueOf(id)); + params.put("job_id", String.valueOf(jobId)); + params.put("table_name", tableName); + params.put("chunk_list", objectMapper.writeValueAsString(chunks)); + StringSubstitutor sub = new StringSubstitutor(params); + String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE); + batchInsert(Collections.singletonList(sql)); + } + + /** Returns id of the row matching (jobId, tableName), or null if no such row exists. 
*/ + private static Integer querySingleTableId(Long jobId, String tableName) throws JobException { + String sql = String.format(SELECT_TABLE_ID_TEMPLATE, jobId, tableName); + try (AutoCloseConnectContext ctx = new AutoCloseConnectContext(buildConnectContext())) { + StmtExecutor stmtExecutor = new StmtExecutor(ctx.connectContext, sql); + List rows = stmtExecutor.executeInternalQuery(); + if (rows == null || rows.isEmpty()) { + return null; + } + return Integer.parseInt(rows.get(0).get(0)); + } catch (Exception e) { + throw new JobException("query table id failed: " + e.getMessage()); + } + } + + /** Returns MAX(id) + 1 for this job, or 1 if no rows yet. */ + private static int queryNextAvailableId(Long jobId) throws JobException { + String sql = String.format(SELECT_MAX_ID_TEMPLATE, jobId); + try (AutoCloseConnectContext ctx = new AutoCloseConnectContext(buildConnectContext())) { + StmtExecutor stmtExecutor = new StmtExecutor(ctx.connectContext, sql); + List rows = stmtExecutor.executeInternalQuery(); + int max = (rows == null || rows.isEmpty()) ? 0 : Integer.parseInt(rows.get(0).get(0)); + return max + 1; + } catch (Exception e) { + throw new JobException("query next available id failed: " + e.getMessage()); } - batchInsert(values); } private static void batchInsert(List values) throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java new file mode 100644 index 00000000000000..9d8f52460656ca --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java @@ -0,0 +1,449 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.offset.jdbc; + +import org.apache.doris.job.cdc.split.SnapshotSplit; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.util.StreamingJobUtils; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.List; + +/** + * Tests the async split state machine in {@link JdbcSourceOffsetProvider}: + * advanceSplits / noMoreSplits / updateOffset / dedup. RPC and the system-table + * UPSERT are stubbed so the test focuses purely on in-memory state transitions. + */ +public class JdbcSourceOffsetProviderAsyncSplitTest { + + /** Records each rpcFetchSplitsBatch invocation; used for argument assertions. */ + static final class RpcCall { + final String table; + final Object[] startVal; + final Integer splitId; + + RpcCall(String table, Object[] startVal, Integer splitId) { + this.table = table; + this.startVal = startVal; + this.splitId = splitId; + } + } + + /** Provider under test with rpcFetchSplitsBatch stubbed to dequeue prepared batches. 
*/ + static final class TestableProvider extends JdbcSourceOffsetProvider { + final Deque> mockBatches = new ArrayDeque<>(); + final List rpcCalls = new ArrayList<>(); + + TestableProvider() { + super(); + // Default to initial mode so initOnCreate() takes the splitting path + // (latest mode would try to call initSourceReader against a real backend). + this.sourceProperties.put( + org.apache.doris.job.cdc.DataSourceConfigKeys.OFFSET, + org.apache.doris.job.cdc.DataSourceConfigKeys.OFFSET_INITIAL); + } + + @Override + protected List rpcFetchSplitsBatch(String table, Object[] startVal, Integer splitId) { + rpcCalls.add(new RpcCall(table, startVal, splitId)); + if (mockBatches.isEmpty()) { + return Collections.emptyList(); + } + return mockBatches.poll(); + } + + @Override + public Long getJobId() { + return 999L; + } + } + + private TestableProvider provider; + private MockedStatic utilsMock; + + @Before + public void setup() { + provider = new TestableProvider(); + utilsMock = Mockito.mockStatic(StreamingJobUtils.class); + utilsMock.when(() -> StreamingJobUtils.upsertChunkList( + ArgumentMatchers.anyLong(), + ArgumentMatchers.anyString(), + ArgumentMatchers.any())) + .then(invocation -> null); + } + + @org.junit.After + public void tearDown() { + if (utilsMock != null) { + utilsMock.close(); + } + } + + /** Helper to build a SnapshotSplit. start/end are wrapped in Object[] only when non-null. */ + private static SnapshotSplit split(String tableId, int chunkId, Long start, Long end) { + return new SnapshotSplit( + tableId + ":" + chunkId, + tableId, + Collections.singletonList("id"), + start == null ? null : new Object[]{start}, + end == null ? 
null : new Object[]{end}, + Collections.singletonMap("file", "binlog.000001")); + } + + // ===== initOnCreate / noMoreSplits ===== + + @Test + public void testInitWithEmptySyncTablesIsAllDone() throws JobException { + provider.initOnCreate(Collections.emptyList()); + Assert.assertTrue(provider.noMoreSplits()); + } + + @Test + public void testInitWithSyncTablesNotDone() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + Assert.assertNotNull(provider.committedSplitProgress); + Assert.assertNotNull(provider.cdcSplitProgress); + Assert.assertNull(provider.cdcSplitProgress.getCurrentSplittingTable()); + Assert.assertFalse(provider.noMoreSplits()); + } + + // ===== advanceSplits ===== + + @Test + public void testAdvanceFirstCallPicksFirstTableWithNullStart() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a", "db.tbl_b")); + provider.mockBatches.add(Arrays.asList( + split("db.tbl_a", 0, null, 100L), + split("db.tbl_a", 1, 100L, 200L))); + + provider.advanceSplits(); + + Assert.assertEquals(2, provider.remainingSplits.size()); + Assert.assertEquals("db.tbl_a", provider.cdcSplitProgress.getCurrentSplittingTable()); + Assert.assertArrayEquals(new Object[]{200L}, provider.cdcSplitProgress.getNextSplitStart()); + Assert.assertEquals(Integer.valueOf(2), provider.cdcSplitProgress.getNextSplitId()); + + Assert.assertEquals(1, provider.rpcCalls.size()); + RpcCall first = provider.rpcCalls.get(0); + Assert.assertEquals("db.tbl_a", first.table); + Assert.assertNull("first call should pass null nextSplitStart (= START_BOUND)", first.startVal); + Assert.assertNull(first.splitId); + } + + @Test + public void testAdvanceContinuesOnSameTableAfterFirstBatch() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 0, null, 100L))); + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 1, 100L, 200L))); + + provider.advanceSplits(); + provider.advanceSplits(); + + 
Assert.assertEquals(2, provider.rpcCalls.size()); + RpcCall second = provider.rpcCalls.get(1); + Assert.assertEquals("db.tbl_a", second.table); + Assert.assertArrayEquals(new Object[]{100L}, second.startVal); + Assert.assertEquals(Integer.valueOf(1), second.splitId); + Assert.assertEquals(2, provider.remainingSplits.size()); + } + + @Test + public void testAdvanceTableDoneSwitchesToNextTable() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a", "db.tbl_b")); + // tbl_a's last chunk: splitEnd=null + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 0, null, null))); + // 2nd advance picks tbl_b + provider.mockBatches.add(Arrays.asList(split("db.tbl_b", 0, null, 50L))); + + provider.advanceSplits(); + Assert.assertNull("after tbl_a done, currentSplittingTable should clear", + provider.cdcSplitProgress.getCurrentSplittingTable()); + Assert.assertFalse("tbl_b still pending", provider.noMoreSplits()); + + provider.advanceSplits(); + + Assert.assertEquals(2, provider.rpcCalls.size()); + Assert.assertEquals("db.tbl_b", provider.rpcCalls.get(1).table); + Assert.assertEquals("db.tbl_b", provider.cdcSplitProgress.getCurrentSplittingTable()); + Assert.assertArrayEquals(new Object[]{50L}, provider.cdcSplitProgress.getNextSplitStart()); + } + + @Test + public void testAllSyncTablesDoneMakesNoMoreSplitsTrue() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 0, null, null))); + + provider.advanceSplits(); + + Assert.assertTrue(provider.noMoreSplits()); + Assert.assertEquals(1, provider.remainingSplits.size()); + } + + @Test + public void testAdvanceSplitsDedupsBySplitId() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + // Pre-existing split with same splitId; simulates a defensive dedup target + // (e.g. on FE restart after RPC succeeded but state wasn't fully advanced). 
+ provider.remainingSplits.add(split("db.tbl_a", 0, null, 100L)); + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 0, null, 100L))); + + provider.advanceSplits(); + + Assert.assertEquals("duplicate splitId should be filtered out", + 1, provider.remainingSplits.size()); + } + + @Test + public void testAdvanceWithEmptyBatchIsNoop() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + // mockBatches empty → rpcFetchSplitsBatch returns empty list + provider.advanceSplits(); + + Assert.assertEquals(0, provider.remainingSplits.size()); + // currentSplittingTable was set then RPC returned empty; we leave it set + // (next advance retries on same table from null start). Just assert no progress. + Assert.assertNull(provider.cdcSplitProgress.getNextSplitStart()); + Assert.assertNull(provider.cdcSplitProgress.getNextSplitId()); + } + + // ===== updateOffset advances committedSplitProgress ===== + + /** Build a commit-shaped SnapshotSplit: only splitId + HW are present (BE strips others). */ + private static SnapshotSplit commitSplit(String splitId) { + SnapshotSplit s = new SnapshotSplit(); + s.setSplitId(splitId); + s.setHighWatermark(Collections.singletonMap("file", "binlog.000002")); + return s; + } + + @Test + public void testUpdateOffsetAdvancesCommittedProgressOnMidChunk() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + provider.mockBatches.add(Arrays.asList( + split("db.tbl_a", 0, null, 100L), + split("db.tbl_a", 1, 100L, 200L))); + provider.advanceSplits(); + + // Task commits chunk #0; updateOffset will copy splitEnd back from remainingSplits. 
+ JdbcOffset endOffset = new JdbcOffset(Collections.singletonList(commitSplit("db.tbl_a:0"))); + provider.updateOffset(endOffset); + + Assert.assertEquals(1, provider.finishedSplits.size()); + Assert.assertEquals(1, provider.remainingSplits.size()); + + JdbcSourceOffsetProvider.SplitProgress committed = provider.committedSplitProgress; + Assert.assertEquals("db.tbl_a", committed.getCurrentSplittingTable()); + Assert.assertArrayEquals(new Object[]{100L}, committed.getNextSplitStart()); + Assert.assertEquals(Integer.valueOf(1), committed.getNextSplitId()); + } + + @Test + public void testUpdateOffsetLastChunkClearsCommittedProgress() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 0, null, null))); + provider.advanceSplits(); + + JdbcOffset endOffset = new JdbcOffset(Collections.singletonList(commitSplit("db.tbl_a:0"))); + provider.updateOffset(endOffset); + + JdbcSourceOffsetProvider.SplitProgress committed = provider.committedSplitProgress; + Assert.assertNull(committed.getCurrentSplittingTable()); + Assert.assertNull(committed.getNextSplitStart()); + Assert.assertNull(committed.getNextSplitId()); + Assert.assertEquals(1, provider.finishedSplits.size()); + Assert.assertEquals(0, provider.remainingSplits.size()); + } + + @Test + public void testUpdateOffsetReplayPathSkipsWhenSplitMissing() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a")); + // remainingSplits is empty (simulates editlog replay path). + + JdbcOffset endOffset = new JdbcOffset(Collections.singletonList(commitSplit("db.tbl_a:0"))); + provider.updateOffset(endOffset); + + // committed progress untouched; finishedSplits not added (we have nothing to fill in). 
+ Assert.assertNull(provider.committedSplitProgress.getCurrentSplittingTable()); + Assert.assertEquals(0, provider.finishedSplits.size()); + } + + // ===== computeCdcRemainingTables (covered indirectly via noMoreSplits) ===== + + @Test + public void testTouchedTablesRemovedFromRemaining() throws JobException { + provider.initOnCreate(Arrays.asList("db.tbl_a", "db.tbl_b", "db.tbl_c")); + provider.mockBatches.add(Arrays.asList(split("db.tbl_a", 0, null, null))); + provider.advanceSplits(); + + // tbl_a is now done (in remainingSplits + currentSplittingTable cleared). + // 2 more tables remain; noMoreSplits should still be false. + Assert.assertFalse(provider.noMoreSplits()); + Assert.assertNull(provider.cdcSplitProgress.getCurrentSplittingTable()); + + // 2nd advance picks tbl_b + provider.mockBatches.add(Arrays.asList(split("db.tbl_b", 0, null, null))); + provider.advanceSplits(); + Assert.assertEquals("db.tbl_b", provider.rpcCalls.get(1).table); + + // 3rd advance picks tbl_c + provider.mockBatches.add(Arrays.asList(split("db.tbl_c", 0, null, null))); + provider.advanceSplits(); + Assert.assertEquals("db.tbl_c", provider.rpcCalls.get(2).table); + + Assert.assertTrue(provider.noMoreSplits()); + } + + // ===== findResumeMidSplit (replay helper) ===== + + @Test + public void testFindResumeMidSplitSingleTableFullyCutReturnsNull() { + SnapshotSplit s0 = split("db.tbl_a", 0, null, 100L); + SnapshotSplit s1 = split("db.tbl_a", 1, 100L, null); // last, splitEnd=null + SnapshotSplit mid = JdbcSourceOffsetProvider.findResumeMidSplit( + Collections.singletonList("db.tbl_a"), + Arrays.asList(s0, s1), + Collections.emptyList()); + Assert.assertNull(mid); + } + + @Test + public void testFindResumeMidSplitSingleTableCutToMid() { + SnapshotSplit s0 = split("db.tbl_a", 0, null, 100L); + SnapshotSplit s1 = split("db.tbl_a", 1, 100L, 200L); // largest id, splitEnd non-null + SnapshotSplit mid = JdbcSourceOffsetProvider.findResumeMidSplit( + Collections.singletonList("db.tbl_a"), + 
Arrays.asList(s0, s1), + Collections.emptyList()); + Assert.assertNotNull(mid); + Assert.assertEquals("db.tbl_a:1", mid.getSplitId()); + Assert.assertArrayEquals(new Object[]{200L}, mid.getSplitEnd()); + } + + @Test + public void testFindResumeMidSplitMultiTableOnlyOneMid() { + // tbl_a fully cut; tbl_b cut to mid; tbl_c untouched + SnapshotSplit a0 = split("db.tbl_a", 0, null, null); + SnapshotSplit b0 = split("db.tbl_b", 0, null, 50L); + SnapshotSplit mid = JdbcSourceOffsetProvider.findResumeMidSplit( + Arrays.asList("db.tbl_a", "db.tbl_b", "db.tbl_c"), + Collections.singletonList(a0), + Collections.singletonList(b0)); + Assert.assertNotNull(mid); + Assert.assertEquals("db.tbl_b:0", mid.getSplitId()); + } + + @Test + public void testFindResumeMidSplitMaxIdSpreadAcrossLists() { + // last id is in remainingSplits (id=2), not finishedSplits (id=0,1) + SnapshotSplit f0 = split("db.tbl_a", 0, null, 100L); + SnapshotSplit f1 = split("db.tbl_a", 1, 100L, 200L); + SnapshotSplit r2 = split("db.tbl_a", 2, 200L, 300L); + SnapshotSplit mid = JdbcSourceOffsetProvider.findResumeMidSplit( + Collections.singletonList("db.tbl_a"), + Arrays.asList(f0, f1), + Collections.singletonList(r2)); + Assert.assertNotNull(mid); + Assert.assertEquals("db.tbl_a:2", mid.getSplitId()); + Assert.assertArrayEquals(new Object[]{300L}, mid.getSplitEnd()); + } + + @Test + public void testFindResumeMidSplitEmptyInputs() { + Assert.assertNull(JdbcSourceOffsetProvider.findResumeMidSplit( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); + Assert.assertNull(JdbcSourceOffsetProvider.findResumeMidSplit( + Collections.singletonList("db.tbl_a"), + Collections.emptyList(), Collections.emptyList())); + } + + @Test + public void testFindResumeMidSplitSyncTablesContainsUntouchedTable() { + // syncTables lists tbl_a and tbl_b; only tbl_a appears in splits, fully cut. + // tbl_b is untouched (no splits) -> still returns null (no mid). 
+ SnapshotSplit a0 = split("db.tbl_a", 0, null, null); + SnapshotSplit mid = JdbcSourceOffsetProvider.findResumeMidSplit( + Arrays.asList("db.tbl_a", "db.tbl_b"), + Collections.singletonList(a0), + Collections.emptyList()); + Assert.assertNull(mid); + } + + // ===== splitIdOf validation ===== + + @Test + public void testSplitIdOfHappyPath() { + Assert.assertEquals(0, JdbcSourceOffsetProvider.splitIdOf("db.tbl_a:0")); + Assert.assertEquals(42, JdbcSourceOffsetProvider.splitIdOf("db.tbl_a:42")); + // table with colon in its qualifier: lastIndexOf(':') takes the trailing one. + Assert.assertEquals(7, JdbcSourceOffsetProvider.splitIdOf("schema:tbl:7")); + } + + @Test(expected = IllegalArgumentException.class) + public void testSplitIdOfNoColonThrows() { + JdbcSourceOffsetProvider.splitIdOf("db.tbl_a_0"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSplitIdOfTrailingColonThrows() { + JdbcSourceOffsetProvider.splitIdOf("db.tbl_a:"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSplitIdOfNonNumericSuffixThrows() { + JdbcSourceOffsetProvider.splitIdOf("db.tbl_a:abc"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSplitIdOfNullThrows() { + JdbcSourceOffsetProvider.splitIdOf(null); + } + + // ===== mode gate ===== + + @Test + public void testNoMoreSplitsLatestModeAlwaysTrue() { + provider.sourceProperties.put( + org.apache.doris.job.cdc.DataSourceConfigKeys.OFFSET, + org.apache.doris.job.cdc.DataSourceConfigKeys.OFFSET_LATEST); + // Even if cachedSyncTables is populated (e.g. by replayIfNeed), latest mode + // must report noMoreSplits=true so scheduler skips advanceSplits entirely. 
+ provider.cachedSyncTables = Arrays.asList("db.tbl_a", "db.tbl_b"); + Assert.assertTrue(provider.noMoreSplits()); + } + + @Test + public void testNoMoreSplitsSnapshotModeStillRespectsState() throws JobException { + provider.sourceProperties.put( + org.apache.doris.job.cdc.DataSourceConfigKeys.OFFSET, + org.apache.doris.job.cdc.DataSourceConfigKeys.OFFSET_SNAPSHOT); + provider.initOnCreate(Arrays.asList("db.tbl_a")); + Assert.assertFalse("snapshot mode with un-split tables must return false", + provider.noMoreSplits()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java new file mode 100644 index 00000000000000..9fccdc0b01280b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.job.offset.jdbc; + +import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider.SplitProgress; + +import org.junit.Assert; +import org.junit.Test; + +public class SplitProgressTest { + + @Test + public void testDefaultStateIsAllNull() { + SplitProgress p = new SplitProgress(); + Assert.assertNull(p.getCurrentSplittingTable()); + Assert.assertNull(p.getNextSplitStart()); + Assert.assertNull(p.getNextSplitId()); + } + + @Test + public void testCopyDeepClonesNextSplitStart() { + SplitProgress original = new SplitProgress(); + original.setCurrentSplittingTable("db.tbl_a"); + original.setNextSplitStart(new Object[]{100L}); + original.setNextSplitId(5); + + SplitProgress copy = original.copy(); + Assert.assertEquals("db.tbl_a", copy.getCurrentSplittingTable()); + Assert.assertArrayEquals(new Object[]{100L}, copy.getNextSplitStart()); + Assert.assertEquals(Integer.valueOf(5), copy.getNextSplitId()); + + // Mutating copy.nextSplitStart must not affect the original (deep copy). 
+ copy.getNextSplitStart()[0] = 999L; + Assert.assertEquals(100L, original.getNextSplitStart()[0]); + } + + @Test + public void testCopyHandlesNullFields() { + SplitProgress original = new SplitProgress(); + SplitProgress copy = original.copy(); + Assert.assertNull(copy.getCurrentSplittingTable()); + Assert.assertNull(copy.getNextSplitStart()); + Assert.assertNull(copy.getNextSplitId()); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 10cb98e448abae..69210aa99823c5 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -20,6 +20,7 @@ import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer; import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; import org.apache.doris.cdcclient.source.factory.DataSource; +import org.apache.doris.cdcclient.utils.SplitKeyTypeResolver; import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchTableSplitsRequest; import org.apache.doris.job.cdc.request.JobBaseConfig; @@ -30,14 +31,15 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect; import org.apache.flink.cdc.connectors.base.options.StartupOptions; -import org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner; -import 
org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner; +import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; +import org.apache.flink.cdc.connectors.base.source.utils.JdbcChunkUtils; +import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState; +import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState.ChunkBound; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; @@ -58,6 +60,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -66,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -75,6 +77,7 @@ import static org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit.STREAM_SPLIT_ID; import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -133,54 +136,107 @@ public void initialize(String jobId, DataSource dataSource, Map LOG.info("Initialized poll executor with parallelism: {}", parallelism); } + /** + * Fetch a batch of snapshot splits by driving flink-cdc {@link ChunkSplitter} directly. + * + *
<p>
Stateless: each RPC builds a fresh splitter from the (table, nextChunkStart, nextChunkId) + * triple supplied by FE, fetches up to {@code batchSize} chunks, then closes the splitter. + * + *
<p>
Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest modes never reach here. + */ @Override public List getSourceSplits(FetchTableSplitsRequest ftsReq) { - LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId()); + LOG.info( + "Get table {} splits for job {} (nextSplitId={}, nextSplitStart={})", + ftsReq.getSnapshotTable(), + ftsReq.getJobId(), + ftsReq.getNextSplitId(), + java.util.Arrays.toString(ftsReq.getNextSplitStart())); JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq); - List - remainingSnapshotSplits = new ArrayList<>(); - StreamSplit remainingStreamSplit = null; - - // Check startup mode - for PostgreSQL, we use similar logic as MySQL - String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET); - if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode) - || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) { - remainingSnapshotSplits = - startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig()); - } else { - // For non-initial mode, create a stream split - Offset startingOffset = createInitialOffset(); - remainingStreamSplit = - new StreamSplit( - STREAM_SPLIT_ID, - startingOffset, - createNoStoppingOffset(), - new ArrayList<>(), - new HashMap<>(), - 0); - } + String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA); + TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable()); + + int batchSize = ftsReq.getBatchSize() == null ? 
100 : ftsReq.getBatchSize(); + ChunkSplitterState state = buildChunkSplitterState(sourceConfig, tableId, ftsReq); + ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state); - List splits = new ArrayList<>(); - if (!remainingSnapshotSplits.isEmpty()) { - for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit - snapshotSplit : remainingSnapshotSplits) { - String splitId = snapshotSplit.splitId(); - String tableId = snapshotSplit.getTableId().identifier(); - Object[] splitStart = snapshotSplit.getSplitStart(); - Object[] splitEnd = snapshotSplit.getSplitEnd(); - List splitKey = snapshotSplit.getSplitKeyType().getFieldNames(); - SnapshotSplit split = - new SnapshotSplit(splitId, tableId, splitKey, splitStart, splitEnd, null); - splits.add(split); + try { + splitter.open(); + List result = new ArrayList<>(); + while (result.size() < batchSize) { + Collection + chunks = splitter.generateSplits(tableId); + for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk : chunks) { + result.add(toDorisSnapshotSplit(chunk)); + } + if (!splitter.hasNextChunk()) { + break; + } + } + LOG.info("Fetched {} splits for table {} (resume nextSplitId={}); hasNextChunk={}", + result.size(), tableId, ftsReq.getNextSplitId(), splitter.hasNextChunk()); + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to generate splits for " + tableId, e); + } finally { + try { + splitter.close(); + } catch (Exception e) { + LOG.warn("Failed to close splitter for {}", tableId, e); } - } else { - Offset startingOffset = remainingStreamSplit.getStartingOffset(); - BinlogSplit streamSplit = new BinlogSplit(); - streamSplit.setSplitId(remainingStreamSplit.splitId()); - streamSplit.setStartingOffset(startingOffset.getOffset()); - splits.add(streamSplit); } - return splits; + } + + /** Resolve and cache the split key column's JDBC type. 
*/ + /** + * null start -> NO_SPLITTING_TABLE_STATE (analyze + maybe evenly); non-null -> resume mid-table. + * Cast pkValues[0] back to the original JDBC type (JSON downgrades Long to Integer). + */ + private ChunkSplitterState buildChunkSplitterState( + JdbcSourceConfig sourceConfig, TableId tableId, FetchTableSplitsRequest ftsReq) { + Object[] pkValues = ftsReq.getNextSplitStart(); + if (pkValues == null || pkValues.length == 0) { + return ChunkSplitterState.NO_SPLITTING_TABLE_STATE; + } + int sqlType = resolveSplitKeySqlType(sourceConfig, tableId); + Object castStart = SplitKeyTypeResolver.cast(pkValues[0], sqlType); + int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId(); + return new ChunkSplitterState(tableId, ChunkBound.middleOf(castStart), splitId); + } + + private int resolveSplitKeySqlType(JdbcSourceConfig sourceConfig, TableId tableId) { + String database = sourceConfig.getDatabaseList().isEmpty() + ? "" : sourceConfig.getDatabaseList().get(0); + String chunkKeyCol = sourceConfig.getChunkKeyColumn() == null + ? "" : sourceConfig.getChunkKeyColumn(); + String cacheKey = String.join("|", + sourceConfig.getHostname() + ":" + sourceConfig.getPort(), + database, + tableId.toString(), + chunkKeyCol); + return SplitKeyTypeResolver.getOrCompute(cacheKey, () -> { + JdbcDataSourceDialect dialect = getDialect(sourceConfig); + try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) { + return JdbcChunkUtils.getSplitColumn( + dialect.queryTableSchema(jdbc, tableId).getTable(), + sourceConfig.getChunkKeyColumn()) + .jdbcType(); + } catch (Exception e) { + throw new RuntimeException("Failed to resolve split key type for " + tableId, e); + } + }); + } + + /** flink-cdc SnapshotSplit -> Doris SnapshotSplit (drops splitKeyType, keeps key field names). 
*/ + private SnapshotSplit toDorisSnapshotSplit( + org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk) { + return new SnapshotSplit( + chunk.splitId(), + chunk.getTableId().identifier(), + chunk.getSplitKeyType().getFieldNames(), + chunk.getSplitStart(), + chunk.getSplitEnd(), + null); } @Override @@ -746,86 +802,6 @@ private Offset getStartOffsetFromConfig(JdbcSourceConfig sourceConfig) { return startingOffset; } - private List - startSplitChunks( - JdbcSourceConfig sourceConfig, - String snapshotTable, - Map config) { - List remainingTables = new ArrayList<>(); - if (snapshotTable != null) { - String schema = config.get(DataSourceConfigKeys.SCHEMA); - remainingTables.add(new TableId(null, schema, snapshotTable)); - } - List remainingSplits = - new ArrayList<>(); - HybridSplitAssigner splitAssigner = - new HybridSplitAssigner<>( - sourceConfig, - 1, - remainingTables, - true, - getDialect(sourceConfig), - getOffsetFactory(), - new MockSplitEnumeratorContext(1)); - splitAssigner.open(); - try { - while (true) { - Optional split = splitAssigner.getNext(); - if (split.isPresent()) { - org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit - snapshotSplit = split.get().asSnapshotSplit(); - remainingSplits.add(snapshotSplit); - } else { - break; - } - } - } finally { - closeChunkSplitterOnly(splitAssigner); - } - return remainingSplits; - } - - /** - * Close only the chunk splitter to avoid closing shared connection pools Similar to MySQL - * implementation Note: HybridSplitAssigner wraps SnapshotSplitAssigner, so we need to get the - * inner assigner first - */ - private static void closeChunkSplitterOnly(HybridSplitAssigner splitAssigner) { - try { - // First, get the inner SnapshotSplitAssigner from HybridSplitAssigner - java.lang.reflect.Field snapshotAssignerField = - HybridSplitAssigner.class.getDeclaredField("snapshotSplitAssigner"); - snapshotAssignerField.setAccessible(true); - SnapshotSplitAssigner snapshotSplitAssigner = 
- (SnapshotSplitAssigner) snapshotAssignerField.get(splitAssigner); - - if (snapshotSplitAssigner == null) { - LOG.warn("snapshotSplitAssigner is null in HybridSplitAssigner"); - return; - } - - // Call closeExecutorService() via reflection - java.lang.reflect.Method closeExecutorMethod = - SnapshotSplitAssigner.class.getDeclaredMethod("closeExecutorService"); - closeExecutorMethod.setAccessible(true); - closeExecutorMethod.invoke(snapshotSplitAssigner); - - // Call chunkSplitter.close() via reflection - java.lang.reflect.Field chunkSplitterField = - SnapshotSplitAssigner.class.getDeclaredField("chunkSplitter"); - chunkSplitterField.setAccessible(true); - Object chunkSplitter = chunkSplitterField.get(snapshotSplitAssigner); - - if (chunkSplitter != null) { - java.lang.reflect.Method closeMethod = chunkSplitter.getClass().getMethod("close"); - closeMethod.invoke(chunkSplitter); - LOG.info("Closed Source chunkSplitter JDBC connection"); - } - } catch (Exception e) { - LOG.warn("Failed to close chunkSplitter via reflection", e); - } - } - // Method removed - reader cleanup is now handled in finishSplitRecords() protected abstract FetchTask createFetchTaskFromSplit( diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index bf8ac56312bbeb..1dcce21a64cf06 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -20,6 +20,7 @@ import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; import org.apache.doris.cdcclient.source.deserialize.MySqlDebeziumJsonDeserializer; import org.apache.doris.cdcclient.source.factory.DataSource; +import org.apache.doris.cdcclient.utils.SplitKeyTypeResolver; import 
org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader; import org.apache.doris.cdcclient.source.reader.SnapshotReaderContext; import org.apache.doris.cdcclient.source.reader.SplitReadResult; @@ -38,14 +39,15 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema; +import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter; +import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState; import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader; import org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader; import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; -import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -80,7 +82,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -158,46 +159,134 @@ public void initialize(String jobId, DataSource dataSource, Map LOG.info("Initialized poll executor with parallelism: {}", parallelism); } + /** + * Fetch a batch of snapshot splits by driving flink-cdc {@link MySqlChunkSplitter} directly. + * + *

Stateless: each RPC rebuilds the splitter from (table, nextSplitStart, nextSplitId) + * supplied by FE, splits up to {@code batchSize} chunks, then closes. Note: evenly-distributed + * PKs go through a single splitChunks() call returning all chunks at once, so batchSize is only + * effective on the uneven path. + * + *

Only INITIAL/SNAPSHOT startup modes go through the chunk path; other modes return a + * single BinlogSplit instead. + */ @Override public List getSourceSplits(FetchTableSplitsRequest ftsReq) { - LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId()); + LOG.info( + "Get table {} splits for job {} (nextSplitId={}, nextSplitStart={})", + ftsReq.getSnapshotTable(), + ftsReq.getJobId(), + ftsReq.getNextSplitId(), + java.util.Arrays.toString(ftsReq.getNextSplitStart())); MySqlSourceConfig sourceConfig = getSourceConfig(ftsReq); StartupMode startupMode = sourceConfig.getStartupOptions().startupMode; - List remainingSnapshotSplits = new ArrayList<>(); - MySqlBinlogSplit remainingBinlogSplit = null; - if (startupMode.equals(StartupMode.INITIAL) || startupMode.equals(StartupMode.SNAPSHOT)) { - remainingSnapshotSplits = - startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig()); - } else { - remainingBinlogSplit = - new MySqlBinlogSplit( - BINLOG_SPLIT_ID, - sourceConfig.getStartupOptions().binlogOffset, - BinlogOffset.ofNonStopping(), - new ArrayList<>(), - new HashMap<>(), - 0); + + if (!startupMode.equals(StartupMode.INITIAL) && !startupMode.equals(StartupMode.SNAPSHOT)) { + BinlogSplit binlogSplit = new BinlogSplit(); + binlogSplit.setSplitId(BINLOG_SPLIT_ID); + binlogSplit.setStartingOffset(sourceConfig.getStartupOptions().binlogOffset.getOffset()); + return Collections.singletonList(binlogSplit); } - List splits = new ArrayList<>(); - if (!remainingSnapshotSplits.isEmpty()) { - for (MySqlSnapshotSplit snapshotSplit : remainingSnapshotSplits) { - String splitId = snapshotSplit.splitId(); - String tableId = snapshotSplit.getTableId().identifier(); - Object[] splitStart = snapshotSplit.getSplitStart(); - Object[] splitEnd = snapshotSplit.getSplitEnd(); - List splitKey = snapshotSplit.getSplitKeyType().getFieldNames(); - SnapshotSplit split = - new SnapshotSplit(splitId, tableId, splitKey, splitStart, splitEnd, null); - 
splits.add(split); + + String database = ftsReq.getConfig().get(DataSourceConfigKeys.DATABASE); + TableId tableId = TableId.parse(database + "." + ftsReq.getSnapshotTable()); + int batchSize = ftsReq.getBatchSize() == null ? 100 : ftsReq.getBatchSize(); + + boolean isCaseSensitive; + try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { + isCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); + } catch (SQLException e) { + throw new RuntimeException("Failed to query table id case sensitivity", e); + } + MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, isCaseSensitive); + MySqlPartition partition = + new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()); + + ChunkSplitterState state = buildChunkSplitterState(sourceConfig, tableId, ftsReq, mySqlSchema, partition); + MySqlChunkSplitter splitter = new MySqlChunkSplitter(mySqlSchema, sourceConfig, state); + + try { + splitter.open(); + List result = new ArrayList<>(); + while (result.size() < batchSize) { + List chunks = splitter.splitChunks(partition, tableId); + for (MySqlSnapshotSplit chunk : chunks) { + result.add(toDorisSnapshotSplit(chunk)); + } + if (!splitter.hasNextChunk()) { + break; + } + } + LOG.info( + "Fetched {} splits for table {} (resume nextSplitId={}); hasNextChunk={}", + result.size(), + tableId, + ftsReq.getNextSplitId(), + splitter.hasNextChunk()); + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to generate splits for " + tableId, e); + } finally { + try { + splitter.close(); + } catch (Exception e) { + LOG.warn("Failed to close splitter for {}", tableId, e); } - } else { - BinlogOffset startingOffset = remainingBinlogSplit.getStartingOffset(); - BinlogSplit binlogSplit = new BinlogSplit(); - binlogSplit.setSplitId(remainingBinlogSplit.splitId()); - binlogSplit.setStartingOffset(startingOffset.getOffset()); - splits.add(binlogSplit); } - return splits; + } + + /** + * null start -> 
NO_SPLITTING_TABLE_STATE (analyze + maybe evenly); non-null -> resume mid-table. + * Cast pkValues[0] back to the original JDBC type (JSON downgrades Long to Integer). + */ + private ChunkSplitterState buildChunkSplitterState( + MySqlSourceConfig sourceConfig, TableId tableId, FetchTableSplitsRequest ftsReq, + MySqlSchema mySqlSchema, MySqlPartition partition) { + Object[] pkValues = ftsReq.getNextSplitStart(); + if (pkValues == null || pkValues.length == 0) { + return ChunkSplitterState.NO_SPLITTING_TABLE_STATE; + } + int sqlType = resolveSplitKeySqlType(sourceConfig, tableId, mySqlSchema, partition); + Object castStart = SplitKeyTypeResolver.cast(pkValues[0], sqlType); + int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId(); + return new ChunkSplitterState( + tableId, ChunkSplitterState.ChunkBound.middleOf(castStart), splitId); + } + + /** Resolve and cache the split key column's JDBC type. */ + private int resolveSplitKeySqlType( + MySqlSourceConfig sourceConfig, TableId tableId, + MySqlSchema mySqlSchema, MySqlPartition partition) { + String database = sourceConfig.getDatabaseList().isEmpty() + ? "" : sourceConfig.getDatabaseList().get(0); + String chunkKeyCols = sourceConfig.getChunkKeyColumns() == null + ? "" : sourceConfig.getChunkKeyColumns().toString(); + String cacheKey = String.join("|", + sourceConfig.getHostname() + ":" + sourceConfig.getPort(), + database, + tableId.toString(), + chunkKeyCols); + return SplitKeyTypeResolver.getOrCompute(cacheKey, () -> { + try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { + return ChunkUtils.getChunkKeyColumn( + mySqlSchema.getTableSchema(partition, jdbc, tableId).getTable(), + sourceConfig.getChunkKeyColumns()) + .jdbcType(); + } catch (Exception e) { + throw new RuntimeException("Failed to resolve split key type for " + tableId, e); + } + }); + } + + /** flink-cdc MySqlSnapshotSplit -> Doris SnapshotSplit (drops splitKeyType, keeps field names). 
*/ + private SnapshotSplit toDorisSnapshotSplit(MySqlSnapshotSplit chunk) { + return new SnapshotSplit( + chunk.splitId(), + chunk.getTableId().identifier(), + chunk.getSplitKeyType().getFieldNames(), + chunk.getSplitStart(), + chunk.getSplitEnd(), + null); } @Override @@ -705,66 +794,6 @@ private Tuple2 createBinlogSplit( return Tuple2.of(binlogSplitFinal, pureBinlogPhase); } - private List startSplitChunks( - MySqlSourceConfig sourceConfig, String snapshotTable, Map config) { - List remainingTables = new ArrayList<>(); - if (snapshotTable != null) { - // need add database name - String database = config.get(DataSourceConfigKeys.DATABASE); - remainingTables.add(TableId.parse(database + "." + snapshotTable)); - } - List remainingSplits = new ArrayList<>(); - MySqlSnapshotSplitAssigner splitAssigner = - new MySqlSnapshotSplitAssigner( - sourceConfig, 1, remainingTables, false, new MockSplitEnumeratorContext(1)); - splitAssigner.open(); - try { - while (true) { - Optional mySqlSplit = splitAssigner.getNext(); - if (mySqlSplit.isPresent()) { - MySqlSnapshotSplit snapshotSplit = mySqlSplit.get().asSnapshotSplit(); - remainingSplits.add(snapshotSplit); - } else { - break; - } - } - } finally { - // splitAssigner.close(); - closeChunkSplitterOnly(splitAssigner); - } - return remainingSplits; - } - - /** - * The JdbcConnectionPools inside MySqlSnapshotSplitAssigner are singletons. Calling - * MySqlSnapshotSplitAssigner.close() closes the entire JdbcConnectionPools, which can cause - * problems under high concurrency. This only closes the connection of the current - * MySqlSnapshotSplitAssigner. 
- */ - private void closeChunkSplitterOnly(MySqlSnapshotSplitAssigner splitAssigner) { - try { - // call closeExecutorService() - java.lang.reflect.Method closeExecutorMethod = - MySqlSnapshotSplitAssigner.class.getDeclaredMethod("closeExecutorService"); - closeExecutorMethod.setAccessible(true); - closeExecutorMethod.invoke(splitAssigner); - - // call chunkSplitter.close() - java.lang.reflect.Field field = - MySqlSnapshotSplitAssigner.class.getDeclaredField("chunkSplitter"); - field.setAccessible(true); - Object chunkSplitter = field.get(splitAssigner); - - if (chunkSplitter != null) { - java.lang.reflect.Method closeMethod = chunkSplitter.getClass().getMethod("close"); - closeMethod.invoke(chunkSplitter); - LOG.info("Closed chunkSplitter JDBC connection"); - } - } catch (Exception e) { - LOG.warn("Failed to close chunkSplitter via reflection,", e); - } - } - private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config, int subtaskId) { MySqlSourceConfig sourceConfig = getSourceConfig(config); final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig); diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SplitKeyTypeResolver.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SplitKeyTypeResolver.java new file mode 100644 index 00000000000000..ca3521ad9a37ef --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SplitKeyTypeResolver.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
/**
 * Casts JSON-round-tripped split-key values back to the Java type the JDBC driver returns
 * for that column. flink-cdc's chunk splitter uses strict {@code Objects.equals} internally,
 * so passing {@code Integer(3)} where it expects {@code Long(3)} (BIGINT column) silently
 * produces empty chunks.
 *
 * <p>Thread-safe: the sqlType cache is backed by a {@link ConcurrentHashMap}.
 */
public final class SplitKeyTypeResolver {

    /** cacheKey -> {@link Types} code of the table's split key column. */
    private static final ConcurrentMap<String, Integer> SQL_TYPE_CACHE = new ConcurrentHashMap<>();

    private SplitKeyTypeResolver() {}

    /**
     * Returns the cached sqlType for {@code cacheKey}, computing it via {@code resolver} on
     * first use. Uses {@link ConcurrentMap#computeIfAbsent} so concurrent first calls run the
     * resolver (typically a JDBC schema query) at most once per key; the previous
     * get-then-put pattern could execute it several times under contention.
     */
    public static int getOrCompute(String cacheKey, IntSupplier resolver) {
        return SQL_TYPE_CACHE.computeIfAbsent(cacheKey, key -> resolver.getAsInt());
    }

    /**
     * Cast a {@link Number} to the Java type JDBC returns for the given {@link Types} code.
     * Non-Number values (strings, UUIDs, null) and unrecognized sqlTypes pass through as-is.
     */
    public static Object cast(Object v, int sqlType) {
        if (!(v instanceof Number)) {
            return v;
        }
        Number n = (Number) v;
        switch (sqlType) {
            case Types.BIGINT:
                return n.longValue();
            case Types.INTEGER:
                return n.intValue();
            case Types.SMALLINT:
                return n.shortValue();
            case Types.TINYINT:
                return n.byteValue();
            case Types.NUMERIC:
            case Types.DECIMAL:
                // String round-trip avoids new BigDecimal(double) precision artifacts.
                return new BigDecimal(n.toString());
            case Types.REAL:
            case Types.FLOAT:
                return n.floatValue();
            case Types.DOUBLE:
                return n.doubleValue();
            default:
                return v;
        }
    }
}
+ +package org.apache.doris.cdcclient.utils; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.sql.Types; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +class SplitKeyTypeResolverTest { + + // ─── cast: numeric types ────────────────────────────────────────────────── + + @Test + void castIntegerToBigintReturnsLong() { + Object out = SplitKeyTypeResolver.cast(Integer.valueOf(3), Types.BIGINT); + assertEquals(Long.class, out.getClass()); + assertEquals(3L, out); + } + + @Test + void castLongToIntegerReturnsInteger() { + Object out = SplitKeyTypeResolver.cast(Long.valueOf(7), Types.INTEGER); + assertEquals(Integer.class, out.getClass()); + assertEquals(7, out); + } + + @Test + void castNumberToSmallintReturnsShort() { + Object out = SplitKeyTypeResolver.cast(Integer.valueOf(100), Types.SMALLINT); + assertEquals(Short.class, out.getClass()); + assertEquals((short) 100, out); + } + + @Test + void castNumberToTinyintReturnsByte() { + Object out = SplitKeyTypeResolver.cast(Integer.valueOf(5), Types.TINYINT); + assertEquals(Byte.class, out.getClass()); + assertEquals((byte) 5, out); + } + + @Test + void castNumberToDecimalReturnsBigDecimal() { + Object out = SplitKeyTypeResolver.cast(Long.valueOf(42), Types.DECIMAL); + assertEquals(BigDecimal.class, out.getClass()); + assertEquals(new BigDecimal("42"), out); + } + + @Test + void castNumberToNumericReturnsBigDecimal() { + Object out = SplitKeyTypeResolver.cast(Integer.valueOf(9), Types.NUMERIC); + assertEquals(BigDecimal.class, out.getClass()); + assertEquals(new BigDecimal("9"), out); + } + + @Test + void castNumberToFloatReturnsFloat() { + Object out = SplitKeyTypeResolver.cast(Double.valueOf(1.5), Types.FLOAT); + assertEquals(Float.class, out.getClass()); + assertEquals(1.5f, 
out); + } + + @Test + void castNumberToDoubleReturnsDouble() { + Object out = SplitKeyTypeResolver.cast(Integer.valueOf(2), Types.DOUBLE); + assertEquals(Double.class, out.getClass()); + assertEquals(2.0d, out); + } + + // ─── cast: edge cases ───────────────────────────────────────────────────── + + @Test + void castNullReturnsNull() { + assertNull(SplitKeyTypeResolver.cast(null, Types.BIGINT)); + } + + @Test + void castNonNumberReturnsAsIs() { + String v = "abc"; + assertSame(v, SplitKeyTypeResolver.cast(v, Types.BIGINT)); + } + + @Test + void castUuidReturnsAsIs() { + UUID v = UUID.randomUUID(); + assertSame(v, SplitKeyTypeResolver.cast(v, Types.OTHER)); + } + + @Test + void castUnknownSqlTypeReturnsAsIs() { + Object v = Integer.valueOf(3); + // VARCHAR is not in the switch -> returned as is. + assertSame(v, SplitKeyTypeResolver.cast(v, Types.VARCHAR)); + } + + // ─── getOrCompute: cache behavior ───────────────────────────────────────── + + @Test + void getOrComputeCachesAfterFirstCall() { + String key = "test-cache-" + UUID.randomUUID(); + AtomicInteger callCount = new AtomicInteger(); + int v1 = SplitKeyTypeResolver.getOrCompute(key, () -> { + callCount.incrementAndGet(); + return Types.BIGINT; + }); + int v2 = SplitKeyTypeResolver.getOrCompute(key, () -> { + callCount.incrementAndGet(); + return Types.INTEGER; // would be wrong if called -> proves cache hit + }); + assertEquals(Types.BIGINT, v1); + assertEquals(Types.BIGINT, v2); + assertEquals(1, callCount.get()); + } + + @Test + void getOrComputeDifferentKeysComputeIndependently() { + AtomicInteger callCount = new AtomicInteger(); + String keyA = "test-keyA-" + UUID.randomUUID(); + String keyB = "test-keyB-" + UUID.randomUUID(); + int a = SplitKeyTypeResolver.getOrCompute(keyA, () -> { + callCount.incrementAndGet(); + return Types.BIGINT; + }); + int b = SplitKeyTypeResolver.getOrCompute(keyB, () -> { + callCount.incrementAndGet(); + return Types.INTEGER; + }); + assertEquals(Types.BIGINT, a); + 
assertEquals(Types.INTEGER, b); + assertEquals(2, callCount.get()); + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_async_split.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_async_split.groovy new file mode 100644 index 00000000000000..08eef8a898fe47 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_async_split.groovy @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verifies async chunk splitting end-to-end for MySQL: CREATE returns quickly, +// snapshot fully syncs across multiple splits, and the binlog phase still works after. 
// Verifies async chunk splitting end-to-end for MySQL: CREATE returns quickly,
// snapshot fully syncs across multiple splits, and the binlog phase still works after.
suite("test_streaming_mysql_job_async_split", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
    def jobName = "test_streaming_mysql_job_async_split_name"
    def currentDb = (sql "select database()")[0][0]
    def table1 = "user_info_mysql_async_split"
    def mysqlDb = "test_cdc_db"
    def totalRows = 100

    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${table1} force"""

    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

        // Prepare 100 rows in MySQL; snapshot_split_size=5 -> 20 splits.
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
            sql """CREATE TABLE ${mysqlDb}.${table1} (
                    `id` int NOT NULL,
                    `name` varchar(200),
                    `age` int,
                    PRIMARY KEY (`id`)
                    ) ENGINE=InnoDB"""
            StringBuilder sb = new StringBuilder()
            sb.append("INSERT INTO ${mysqlDb}.${table1} (id, name, age) VALUES ")
            for (int i = 1; i <= totalRows; i++) {
                if (i > 1) sb.append(", ")
                sb.append("(${i}, 'name_${i}', ${i})")
            }
            sql sb.toString()
        }

        // CREATE should return promptly: splitting is now driven by the scheduler
        // each tick, not synchronously inside CREATE.
        long createStartMs = System.currentTimeMillis()
        sql """CREATE JOB ${jobName}
                ON STREAMING
                FROM MYSQL (
                    "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
                    "driver_url" = "${driver_url}",
                    "driver_class" = "com.mysql.cj.jdbc.Driver",
                    "user" = "root",
                    "password" = "123456",
                    "database" = "${mysqlDb}",
                    "include_tables" = "${table1}",
                    "offset" = "initial",
                    "snapshot_split_size" = "5",
                    "snapshot_parallelism" = "4"
                )
                TO DATABASE ${currentDb} (
                    "table.create.properties.replication_num" = "1"
                )
                """
        // From here on the job exists: run the assertions under try/finally so a failed
        // assertion cannot leak a RUNNING streaming job that keeps pulling binlog and
        // pollutes subsequent suites.
        try {
            long createElapsedMs = System.currentTimeMillis() - createStartMs
            log.info("CREATE JOB elapsed: ${createElapsedMs} ms")
            assert createElapsedMs < 30_000 :
                    "CREATE JOB should return quickly under async splitting, took ${createElapsedMs} ms"

            // Job metadata should be visible immediately (no blocking on splitChunks).
            def jobRows = sql """select Name from jobs("type"="insert") where Name='${jobName}'"""
            assert jobRows.size() == 1

            // Wait until snapshot fully syncs. 20 splits at snapshot_parallelism=4 ->
            // ~5 task ticks; default max_interval=10s -> ~50s end-to-end. Cap at 5min.
            try {
                Awaitility.await().atMost(300, SECONDS)
                        .pollInterval(2, SECONDS).until(
                        {
                            def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}"""
                            log.info("doris row count: ${cnt}")
                            cnt.size() == 1 && cnt.get(0).get(0) == totalRows
                        }
                )
            } catch (Exception ex) {
                def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
                def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
                log.info("show job: " + showjob)
                log.info("show task: " + showtask)
                throw ex
            }

            // Distinct PK count must equal total — no duplicates from RPC retry / dedup paths.
            def doneDistinct = sql """SELECT COUNT(DISTINCT id) FROM ${currentDb}.${table1}"""
            assert doneDistinct.get(0).get(0) == totalRows

            def loadStat0 = parseJson(sql("""
                select loadStatistic from jobs("type"="insert") where Name='${jobName}'
                """).get(0).get(0))
            log.info("loadStat after snapshot: ${loadStat0}")
            assert loadStat0.scannedRows == totalRows

            // Snapshot done -> binlog phase. Verify INSERT/UPDATE/DELETE still propagate.
            connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
                sql """INSERT INTO ${mysqlDb}.${table1} (id, name, age) VALUES (101, 'incr_101', 101);"""
                sql """UPDATE ${mysqlDb}.${table1} SET age = 999 WHERE id = 1;"""
                sql """DELETE FROM ${mysqlDb}.${table1} WHERE id = 2;"""
            }

            try {
                Awaitility.await().atMost(120, SECONDS)
                        .pollInterval(2, SECONDS).until(
                        {
                            def res = sql """SELECT
                                COUNT(*),
                                SUM(CASE WHEN id=1 AND age=999 THEN 1 ELSE 0 END),
                                SUM(CASE WHEN id=101 THEN 1 ELSE 0 END),
                                SUM(CASE WHEN id=2 THEN 1 ELSE 0 END)
                                FROM ${currentDb}.${table1}"""
                            log.info("binlog state: ${res}")
                            res.size() == 1 \
                                && res.get(0).get(0) == totalRows \
                                && res.get(0).get(1) == 1 \
                                && res.get(0).get(2) == 1 \
                                && res.get(0).get(3) == 0
                        }
                )
            } catch (Exception ex) {
                def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
                log.info("show job: " + showjob)
                throw ex
            }

            // Job should still be RUNNING (snapshot transitioned cleanly to binlog).
            def jobInfo = sql """
                select status from jobs("type"="insert") where Name='${jobName}'
                """
            assert jobInfo.get(0).get(0) == "RUNNING"
        } finally {
            sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        }

        def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
        assert jobCountRsp.get(0).get(0) == 0
    }
}
+suite("test_streaming_postgres_job_async_split", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_async_split_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_async_split" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def totalRows = 100 + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Prepare 100 rows in PG; snapshot_split_size=5 -> 20 splits. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int4 PRIMARY KEY, + "name" varchar(200), + "age" int4 + )""" + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name, age) VALUES ") + for (int i = 1; i <= totalRows; i++) { + if (i > 1) sb.append(", ") + sb.append("(${i}, 'name_${i}', ${i})") + } + sql sb.toString() + } + + // CREATE should return promptly: splitting is now driven by the scheduler + // each tick, not synchronously inside CREATE. Synchronous splitting on a + // large table historically could take many minutes; here we just assert + // the CREATE call itself doesn't block for that long. 
+ long createStartMs = System.currentTimeMillis() + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial", + "snapshot_split_size" = "5", + "snapshot_parallelism" = "4" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + long createElapsedMs = System.currentTimeMillis() - createStartMs + log.info("CREATE JOB elapsed: ${createElapsedMs} ms") + assert createElapsedMs < 30_000 : + "CREATE JOB should return quickly under async splitting, took ${createElapsedMs} ms" + + // Job metadata should be visible immediately (no blocking on splitChunks). + def jobRows = sql """select Name from jobs("type"="insert") where Name='${jobName}'""" + assert jobRows.size() == 1 + + // Wait until snapshot fully syncs. 20 splits at snapshot_parallelism=4 -> + // ~5 task ticks; default max_interval=10s -> ~50s end-to-end. Cap at 5min. + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}""" + log.info("doris row count: ${cnt}") + cnt.size() == 1 && cnt.get(0).get(0) == totalRows + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + // Distinct PK count must equal total — no duplicates from RPC retry / dedup paths. 
+ def doneDistinct = sql """SELECT COUNT(DISTINCT id) FROM ${currentDb}.${table1}""" + assert doneDistinct.get(0).get(0) == totalRows + + def loadStat0 = parseJson(sql(""" + select loadStatistic from jobs("type"="insert") where Name='${jobName}' + """).get(0).get(0)) + log.info("loadStat after snapshot: ${loadStat0}") + assert loadStat0.scannedRows == totalRows + + // Snapshot done -> binlog phase. Verify INSERT/UPDATE/DELETE still propagate. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name, age) VALUES (101, 'incr_101', 101);""" + sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET age = 999 WHERE id = 1;""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id = 2;""" + } + + try { + Awaitility.await().atMost(120, SECONDS) + .pollInterval(2, SECONDS).until( + { + def res = sql """SELECT + COUNT(*), + SUM(CASE WHEN id=1 AND age=999 THEN 1 ELSE 0 END), + SUM(CASE WHEN id=101 THEN 1 ELSE 0 END), + SUM(CASE WHEN id=2 THEN 1 ELSE 0 END) + FROM ${currentDb}.${table1}""" + log.info("binlog state: ${res}") + // delete(id=2) + insert(id=101) -> count == totalRows + // update(id=1)+age=999 -> 1; insert(id=101) -> 1; leftover(id=2) -> 0 + res.size() == 1 + && res.get(0).get(0) == totalRows + && res.get(0).get(1) == 1 + && res.get(0).get(2) == 1 + && res.get(0).get(3) == 0 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + log.info("show job: " + showjob) + throw ex + } + + // Job should still be RUNNING (snapshot transitioned cleanly to binlog). 
+ def jobInfo = sql """ + select status from jobs("type"="insert") where Name='${jobName}' + """ + assert jobInfo.get(0).get(0) == "RUNNING" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_latest.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_latest.groovy new file mode 100644 index 00000000000000..aae688033412ac --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_latest.groovy @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verifies the mode gate: with offset=latest, scheduler must NOT trigger any +// fetchSplits RPC even after FE replay. Existing source rows are skipped; +// only binlog changes after CREATE flow through. 
+suite("test_streaming_postgres_job_async_split_latest", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_async_split_latest_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_async_split_latest" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Pre-existing rows that must NOT be synced (offset=latest skips snapshot). 
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int4 PRIMARY KEY, + "name" varchar(200) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 'pre_1'), (2, 'pre_2'), (3, 'pre_3')""" + } + + long createStartMs = System.currentTimeMillis() + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + long createElapsedMs = System.currentTimeMillis() - createStartMs + log.info("CREATE JOB elapsed: ${createElapsedMs} ms") + assert createElapsedMs < 30_000 : + "CREATE JOB latest mode should return quickly, took ${createElapsedMs} ms" + + // Wait for job to reach RUNNING (binlog phase) without scanning any snapshot row. + Awaitility.await().atMost(60, SECONDS) + .pollInterval(1, SECONDS).until( + { + def res = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + res.size() == 1 && res.get(0).get(0) == "RUNNING" + } + ) + + // INSERT a new row after CREATE; binlog must pick it up. 
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101, 'after_latest')""" + } + + Awaitility.await().atMost(120, SECONDS) + .pollInterval(2, SECONDS).until( + { + def res = sql """SELECT COUNT(*) FROM ${currentDb}.${table1} WHERE id = 101""" + res.size() == 1 && res.get(0).get(0) == 1 + } + ) + + // Pre-existing rows (1, 2, 3) must NOT be in Doris: offset=latest skipped snapshot. + def preCount = sql """SELECT COUNT(*) FROM ${currentDb}.${table1} WHERE id IN (1, 2, 3)""" + assert preCount.get(0).get(0) == 0 : + "offset=latest should skip pre-existing rows, but found ${preCount.get(0).get(0)} of them" + + def loadStat = parseJson(sql(""" + select loadStatistic from jobs("type"="insert") where Name='${jobName}' + """).get(0).get(0)) + log.info("loadStat: ${loadStat}") + // Only the post-CREATE INSERT should be scanned. + assert loadStat.scannedRows == 1 : + "expected scannedRows=1 (only the binlog INSERT), got ${loadStat.scannedRows}" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_restart_fe.groovy new file mode 100644 index 00000000000000..39d3ea95896728 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_async_split_restart_fe.groovy @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.suite.ClusterOptions +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verifies async chunk splitting survives FE restart mid-snapshot: cdcSplitProgress +// resumes from the system table on next tick, no chunks are re-cut, no rows lost. +suite("test_streaming_postgres_job_async_split_restart_fe", + "docker,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_async_split_restart_fe" + def options = new ClusterOptions() + options.setFeNum(1) + options.cloudMode = null + + docker(options) { + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_async_split_restart" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def totalRows = 500 + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+            // 500 rows + split_size=10 -> 50 splits; parallelism=1 keeps consumption slow
+            // enough that we can restart while still mid-snapshot.
+            connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+                sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+                sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                    "id" int4 PRIMARY KEY,
+                    "name" varchar(200)
+                )"""
+                StringBuilder sb = new StringBuilder()
+                sb.append("INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, name) VALUES ")
+                for (int i = 1; i <= totalRows; i++) {
+                    if (i > 1) sb.append(", ")
+                    sb.append("(${i}, 'name_${i}')")
+                }
+                sql sb.toString()
+            }
+
+            sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM POSTGRES (
+                    "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "org.postgresql.Driver",
+                    "user" = "${pgUser}",
+                    "password" = "${pgPassword}",
+                    "database" = "${pgDB}",
+                    "schema" = "${pgSchema}",
+                    "include_tables" = "${table1}",
+                    "offset" = "initial",
+                    "snapshot_split_size" = "10",
+                    "snapshot_parallelism" = "1"
+                )
+                TO DATABASE ${currentDb} (
+                    "table.create.properties.replication_num" = "1"
+                )
+            """
+
+            // Wait until at least 3 tasks succeeded before restart, so part of the 50 splits
+            // is consumed and cdcSplitProgress is mid-table when FE goes down.
+ try { + Awaitility.await().atMost(120, SECONDS) + .pollInterval(1, SECONDS).until( + { + def succeed = sql """select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}'""" + log.info("SucceedTaskCount before restart: ${succeed}") + succeed.size() == 1 && Integer.parseInt(succeed.get(0).get(0).toString()) >= 3 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + log.info("show job: " + showjob) + throw ex + } + + // Capture state before restart so we can confirm progress survives editlog replay. + def rowsBefore = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}""" + def progressBefore = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'""" + log.info("before restart: rows=${rowsBefore} succeedTasks=${progressBefore}") + assert rowsBefore.get(0).get(0) < totalRows : + "snapshot finished too fast (${rowsBefore.get(0).get(0)} rows) — restart wouldn't be mid-snapshot" + + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + // After restart, snapshot must complete; final count = totalRows, no duplicates. 
+ try { + Awaitility.await().atMost(600, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${table1}""" + log.info("doris row count after restart: ${cnt}") + cnt.size() == 1 && cnt.get(0).get(0) == totalRows + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + def distinctCnt = sql """SELECT COUNT(DISTINCT id) FROM ${currentDb}.${table1}""" + assert distinctCnt.get(0).get(0) == totalRows : + "row count matches but PK distinct count differs — duplicates from re-cutting split chunks" + + // Job should reach binlog phase (RUNNING, snapshot transitioned cleanly). + def status = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + assert status.get(0).get(0) == "RUNNING" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_async_split.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_async_split.groovy new file mode 100644 index 00000000000000..54c0bdac8745b2 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_async_split.groovy @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verifies async chunk splitting end-to-end for the cdc_stream TVF path: CREATE returns +// quickly, snapshot fully syncs across multiple splits, and the binlog phase still works. +suite("test_streaming_job_cdc_stream_postgres_async_split", + "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_job_cdc_stream_postgres_async_split" + def currentDb = (sql "select database()")[0][0] + def dorisTable = "test_streaming_job_cdc_stream_postgres_async_split_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + def pgTable = "test_streaming_job_cdc_stream_postgres_async_split_src" + def totalRows = 100 + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + + sql """ + CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} ( + `id` int NULL, + `name` varchar(200) NULL, + `age` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS AUTO + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String 
driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Prepare 100 rows in PG; snapshot_split_size=5 -> 20 splits across multiple RPCs. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} ( + "id" int4 PRIMARY KEY, + "name" varchar(200), + "age" int4 + )""" + StringBuilder sb = new StringBuilder() + sb.append("INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (id, name, age) VALUES ") + for (int i = 1; i <= totalRows; i++) { + if (i > 1) sb.append(", ") + sb.append("(${i}, 'name_${i}', ${i})") + } + sql sb.toString() + } + + // CREATE should return promptly: splitting is now driven by the scheduler each tick. + long createStartMs = System.currentTimeMillis() + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id, name, age) + SELECT id, name, age FROM cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${pgTable}", + "offset" = "initial", + "snapshot_split_size" = "5", + "snapshot_parallelism" = "4" + ) + """ + long createElapsedMs = System.currentTimeMillis() - createStartMs + log.info("CREATE JOB elapsed: ${createElapsedMs} ms") + assert createElapsedMs < 30_000 : + "TVF CREATE should return quickly under async splitting, took ${createElapsedMs} ms" + + // Job metadata should be visible immediately. + def jobRows = sql """select Name from jobs("type"="insert") where Name='${jobName}'""" + assert jobRows.size() == 1 + + // Wait until snapshot fully syncs (20 splits at parallelism=4 -> ~5 task ticks). 
+ try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """SELECT COUNT(*) FROM ${currentDb}.${dorisTable}""" + log.info("doris row count: ${cnt}") + cnt.size() == 1 && cnt.get(0).get(0) == totalRows + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + // Distinct PK count must equal total — no duplicates from dedup / retry paths. + def doneDistinct = sql """SELECT COUNT(DISTINCT id) FROM ${currentDb}.${dorisTable}""" + assert doneDistinct.get(0).get(0) == totalRows + + // Verify binlog phase: INSERT/UPDATE/DELETE all propagate after snapshot. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (id, name, age) VALUES (101, 'incr_101', 101);""" + sql """UPDATE ${pgDB}.${pgSchema}.${pgTable} SET age = 999 WHERE id = 1;""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${pgTable} WHERE id = 2;""" + } + + try { + Awaitility.await().atMost(120, SECONDS) + .pollInterval(2, SECONDS).until( + { + def res = sql """SELECT + COUNT(*), + SUM(CASE WHEN id=1 AND age=999 THEN 1 ELSE 0 END), + SUM(CASE WHEN id=101 THEN 1 ELSE 0 END), + SUM(CASE WHEN id=2 THEN 1 ELSE 0 END) + FROM ${currentDb}.${dorisTable}""" + log.info("binlog state: ${res}") + res.size() == 1 + && res.get(0).get(0) == totalRows + && res.get(0).get(1) == 1 + && res.get(0).get(2) == 1 + && res.get(0).get(3) == 0 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + log.info("show job: " + showjob) + throw ex + } + + def jobInfo = sql """ + select status from jobs("type"="insert") where Name='${jobName}' + """ + assert jobInfo.get(0).get(0) == "RUNNING" + + sql 
"""DROP JOB IF EXISTS where jobname = '${jobName}'""" + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +}