[AURON #1857] Introduce FlinkAuronCalcOperator (#2263)
Open
weiqingy wants to merge 4 commits into
Conversation
…arding

Extract a reusable FlinkMetricNode class that forwards metric updates published by the native engine to a Flink MetricGroup. Counters are lazily created in a ConcurrentHashMap so concurrent native-thread callbacks via JNI are thread-safe. Non-positive deltas are ignored to match SparkMetricNode semantics (spark-extension/.../SparkMetricNode.scala:41-44), avoiding spurious counter activity from native callbacks that report zero deltas for unaffected metrics.

This is the first commit toward FlinkAuronCalcOperator (AURON apache#1857). The existing inline anonymous MetricNode subclass in AuronKafkaSourceFunction.run() will migrate to FlinkMetricNode as a follow-up; it is intentionally not migrated here to keep this commit focused.

Includes 4 JUnit 5 unit tests covering counter creation, caching, non-positive value handling, and child-node propagation. Tests use hand-rolled MetricGroup/Counter fakes since the module has no Mockito dependency.
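The lazy, thread-safe counter pattern described above can be sketched as follows. This is a minimal illustration, not the real class: Counter here is a stand-in interface rather than Flink's org.apache.flink.metrics.Counter, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class MetricNodeSketch {
    interface Counter {
        void inc(long n);
        long getCount();
    }

    static final class SimpleCounter implements Counter {
        private final AtomicLong count = new AtomicLong();
        public void inc(long n) { count.addAndGet(n); }
        public long getCount() { return count.get(); }
    }

    private final Map<String, Counter> counters = new ConcurrentHashMap<>();

    /** May be called from native JNI callbacks on multiple tokio threads. */
    public void add(String metricName, long delta) {
        if (delta <= 0) {
            return; // match SparkMetricNode: ignore non-positive deltas
        }
        // computeIfAbsent makes lazy creation safe under concurrent callbacks
        counters.computeIfAbsent(metricName, n -> new SimpleCounter()).inc(delta);
    }

    public long get(String metricName) {
        Counter c = counters.get(metricName);
        return c == null ? 0L : c.getCount();
    }

    public static void main(String[] args) {
        MetricNodeSketch node = new MetricNodeSketch();
        node.add("output_rows", 5);
        node.add("output_rows", 0);  // ignored
        node.add("output_rows", -3); // ignored
        node.add("output_rows", 2);
        System.out.println(node.get("output_rows")); // prints 7
    }
}
```

The ConcurrentHashMap is what makes the "lazily created" and "concurrent native-thread callbacks" claims compatible: computeIfAbsent guarantees a single Counter instance per metric name even when two JNI callbacks race on first use.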
…native row batching
Add FlinkArrowFFIExporter, a synchronous AuronArrowFFIExporter subclass
that wraps FlinkArrowWriter so the native engine can pull batches of
Flink RowData from the operator's hot path via the Arrow C-Data FFI.
The operator (next commit) calls:
- offer(row) to buffer a row into the active VectorSchemaRoot;
- isBatchFull() to decide when to drain;
- noMoreInput() before close;
- exportSchema(ptr) during open() to populate the FFI ArrowSchema;
- exportNextBatch(arrayPtr) on the native pull to populate the FFI
ArrowArray and rotate to a fresh root. Returns false only when the
buffer is empty AND noMoreInput has been set; partial batches still
flush on EOI.
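The pull contract above can be modeled with a plain list standing in for the VectorSchemaRoot. Everything here is an illustrative sketch under that assumption (BATCH_ROW_LIMIT shrunk to 4 for readability), not the real Arrow code:

```java
import java.util.ArrayList;
import java.util.List;

public class ExporterContractSketch {
    private static final int BATCH_ROW_LIMIT = 4; // real operator uses 8192
    private final List<String> buffer = new ArrayList<>();
    private boolean noMoreInput = false;

    public void offer(String row) { buffer.add(row); }
    public boolean isBatchFull() { return buffer.size() >= BATCH_ROW_LIMIT; }
    public void noMoreInput() { noMoreInput = true; }

    /** False only when the buffer is empty AND noMoreInput was set. */
    public boolean exportNextBatch(List<String> out) {
        if (buffer.isEmpty()) {
            // Empty but input may still arrive: report "keep pulling".
            // (A follow-up review fix in this PR changes this to return
            // false on any empty buffer.)
            return !noMoreInput;
        }
        out.addAll(buffer); // stands in for populating the FFI ArrowArray
        buffer.clear();     // stands in for rotating to a fresh VectorSchemaRoot
        return true;
    }

    public static void main(String[] args) {
        ExporterContractSketch ex = new ExporterContractSketch();
        ex.offer("row-1");
        ex.offer("row-2");
        ex.noMoreInput(); // EOI with a partial batch buffered
        List<String> out = new ArrayList<>();
        System.out.println(ex.exportNextBatch(out) + " " + out.size()); // true 2
        System.out.println(ex.exportNextBatch(new ArrayList<>()));      // false
    }
}
```

The main method demonstrates the "partial batches still flush on EOI" clause: two buffered rows are exported even though the batch limit was never reached, and only the following pull (empty buffer, EOI set) returns false.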
Unlike Spark's threaded ArrowFFIExporter, this implementation runs
synchronously on the operator thread. Spark's threading exists to
absorb iterator-side IO latency from an Iterator[InternalRow] backed
by disk/shuffle. Flink Calc is push-based (processElement delivers
already-materialized rows) and Flink's runtime serializes operator
calls with in-flight JNI calls (tokio spawn_blocking), so there is no
latency to absorb and no concurrent producer to coordinate with.
See AURON-1857 DESIGN.md OQ2 for the full thread trace.
Resource lifecycle:
- rotateRoot() closes the previous VectorSchemaRoot before allocating
a new one;
- exportNextBatch() rotates only after the FFI export succeeds, so
the error path never leaks;
- close() is idempotent and verified to release all child-allocator
memory.
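The export-then-rotate ordering can be sketched with a stand-in Root type in place of VectorSchemaRoot; the FfiExport callback and all names here are hypothetical, but the ordering mirrors the lifecycle bullets above:

```java
import java.util.ArrayList;
import java.util.List;

public class RotateOrderSketch {
    static final class Root implements AutoCloseable {
        boolean closed = false;
        public void close() { closed = true; }
    }

    interface FfiExport {
        void export(Root root) throws Exception;
    }

    final List<Root> allocated = new ArrayList<>();
    private Root current;

    public RotateOrderSketch() { rotateRoot(); }

    private Root allocateRoot() {
        Root r = new Root();
        allocated.add(r);
        return r;
    }

    private void rotateRoot() {
        if (current != null) {
            current.close(); // close the previous root before allocating
        }
        current = allocateRoot();
    }

    public void exportNextBatch(FfiExport export) throws Exception {
        export.export(current); // may throw: nothing rotated yet, nothing leaked
        rotateRoot();           // rotate only after the export succeeded
    }

    public static void main(String[] args) throws Exception {
        RotateOrderSketch s = new RotateOrderSketch();
        try {
            s.exportNextBatch(root -> { throw new Exception("ffi export failed"); });
        } catch (Exception expected) {
            // failure before rotation: the original root is still owned
        }
        System.out.println("roots after failed export: " + s.allocated.size());     // 1
        s.exportNextBatch(root -> { /* successful export */ });
        System.out.println("roots after successful export: " + s.allocated.size()); // 2
        System.out.println("first root closed: " + s.allocated.get(0).closed);      // true
    }
}
```

Because rotation happens strictly after a successful export, the error path leaves the current root in place for close() to release, which is what keeps the failure case leak-free.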
Row count is tracked on the exporter itself, not delegated to the
underlying FlinkArrowWriter (which has no public getRowCount()). This
keeps the writer untouched.
Includes 5 JUnit 5 round-trip tests covering schema export, 100-row
export with field-by-field verification via FlinkArrowReader, EOI
empty-buffer semantics, EOI partial-batch flush, and allocator-zero
verification on close. All tests are pure Arrow C-Data round-trip with
hand-rolled FFI struct allocation -- no native call, no libauron.
… plans

Add FlinkAuronCalcOperator, the Flink TableStreamOperator<RowData> that executes a pre-composed native Calc plan (Project[Filter[FFIReader]]) by streaming RowData through the FlinkArrowFFIExporter (input bridge), AuronCallNativeWrapper (native execution loop), and FlinkArrowReader (output bridge). This is the operator layer of the three-layer architecture (plan / operator / graph-rewriter).

The operator surface is intentionally free of RexNode, StreamExecCalc, and any Flink planner type -- the constructor takes only PhysicalPlanNode, RowType, RowType, and a string operator ID. Graph rewriting (substituting Flink's codegen factory) is the next PR (apache#1853); operator fusion is deferred to apache#1865.

OQ1 resolution: the runtime FFI Reader resource ID has the form "FlinkAuronCalc-<flinkOpId>-<subtask>:<UUID>", so the operator ID rides in the resource-ID prefix. No proto change is required. The SupportsAuronNative.getAuronOperatorId() accessor still returns the unsuffixed constructor value, mirroring AuronKafkaSourceFunction.

OQ2 resolution: the operator runs synchronously on the operator thread (no producer thread). Flink's runtime serializes processElement with in-flight loadNextBatch calls (which themselves run on tokio's spawn_blocking pool), so no concurrent producer needs coordination. This is a substantial simplification over Spark's threaded ArrowFFIExporter, which exists to absorb iterator-side IO latency that Flink Calc does not have.

The open() method builds the per-subtask resource ID, allocates a child Arrow allocator, instantiates the FFI exporter and FlinkMetricNode, registers the exporter in JniBridge, rewrites the FFI Reader leaf with the bound resource ID, and constructs the native runtime via a @VisibleForTesting factory seam. processElement buffers rows and drains on batch-full, watermark, pre-checkpoint barrier, or close.
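The OQ1 resource-ID composition is plain string concatenation over values the operator already has at open() time; a minimal sketch (buildResourceId is a hypothetical helper name, not the real method):

```java
import java.util.UUID;

public class ResourceIdSketch {
    /**
     * Composes "FlinkAuronCalc-<flinkOpId>-<subtask>:<UUID>" per the commit
     * message: the operator ID rides in the resource-ID prefix, so no proto
     * change is needed to carry it to the native side.
     */
    static String buildResourceId(String flinkOpId, int subtask) {
        return "FlinkAuronCalc-" + flinkOpId + "-" + subtask + ":" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // e.g. FlinkAuronCalc-calc_1-3:2f6c1c0e-...
        System.out.println(buildResourceId("calc_1", 3));
    }
}
```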
close() uses nested try/finally to guarantee cleanup of every resource even on partial-init failures; the outer guard checks both exporter and nativeRuntime so a partial open() does not NPE during drain. The injectFfiReaderLeaf static helper accepts Project[Filter[FFIReader]], Project[FFIReader], Filter[FFIReader], and bare FFIReader plan shapes; unsupported shapes throw IllegalArgumentException with the offending plan type in the message.

Includes 13 JUnit 5 tests via a NativeRuntime mock seam -- no libauron invocation, no flink-test-utils. Tests cover putResource registration, resource-ID composition, FFI Reader injection, processElement buffer and drain, watermark and checkpoint barrier drain ordering, idempotent close, SupportsAuronNative accessors, and all four accepted plan shapes plus the throw branch of the recursion helper.
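The four accepted shapes amount to at most one Project layer over at most one Filter layer over the FFIReader leaf. A sketch of that check with tiny stand-in node classes in place of the PhysicalPlanNode protobuf (all types and names here are illustrative):

```java
public class InjectLeafSketch {
    static class Node { }
    static final class Project extends Node {
        final Node child;
        Project(Node c) { child = c; }
    }
    static final class Filter extends Node {
        final Node child;
        Filter(Node c) { child = c; }
    }
    static final class FfiReader extends Node {
        String resourceId;
    }

    /**
     * Accepts Project[Filter[FFIReader]], Project[FFIReader],
     * Filter[FFIReader], and bare FFIReader; anything else throws.
     */
    static void injectFfiReaderLeaf(Node plan, String resourceId) {
        Node n = plan;
        if (n instanceof Project) n = ((Project) n).child; // optional Project layer
        if (n instanceof Filter)  n = ((Filter) n).child;  // optional Filter layer
        if (n instanceof FfiReader) {
            ((FfiReader) n).resourceId = resourceId;       // bind the runtime resource ID
            return;
        }
        throw new IllegalArgumentException(
                "Unsupported plan shape: " + plan.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        FfiReader leaf = new FfiReader();
        injectFfiReaderLeaf(new Project(new Filter(leaf)), "FlinkAuronCalc-op-0:uuid");
        System.out.println(leaf.resourceId);
        try {
            // Filter above Project is NOT one of the four shapes
            injectFfiReaderLeaf(new Filter(new Project(new FfiReader())), "x");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // offending type in the message
        }
    }
}
```

Checking the two optional layers in a fixed order, rather than recursing freely, is what restricts acceptance to exactly the four listed shapes.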
Contributor (Author)
Hi @Tartarus0zm, this PR implements the operator layer per the design discussed in #1857. CI is green. Would appreciate your review when you have time. Thanks!
Tartarus0zm (Contributor) reviewed May 13, 2026 and left a comment:
Thanks for your contribution!
I left a few comments, PTAL
```java
 * {@link #emitArrowBatch(VectorSchemaRoot)}.
 */
private void drainNative() {
    while (nativeRuntime.loadNextBatch(this::emitArrowBatch)) {
```
Contributor
I think that when the while loop exits here, it means the current Auron task has finished, and FlinkArrowFFIExporter and AuronCallNativeWrapper may need to be reinitialized.
We can create a test in FlinkAuronCalcOperatorTest, call processElement multiple times to make exporter.isBatchFull() == true, and then see if an exception is thrown.
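The failure mode described here can be reproduced with a tiny fake that closes itself when loadNextBatch returns false, as the reviewer says the production wrapper does. All names are illustrative stand-ins for the real AuronCallNativeWrapper:

```java
public class OneShotRuntimeSketch {
    /** Models a wrapper that auto-closes its native runtime on false return. */
    static final class OneShotRuntime {
        private int batchesLeft = 1;
        private boolean closed = false;

        boolean loadNextBatch() {
            if (closed) {
                throw new IllegalStateException("runtime already closed");
            }
            if (batchesLeft-- > 0) {
                return true;
            }
            closed = true; // close-on-false, per the review comment
            return false;
        }
    }

    public static void main(String[] args) {
        OneShotRuntime rt = new OneShotRuntime();
        while (rt.loadNextBatch()) {
            // first drain cycle completes normally
        }
        try {
            rt.loadNextBatch(); // second drain cycle without reinitialization
        } catch (IllegalStateException e) {
            System.out.println("second drain failed: " + e.getMessage());
        }
    }
}
```

This is exactly the scenario the suggested FlinkAuronCalcOperatorTest case would exercise: fill the exporter so a second drain cycle starts, and observe the exception unless the operator reinitializes its exporter and wrapper.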
```java
// Construction helpers
// =====================================================================

private TestableOperator newOperator()
```
Contributor
typo? TestTableOperator
…er drain cycle

Addresses @Tartarus0zm's code review on PR apache#2263. The Calc operator's drainNative pattern was lifted from AuronKafkaSourceFunction's one-shot run loop. For multi-cycle streaming (per isBatchFull / watermark / checkpoint / close), three interactions converged into a latent bug:

1. AuronCallNativeWrapper.loadNextBatch auto-closes the native runtime on false return (AuronCallNativeWrapper.java:127), making the wrapper dead after the first drain.
2. The native FFIReaderExec's AutoCloseableExporter::drop calls Java close() on the exporter (ffi_reader_exec.rs:192-198), nulling its writer.
3. The native FFI Reader loop does not filter empty batches; the original exportNextBatch contract (true with an empty batch when !noMoreInput) caused drainNative to spin forever.

Fix (operator-layer only, no auron-core or Rust changes):

* FlinkArrowFFIExporter.exportNextBatch now returns false whenever rowCount is zero. Adds a public reset() to re-prepare the exporter after the FFI Reader's drop closed it, and a public isEmpty() so the operator can short-circuit no-op drains. noMoreInput() is downgraded to a soft hint.
* FlinkAuronCalcOperator caches nativeMemory in open() and adds a closing flag. drainNative() early-returns on empty (keeping the wrapper alive across idle watermarks/checkpoints); otherwise it loops, then calls a new private reinitExporterAndRuntime() helper when !closing. close() sets closing first so the final drain does not allocate replacement resources only to tear them down.

Per-subtask identity is preserved: the same resourceId from open() is reused on every reinit, so OQ1's per-subtask uniqueness invariant holds. Also renames the test seam subclass TestableOperator -> TestFlinkAuronCalcOperator to remove the typo ambiguity flagged in the same review.

Tests:

* New testProcessElementMultiCycleDrainReinitializesWrapper with a OneShotNativeRuntime fake modeling the production close-on-false behavior. Asserts factory invocation count, per-cycle JniBridge re-registration, and post-cycles offer usability.
* New testExportNextBatchReturnsFalseOnEmpty for the empty-batch contract.
* TrackingExporter.isEmpty() override added so existing drain tests work with the new isEmpty short-circuit.

131/131 module tests pass. 0 checkstyle violations.
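The resulting drain loop can be sketched with stand-in Exporter and runtime types; the real operator drains Arrow batches and re-registers the exporter in JniBridge, so everything below is an illustrative model of the control flow, not the production code:

```java
import java.util.function.Supplier;

public class MultiCycleDrainSketch {
    interface Exporter {
        boolean isEmpty();
        void reset();
    }

    /** Models the production wrapper: dead after loadNextBatch returns false. */
    static final class OneShotRuntime {
        private int batchesLeft;
        private boolean dead = false;
        OneShotRuntime(int batches) { batchesLeft = batches; }
        boolean loadNextBatch() {
            if (dead) throw new IllegalStateException("runtime closed");
            if (batchesLeft-- > 0) return true;
            dead = true; // close-on-false
            return false;
        }
    }

    private final Exporter exporter;
    private final Supplier<OneShotRuntime> runtimeFactory;
    private OneShotRuntime runtime;
    private boolean closing = false;
    int reinitCount = 0;

    MultiCycleDrainSketch(Exporter exporter, Supplier<OneShotRuntime> factory) {
        this.exporter = exporter;
        this.runtimeFactory = factory;
        this.runtime = factory.get();
    }

    void drainNative() {
        if (exporter.isEmpty()) {
            return; // keep the live wrapper across idle watermarks/checkpoints
        }
        while (runtime.loadNextBatch()) {
            // emit the batch downstream
        }
        if (!closing) {
            exporter.reset();               // FFI Reader's drop closed the exporter
            runtime = runtimeFactory.get(); // fresh wrapper for the next cycle
            reinitCount++;
        }
    }

    void close() {
        closing = true; // final drain must not allocate replacement resources
        drainNative();
    }
}
```

The closing flag is the key ordering detail: it is set before the final drain, so the last cycle tears down without immediately rebuilding resources that close() would then have to release again.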
Which issue does this PR close?
Closes #1857
Rationale for this change
This PR adds the Flink-side runtime operator that executes the native Calc plan composed by #1856 + #1859. It is the operator layer of the three-layer architecture (plan / operator / graph-rewriter) for the Flink contribution track (tracking issue #1264).
The operator surface is intentionally free of RexNode, StreamExecCalc, and any Flink planner type — the constructor takes only PhysicalPlanNode, RowType, RowType, and a string operator ID. That invariant lets #1853 (the StreamExecCalc rewrite) plug the operator in without dragging planner types into the runtime module.

This is the standalone-Calc MVP (Scenario B: with an FFI Reader leaf for input). Source/operator fusion (Scenario A) is deferred to #1865, as endorsed by the reviewer.
What changes are included in this PR?
Three sequential commits in auron-flink-extension/auron-flink-runtime:

1. FlinkMetricNode (org.apache.auron.flink.metric) — extracts the inline anonymous MetricNode currently in AuronKafkaSourceFunction.run() into a reusable class. Lazily maps each metric name to a Flink Counter in a ConcurrentHashMap (native callbacks may arrive on tokio threads). Non-positive deltas are ignored, matching SparkMetricNode semantics.

2. FlinkArrowFFIExporter (org.apache.auron.flink.arrow) — a synchronous AuronArrowFFIExporter that wraps FlinkArrowWriter so the native engine can pull batches of RowData via the Arrow C-Data FFI. Unlike Spark's threaded ArrowFFIExporter (which absorbs iterator-side IO latency), this exporter runs synchronously on the operator thread. Flink Calc is push-based and Flink's runtime serializes operator calls with the in-flight JNI call to AuronCallNativeWrapper.loadNextBatch (which runs on tokio's spawn_blocking pool), so there is no latency to absorb and no concurrent producer to coordinate with. rotateRoot() closes the previous VectorSchemaRoot before allocating a new one and rotates only after a successful FFI export, so the error path never leaks; close() is idempotent and verified to release all allocator memory.

3. FlinkAuronCalcOperator (org.apache.auron.flink.runtime.operator) — the operator itself. Extends TableStreamOperator<RowData> (so chaining is ALWAYS for free) and implements OneInputStreamOperator<RowData, RowData> + FlinkAuronOperator. The public constructor is planner-free: (PhysicalPlanNode plan, RowType inputRowType, RowType outputRowType, String auronOperatorId); a @VisibleForTesting overload accepts a NativeRuntimeFactory for the unit-test mock seam. open() composes the per-subtask operator ID, allocates a child BufferAllocator, builds the FlinkMetricNode and FlinkArrowFFIExporter, generates the runtime resource ID FlinkAuronCalc-<flinkOpId>-<subtask>:<UUID> (OQ1 resolution — the operator ID rides in the resource-ID prefix; no proto change), registers the exporter via JniBridge.putResource(resourceId, exporter), recursively rewrites the FFI Reader leaf of the supplied plan to carry the bound resource ID, and constructs the native runtime via the factory seam. processElement buffers rows and drains on batch-full. processWatermark and prepareSnapshotPreBarrier drain before forwarding. close() uses nested try/finally so every resource is cleaned up even on partial-init failure; the outer guard checks both exporter and nativeRuntime so a partial open() does not NPE during drain. injectFfiReaderLeaf accepts Project[Filter[FFIReader]], Project[FFIReader], Filter[FFIReader], and bare FFIReader; unsupported shapes throw IllegalArgumentException with the offending type in the message.

Explicitly out of scope (per reviewer guidance):

* AuronKafkaSourceFunction is not migrated to use FlinkMetricNode here — a trivial follow-up, kept out to minimize the diff.
* BATCH_ROW_LIMIT is hard-coded at 8192; a future PR wires it through FlinkAuronConfiguration.

Are there any user-facing changes?
No. This PR introduces internal classes used by the future StreamExecCalc rewriter (#1853). End-user behavior is unchanged until #1853 lands.

How was this patch tested?
22 new JUnit 5 unit tests across three test classes:
* FlinkMetricNodeTest (4) — counter creation, counter caching, the v > 0 gate, child propagation.
* FlinkArrowFFIExporterTest (5) — schema FFI round-trip, 100-row export with FlinkArrowReader-based field-by-field verification, EOI empty-buffer semantics, EOI partial-batch flush, allocator-zero on close.
* FlinkAuronCalcOperatorTest (13) — putResource registration, resource-ID composition, FFI Reader injection (4 plan shapes + 1 throw branch), processElement buffer + batch-full drain, watermark drain ordering, pre-checkpoint barrier drain, idempotent close(), SupportsAuronNative accessors.

All tests run with junit-jupiter-api + arrow-c-data + arrow-vector only — no flink-test-utils, no libauron invocation, no Mockito. The native runtime is mocked via a @VisibleForTesting NativeRuntimeFactory seam.

./build/mvn test -Pspark-3.5,scala-2.12,flink-1.18 \
  -pl auron-flink-extension/auron-flink-runtime \
  -Dtest=FlinkMetricNodeTest,FlinkArrowFFIExporterTest,FlinkAuronCalcOperatorTest

auron-flink-runtime module: 129/129 tests pass (no regressions). ./build/mvn checkstyle:check: 0 violations.