[improve](streaming-job) async chunk splitting for StreamingInsertJob #63079
JNSimba wants to merge 1 commit into
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Pull request overview
This PR moves StreamingInsertJob (CDC FROM-TO and cdc_stream TVF) snapshot chunk splitting from a synchronous CREATE STREAMING JOB path to an incremental, scheduler-tick-driven flow. The goal is to avoid long blocking CREATE times and BRPC timeouts on large / skewed PK tables by fetching snapshot splits in small batches and persisting progress for recovery.
Changes:
- Adds split-progress APIs to SourceOffsetProvider and implements an async split state machine in JdbcSourceOffsetProvider (plus new FE tests).
- Introduces FetchTableSplitsRequest fields to drive stateless, resumable split generation (nextSplitStart/nextSplitId/batchSize) and rebuilds cdc_client split fetching around the flink-cdc ChunkSplitter.
- Persists per-table chunk lists incrementally via StreamingJobUtils.upsertChunkList, and advances splits each scheduler tick (including a pre-advance in PENDING); a rough sketch of this per-tick flow follows below.
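To make the scheduler-driven flow concrete, here is a rough, self-contained sketch of what one tick does under this design. Only `advanceSplitsIfNeed()` and `hasMoreDataToConsume()` are names taken from this PR; every other type and method below is a stand-in, not the actual scheduler code.

```java
// Hedged sketch of the per-tick flow described above. Only advanceSplitsIfNeed() and
// hasMoreDataToConsume() are names from this PR; the rest is illustrative scaffolding.
interface TickDrivenJob {
    void advanceSplitsIfNeed() throws Exception; // fetches at most one batch of splits
    boolean hasMoreDataToConsume();
    void pause(String reason);
    void dispatchNextTask();
}

final class SchedulerTickSketch {
    static void onTick(TickDrivenJob job) {
        try {
            // PENDING pre-advances once so the first dispatched task already has splits.
            job.advanceSplitsIfNeed();
        } catch (Exception e) {
            // On failure the job is paused rather than silently retried, mirroring the
            // fetchMeta-style error handling the PR description mentions.
            job.pause(e.getMessage());
            return;
        }
        if (job.hasMoreDataToConsume()) {
            job.dispatchNextTask();
        }
    }
}
```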
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Reworks /api/fetchSplits handling to drive flink-cdc ChunkSplitter directly (stateless batch split generation). |
| fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java | Unit tests for SplitProgress default state and deep-copy semantics. |
| fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java | Unit tests covering async split advancement, dedup, noMoreSplits, and committed-progress advancement. |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Adds per-table chunk_list UPSERT support with id reuse / allocation. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java | Adds default split-progress hooks (initSplitProgress, advanceSplits, noMoreSplits). |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java | Removes create-time pre-splitting; re-init split progress on replay; relies on scheduler-driven split fetching. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Implements async split progress, scheduler-driven split fetching, persistence to system table, and restart replay logic. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java | Calls advanceSplitsIfNeed() each tick and pre-advances once in PENDING before dispatch. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Persists syncTables, initializes split progress on CREATE, and adds advanceSplitsIfNeed() that pauses job on failure. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java | Adds nextSplitStart, nextSplitId, and batchSize fields to support resumable batched split fetching. |
Comments suppressed due to low confidence (1)
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:320
- replayIfNeed() comment still says snapshot splits in the meta table are "written by initOnCreate", but initOnCreate() is now an intentional no-op and meta writes come from scheduler-driven advanceSplits()/upsertChunkList. Updating this comment will avoid confusion when debugging TVF recovery behavior.
```java
        // Re-init transient split progress fields lost across FE restart.
        // syncTables itself is persisted on StreamingInsertJob; cdcSplitProgress is rebuilt empty
        // here and advanceSplits will resume from the system table on next tick.
        if (cdcSplitProgress == null) {
            initSplitProgress(job.getSyncTables());
        }
        if (currentOffset == null) {
            // No committed txn yet. If snapshot splits exist in the meta table (written by
            // initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits
            // instead of a BinlogSplit (which would incorrectly skip the snapshot phase).
```
```diff
             StreamingJobUtils.resolveAndValidateSource(
                     dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
             this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
-            JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
-            rdsOffsetProvider.splitChunks(createTbls);
+            // Initialize split progress; advanceSplits is driven later by the scheduler each tick.
+            this.offsetProvider.initSplitProgress(this.syncTables);
         } catch (Exception ex) {
```
```java
    /** Initialize at CREATE time. syncTables is the list of source tables this job syncs. */
    @Override
    public void initSplitProgress(List<String> syncTables) {
        synchronized (splitsLock) {
            this.cachedSyncTables = syncTables;
            this.committedSplitProgress = new SplitProgress();
            this.cdcSplitProgress = new SplitProgress();
        }
    }
```
```diff
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId());
+        LOG.info("Get table {} splits for job {} (nextSplitId={}, hasNextSplitStart={})",
+                ftsReq.getSnapshotTable(), ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                ftsReq.getNextSplitStart() != null);
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
         List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
                 remainingSnapshotSplits = new ArrayList<>();
         StreamSplit remainingStreamSplit = null;

         // Check startup mode - for PostgreSQL, we use similar logic as MySQL
         String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
                 || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
             remainingSnapshotSplits =
                     startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
         } else {
             // For non-initial mode, create a stream split
             Offset startingOffset = createInitialOffset();
             remainingStreamSplit =
                     new StreamSplit(
                             STREAM_SPLIT_ID,
                             startingOffset,
                             createNoStoppingOffset(),
                             new ArrayList<>(),
                             new HashMap<>(),
                             0);
         }
         String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
         TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());

+        // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
+        // null / empty -> START_BOUND (first time splitting this table, start from min(pk))
+        // non-empty -> middleOf(value[0]) (continue from previous split's splitEnd)
+        // END_BOUND never appears in transit: when a table is fully split, FE clears
+        // committedSplitProgress.currentSplittingTable to null, so the next RPC will be for
+        // a different table or no RPC at all.
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        ChunkBound bound = (pkValues == null || pkValues.length == 0)
+                ? ChunkBound.START_BOUND
+                : ChunkBound.middleOf(pkValues[0]);
+
+        int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId();
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : ftsReq.getBatchSize();
+
+        ChunkSplitterState state = new ChunkSplitterState(tableId, bound, splitId);
+        ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+        splitter.open();
```
```java
        synchronized (splitsLock) {
            // 1. Pick next table if not currently splitting one.
            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
                List<String> remaining = computeCdcRemainingTables();
                if (remaining.isEmpty()) {
                    return;
                }
                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
                cdcSplitProgress.setNextSplitStart(null);
                cdcSplitProgress.setNextSplitId(null);
            }
            String tbl = cdcSplitProgress.getCurrentSplittingTable();
            Object[] startVal = cdcSplitProgress.getNextSplitStart();
            Integer splitId = cdcSplitProgress.getNextSplitId();

            // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
            // task commit waits briefly (max_interval >> RPC time anyway).
            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
            if (batch == null || batch.isEmpty()) {
                return;
            }

            // 3. mergeBySplitId (defensive dedup).
            Set<String> existingIds = new HashSet<>();
            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
            List<SnapshotSplit> newSplits = new ArrayList<>();
            for (SnapshotSplit s : batch) {
                if (!existingIds.contains(s.getSplitId())) {
                    newSplits.add(s);
                }
            }
            remainingSplits.addAll(newSplits);

            // 4. UPSERT this table's full chunk_list to system table.
            List<SnapshotSplit> splitsOfTbl = Stream.concat(
                            finishedSplits.stream(), remainingSplits.stream())
                    .filter(s -> tbl.equals(s.getTableId()))
                    .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
                    .collect(Collectors.toList());
            try {
                StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
            } catch (Exception e) {
                throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage());
            }

            // 5. Advance cdcSplitProgress to the last split in the batch.
            applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1));
        }
    }
```
```java
    /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
    private static int splitIdOf(String splitId) {
        int colon = splitId.lastIndexOf(':');
        return Integer.parseInt(splitId.substring(colon + 1));
    }
```
|
/review |
I found one additional blocking issue beyond the existing review threads.
Critical checkpoint conclusions:
- Goal/test coverage: the PR moves CDC chunk splitting from CREATE-time blocking to scheduler-driven async fetching and adds focused unit tests, but the TVF FE-restart path is not covered and can stop fetching the remaining chunks.
- Scope: the change is focused on async split progress, though it touches both FROM-TO and TVF paths.
- Concurrency: existing threads already cover the broad splitsLock/RPC concern; I did not add a duplicate. The new issue is lifecycle/replay state, not a new lock-order finding.
- Lifecycle/replay: blocking issue found for cdc_stream TVF restart because transient cachedSyncTables is not restored.
- Configuration/compatibility: no new config or wire-incompatible persistent format issue found beyond the added optional request fields.
- Parallel paths: FROM-TO replay initializes split progress, but the TVF override does not do the equivalent unconditionally.
- Testing: unit tests cover the base provider state machine, but miss TVF replay after FE restart. No end-to-end regression result is included in this PR.
- Observability/performance/transactions: no additional distinct issue found beyond existing review threads.
- User focus: no additional user-provided review focus was supplied.
```java
        // syncTables itself is persisted on StreamingInsertJob; cdcSplitProgress is rebuilt empty
        // here and advanceSplits will resume from the system table on next tick.
        if (cdcSplitProgress == null) {
            initSplitProgress(job.getSyncTables());
```
This restart guard only runs when cdcSplitProgress is null, but the no-arg constructor already initializes it in the base class. After an FE restart for a cdc_stream TVF job, replayIfNeed() runs before ensureInitialized(), so cachedSyncTables remains null. The restored remainingSplits from streaming_job_meta can still be consumed, but once those are committed, noMoreSplits() treats the null table cache as empty and advanceSplitsIfNeed() never fetches the next batch. A mid-snapshot restart therefore truncates the snapshot to only the chunks already present in the meta table. Please always restore the transient split progress/table cache here, or at least call initSplitProgress(job.getSyncTables()) when cachedSyncTables is null as well, and add a TVF restart test that consumes restored chunks then verifies another batch is fetched.
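A minimal sketch of the guard this comment is asking for, written against the snippet above. This is the reviewer-suggested shape, not code from the PR; `cachedSyncTables` is the transient field named in the comment.

```diff
-        if (cdcSplitProgress == null) {
+        // Also rebuild the transient table cache so a cdc_stream TVF job restarted
+        // mid-snapshot keeps fetching the remaining chunks once the restored splits
+        // from streaming_job_meta have been consumed.
+        if (cdcSplitProgress == null || cachedSyncTables == null) {
             initSplitProgress(job.getSyncTables());
         }
```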
```diff
     public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
         JdbcOffset nextOffset = new JdbcOffset();
         if (!remainingSplits.isEmpty()) {
             int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
             List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
             nextOffset.setSplits(snapshotSplits);
             return nextOffset;
-        } else if (currentOffset != null && currentOffset.snapshotSplit()) {
-            // initial mode: snapshot to binlog
-            // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here
+        } else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) {
+            // initial mode: snapshot to binlog. noMoreSplits() guards against switching while
+            // splitting is still in progress (remainingSplits empty doesn't mean fully cut).
+            // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here.
```
```diff
     @Override
     public boolean hasMoreDataToConsume() {
         if (currentOffset == null) {
             return true;
         }

         if (currentOffset.snapshotSplit()) {
-            if (isSnapshotOnlyMode() && remainingSplits.isEmpty()) {
+            if (!remainingSplits.isEmpty()) {
                 return true;
             }
+            // remainingSplits empty: if splitting is still in progress, scheduler should delay
+            // task dispatch (advanceSplits will push more splits next tick).
+            if (!noMoreSplits()) {
+                return false;
+            }
-            return true;
+            // Splitting fully done: snapshot-only completes here; initial mode falls through to binlog.
+            return !isSnapshotOnlyMode();
```
```java
    /** RPC fetchSplits with (table, nextSplitStart, nextSplitId, batchSize). protected for UT subclass. */
    protected List<SnapshotSplit> rpcFetchSplitsBatch(String table, Object[] nextSplitStart, Integer nextSplitId)
            throws JobException {
        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
        FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest(
                getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), table);
        requestParams.setNextSplitStart(nextSplitStart);
        requestParams.setNextSplitId(nextSplitId);
        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
                .setApi("/api/fetchSplits")
                .setParams(new Gson().toJson(requestParams))
                .build();
```
```java
    public static void upsertChunkList(Long jobId, String tableName, List<SnapshotSplit> chunks) throws Exception {
        createMetaTableIfNotExist();
        Integer id = querySingleTableId(jobId, tableName);
        if (id == null) {
            id = queryNextAvailableId(jobId);
        }
        Map<String, String> params = new HashMap<>();
        params.put("id", String.valueOf(id));
        params.put("job_id", String.valueOf(jobId));
        params.put("table_name", tableName);
        params.put("chunk_list", objectMapper.writeValueAsString(chunks));
        StringSubstitutor sub = new StringSubstitutor(params);
        String sql = sub.replace(INSERT_INTO_META_TABLE_TEMPLATE);
        batchInsert(Collections.singletonList(sql));
```
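For readers unfamiliar with the `StringSubstitutor` pattern used above, here is a self-contained illustration of how the four parameters fill a `${...}` template. The template text and values below are made up; the real `INSERT_INTO_META_TABLE_TEMPLATE` and the meta-table schema live in `StreamingJobUtils`, not here.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;

public class ChunkListUpsertSketch {
    public static void main(String[] args) {
        // Hypothetical template for illustration only; column names and escaping in the
        // real INSERT_INTO_META_TABLE_TEMPLATE may differ.
        String template = "INSERT INTO streaming_job_meta (id, job_id, table_name, chunk_list) "
                + "VALUES (${id}, ${job_id}, '${table_name}', '${chunk_list}')";
        Map<String, String> params = new HashMap<>();
        params.put("id", "0");
        params.put("job_id", "10001");
        params.put("table_name", "orders");
        params.put("chunk_list", "[{\"splitId\":\"orders:0\"}]");
        // StringSubstitutor replaces each ${key} placeholder with its value from the map.
        String sql = new StringSubstitutor(params).replace(template);
        System.out.println(sql);
    }
}
```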
```diff
+        ChunkSplitterState state = new ChunkSplitterState(tableId, bound, splitId);
+        ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+        splitter.open();

-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
-                    snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, splitStart, splitEnd, null);
-                splits.add(split);
+        try {
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                        chunks = splitter.generateSplits(tableId);
```
```java
    /**
     * PK values of the next split start (inclusive lower bound). null = fresh start (START_BOUND).
     * Same shape as SnapshotSplit.splitStart/splitEnd; cdc_client takes [0] to construct ChunkBound.
     */
    private Object[] nextSplitStart;

    /** Next split id; null = 0 (fresh start). */
    private Integer nextSplitId;

    /** Max splits to fetch in this RPC; null = default 100. */
    private Integer batchSize;
```
Summary
StreamingInsertJob (CDC FROM-TO and cdc_stream TVF paths) used to call `splitChunks()` synchronously inside `CREATE STREAMING JOB`, asking cdc_client to cut every chunk of every table before returning. On large/non-uniform PK tables this can take 30+ minutes — far beyond the BE→cdc_client BRPC 60s timeout, and the SQL client blocks the whole time. This PR makes splitting tick-driven by the FE scheduler:

- `CREATE` returns immediately; no more synchronous `splitChunks()`.
- `advanceSplits()` issues one short fetchSplits RPC per tick (default `batchSize=100`) and pushes that batch into `remainingSplits`. Tasks dispatch as soon as the first batch lands, so end-to-end first-byte latency stays close to flink-cdc's.
- cdc_client rebuilds the `ChunkSplitter` from the `(currentSplittingTable, nextSplitStart, nextSplitId)` triple supplied by FE; flink-cdc internals are untouched (uses the public `ChunkSplitter` API only).
- `committedSplitProgress` (3-field `SplitProgress`, sketched below) + existing `chunkHighWatermarkMap`/`binlogOffset`.
- Persist: the `streaming_job_meta` system table holds the full `chunk_list` JSON per table (UPSERT each `advanceSplits`).
- `SourceOffsetProvider#initSplitProgress`/`noMoreSplits`/`advanceSplits` interface; `StreamingJobSchedulerTask.handlePendingState` pre-advances one batch so the first task doesn't wait a full `max_interval`.

Detailed design lives in the linked plan.
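Since the 3-field `SplitProgress` above carries the whole resumable-splitting state, a minimal sketch of its shape may help. The field names come from the diff in this PR; the defaults and `copy()` method are assumptions based on the `SplitProgressTest` description, not the actual class.

```java
// Hedged sketch of the 3-field split-progress state; field names are from the diff,
// the copy() shape and defaults are assumptions based on SplitProgressTest.
public class SplitProgress {
    // Table currently being chunked; null = pick the next remaining table.
    private String currentSplittingTable;
    // Inclusive PK lower bound of the next split; null = fresh start (START_BOUND).
    private Object[] nextSplitStart;
    // Next split id to hand to the ChunkSplitter; null = 0.
    private Integer nextSplitId;

    /** Deep copy so committed progress can't be mutated through the working copy. */
    public SplitProgress copy() {
        SplitProgress p = new SplitProgress();
        p.currentSplittingTable = this.currentSplittingTable;
        p.nextSplitStart = this.nextSplitStart == null ? null : this.nextSplitStart.clone();
        p.nextSplitId = this.nextSplitId;
        return p;
    }
    // getters/setters omitted
}
```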
Changes
- `fe-common`: `FetchTableSplitsRequest` adds `nextSplitStart` (`Object[]`) / `nextSplitId` / `batchSize`.
- `fe-core`:
  - `SourceOffsetProvider` adds 3 default methods: `initSplitProgress` / `advanceSplits` / `noMoreSplits` (see the sketch below).
  - `JdbcSourceOffsetProvider` implements the async state machine (committed/cdc `SplitProgress`, `advanceSplits`, dedup, system-table UPSERT, replay path).
  - `JdbcTvfSourceOffsetProvider.initOnCreate` no longer pre-splits; relies on the same scheduler-tick path.
  - `StreamingInsertJob` carries `syncTables` (`@SerializedName("st")`); `initSourceJob`/`initInsertJob` initialize split progress; `advanceSplitsIfNeed()` mirrors `fetchMeta` error handling (PAUSE on failure).
  - `StreamingJobSchedulerTask.handlePendingState`/`handleRunningState` call `advanceSplitsIfNeed()` each tick; the PENDING handler pre-advances and short-circuits if PAUSED.
  - `StreamingJobUtils.upsertChunkList` covers id allocation via a `MAX(id)+1` lookup.
- `cdc_client` / `JdbcIncrementalSourceReader`: `getSourceSplits()` rebuilt around the public `ChunkSplitter` API (no more in-memory loop / reflection hack).
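A rough sketch of how the three default hooks compose on the provider interface. Only the three method names come from this PR; the exact signatures (including the exception type) are assumptions, and the real `SourceOffsetProvider` has many more methods.

```java
import java.util.List;

// Hedged sketch, not the actual fe-core interface: only the three hook names are from
// this PR; signatures and the use of a plain Exception are assumptions.
interface SourceOffsetProviderSplitHooks {
    /** Remember the tables to split; called at CREATE time and re-run on replay. */
    default void initSplitProgress(List<String> syncTables) {}

    /** Fetch at most one more batch of splits; driven by the scheduler each tick. */
    default void advanceSplits() throws Exception {}

    /** True once every sync table has been fully chunked. */
    default boolean noMoreSplits() {
        return true;
    }
}
```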
Tests
- `SplitProgressTest` — copy/null-field semantics.
- `JdbcSourceOffsetProviderAsyncSplitTest` — covers `advanceSplits` (first call / continue same table / cross-table switch / dedup / empty batch), `noMoreSplits`, `updateOffset` committed-progress advancement (mid-chunk vs last chunk vs replay missing-split path), and `computeCdcRemainingTables`.
- `test_streaming_postgres_job_async_split.groovy` — 100 rows × `snapshot_split_size=5` → 20 splits across multiple ticks; asserts CREATE returns < 30s, full snapshot count + DISTINCT id, then INSERT/UPDATE/DELETE in binlog phase.
Test plan
- `mvn test -pl fe/fe-core -Dtest=JdbcSourceOffsetProviderAsyncSplitTest,SplitProgressTest`
- `test_streaming_postgres_job_async_split` regression locally: `CREATE` returns in seconds, `SHOW STREAMING JOB` immediately reflects the new job, snapshot completes, binlog phase healthy
- `cdc_stream` TVF + StreamingInsertJob path: confirm CREATE no longer blocks