Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
public class FetchTableSplitsRequest extends JobBaseConfig {
private String snapshotTable;

/**
* PK values of the next split start (inclusive lower bound). null = fresh start (START_BOUND).
* Same shape as SnapshotSplit.splitStart/splitEnd; cdc_client takes [0] to construct ChunkBound.
*/
private Object[] nextSplitStart;

/** Next split id; null = 0 (fresh start). */
private Integer nextSplitId;

/** Max splits to fetch in this RPC; null = default 100. */
private Integer batchSize;
Comment on lines +36 to +46

public FetchTableSplitsRequest(Long jobId, String name,
Map<String, String> sourceProperties, String frontendAddress, String snapshotTable) {
super(jobId.toString(), name, sourceProperties, frontendAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("ccn")
private volatile String cloudCluster;

/** Source tables this job syncs. Determined at CREATE; used by JdbcSourceOffsetProvider.advanceSplits. */
@Getter
@SerializedName("st")
private List<String> syncTables;

// The sampling window starts at the beginning of the sampling window.
// If the error rate exceeds `max_filter_ratio` within the window, the sampling fails.
@Setter
Expand Down Expand Up @@ -238,6 +243,7 @@ private void initSourceJob() {
init();
checkRequiredSourceProperties();
List<String> createTbls = createTableIfNotExists();
this.syncTables = createTbls;
if (sourceProperties.get(DataSourceConfigKeys.INCLUDE_TABLES) == null) {
// cdc need the final includeTables
String includeTables = String.join(",", createTbls);
Expand All @@ -246,8 +252,8 @@ private void initSourceJob() {
StreamingJobUtils.resolveAndValidateSource(
dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
rdsOffsetProvider.splitChunks(createTbls);
// Initialize split progress; advanceSplits is driven later by the scheduler each tick.
this.offsetProvider.initSplitProgress(this.syncTables);
} catch (Exception ex) {
Comment on lines 252 to 257
log.warn("init streaming job for {} failed", dataSourceType, ex);
throw new RuntimeException(ex.getMessage());
Expand Down Expand Up @@ -382,6 +388,10 @@ private void initInsertJob() {
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider = createOffsetProvider(sourceProperties);
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
// cdc_stream TVF has TABLE; S3 TVF doesn't, so syncTables stays null there.
String tvfTable = originTvfProps.get(DataSourceConfigKeys.TABLE);
this.syncTables = tvfTable == null ? null : Collections.singletonList(tvfTable);
this.offsetProvider.initSplitProgress(this.syncTables);
// Validate source-side resources (e.g. PG slot/publication ownership) once at job
// creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job).
StreamingJobUtils.validateTvfSource(tvfType, originTvfProps, String.valueOf(getJobId()));
Expand Down Expand Up @@ -704,6 +714,28 @@ protected void fetchMeta() throws JobException {
}
}

/**
 * Drive one batch of split fetching if the offset provider still has splits to produce.
 * Invoked by the scheduler on every tick while the job is PENDING/RUNNING; error
 * handling mirrors fetchMeta: on failure the job is paused with a failure reason,
 * unless it was already manually paused (MANUAL_PAUSE_ERR must not be overwritten).
 */
public void advanceSplitsIfNeed() {
    // Fast path: nothing left to split — providers without a splitting concept
    // (e.g. S3, Kafka) report exhausted immediately.
    if (offsetProvider.noMoreSplits()) {
        return;
    }
    try {
        offsetProvider.advanceSplits();
    } catch (Exception ex) {
        log.warn("advance splits failed, job id: {}", getJobId(), ex);
        FailureReason existing = this.getFailureReason();
        boolean manuallyPaused = existing != null
                && InternalErrorCode.MANUAL_PAUSE_ERR.equals(existing.getCode());
        // Preserve a user-initiated pause; otherwise record the fetch failure and pause.
        if (!manuallyPaused) {
            this.setFailureReason(new FailureReason(
                    InternalErrorCode.GET_REMOTE_DATA_ERROR,
                    "Failed to advance splits, " + ex.getMessage()));
            this.updateJobStatus(JobStatus.PAUSED);
        }
    }
}

public boolean needScheduleTask() {
readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ private void handlePendingState() throws JobException {
}
}
streamingInsertJob.replayOffsetProviderIfNeed();
// Pre-advance one batch so the first task has splits to consume; otherwise the
// first dispatch would fall on the next tick (= waiting one max_interval).
streamingInsertJob.advanceSplitsIfNeed();
if (streamingInsertJob.getJobStatus() == JobStatus.PAUSED) {
// advanceSplits failed and paused the job; skip task dispatch this tick.
return;
}
if (streamingInsertJob.hasReachedEnd()) {
// Source already fully consumed (e.g. snapshot-only mode recovered after FE restart).
// Transition directly to FINISHED without creating a new task.
Expand All @@ -80,6 +87,8 @@ private void handlePendingState() throws JobException {
/**
 * Per-tick handling while the job is RUNNING: reap timed-out tasks, refresh source
 * metadata, then pull one more batch of snapshot splits from the cdc client.
 *
 * @throws JobException propagated from the delegated calls (fetchMeta is declared
 *         to throw it; presumably processTimeoutTasks may as well — confirm)
 */
private void handleRunningState() throws JobException {
streamingInsertJob.processTimeoutTasks();
streamingInsertJob.fetchMeta();
// Each tick: fetch one more batch of splits from cdc_client.
// advanceSplitsIfNeed handles its own failures internally (it pauses the job
// and records a FailureReason rather than throwing).
streamingInsertJob.advanceSplitsIfNeed();
}

private void autoResumeHandler() throws JobException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,30 @@ default boolean hasReachedEnd() {
return false;
}

/**
 * Initialize split progress at job CREATE time. Called once with the list of source
 * tables this job syncs. For providers without a splitting concept (e.g. S3), the
 * default implementation is a no-op.
 *
 * @param syncTables tables the job syncs; may be {@code null} — callers pass null
 *        when the source TVF has no TABLE property (e.g. S3 TVF)
 */
default void initSplitProgress(List<String> syncTables) {}

/**
 * Advance one batch of split fetching; called by the scheduler on each tick while the
 * job is PENDING or RUNNING. For providers without asynchronous splitting work
 * (e.g. S3, Kafka), the default implementation is a no-op.
 * Naming is aligned with flink-cdc's SnapshotSplitAssigner.
 *
 * @throws JobException if fetching splits fails fatally; the caller records the
 *         failure and pauses the job
 */
default void advanceSplits() throws JobException {}

/**
 * Returns true if this provider will produce no more splits, letting the scheduler
 * skip {@code advanceSplits} entirely. Providers without a splitting concept always
 * report exhausted via this default.
 * Naming is aligned with flink-cdc's SnapshotSplitAssigner.noMoreSplits().
 *
 * @return {@code true} when no further splits will be produced (default: always)
 */
default boolean noMoreSplits() {
return true;
}

/**
* Get the lag of the data source in seconds.
* For CDC sources, lag = (now - last consumed event timestamp) in seconds.
Expand Down
Loading
Loading