
[AURON #1857] Introduce FlinkAuronCalcOperator #2263

Open
weiqingy wants to merge 4 commits into apache:master from weiqingy:AURON-1857-impl

Conversation

@weiqingy
Contributor

Which issue does this PR close?

Closes #1857

Rationale for this change

This PR adds the Flink-side runtime operator that executes the native Calc plan composed by #1856 + #1859. It is the operator layer of the three-layer architecture (plan / operator / graph-rewriter) for the Flink contribution track (tracking issue #1264).

The operator surface is intentionally free of RexNode, StreamExecCalc, and any Flink planner type — the constructor takes only PhysicalPlanNode, RowType, RowType, and a string operator ID. That invariant lets #1853 (rewrite StreamExecCalc) plug the operator in without dragging planner types into the runtime module.

This is the standalone-Calc MVP (Scenario B: with an FFI Reader leaf for input). Source/operator fusion (Scenario A) is deferred to #1865, as endorsed by the reviewer.

What changes are included in this PR?

Three sequential commits in auron-flink-extension/auron-flink-runtime:

  1. FlinkMetricNode (org.apache.auron.flink.metric) — extracts the inline anonymous MetricNode currently in AuronKafkaSourceFunction.run() into a reusable class. Lazily maps each metric name to a Flink Counter in a ConcurrentHashMap (native callbacks may arrive on tokio threads). Non-positive deltas are ignored, matching SparkMetricNode semantics.

  2. FlinkArrowFFIExporter (org.apache.auron.flink.arrow) — synchronous AuronArrowFFIExporter that wraps FlinkArrowWriter so the native engine can pull batches of RowData via the Arrow C-Data FFI. Unlike Spark's threaded ArrowFFIExporter (which absorbs iterator-side IO latency), this exporter runs synchronously on the operator thread. Flink Calc is push-based and Flink's runtime serializes operator calls with the in-flight JNI call to AuronCallNativeWrapper.loadNextBatch (which runs on tokio's spawn_blocking pool), so there is no latency to absorb and no concurrent producer to coordinate with. rotateRoot() closes the previous VectorSchemaRoot before allocating a new one and rotates only after a successful FFI export, so the error path never leaks; close() is idempotent and verified to release all allocator memory.

  3. FlinkAuronCalcOperator (org.apache.auron.flink.runtime.operator) — the operator itself. Extends TableStreamOperator<RowData> (which sets ChainingStrategy.ALWAYS, so operator chaining comes for free) and implements OneInputStreamOperator<RowData, RowData> + FlinkAuronOperator. Public constructor is planner-free: (PhysicalPlanNode plan, RowType inputRowType, RowType outputRowType, String auronOperatorId); a @VisibleForTesting overload accepts a NativeRuntimeFactory for the unit-test mock seam.

    open() composes the per-subtask operator ID, allocates a child BufferAllocator, builds the FlinkMetricNode and FlinkArrowFFIExporter, generates the runtime resource ID FlinkAuronCalc-<flinkOpId>-<subtask>:<UUID> (OQ1 resolution — operator ID rides in the resource-ID prefix; no proto change), registers the exporter via JniBridge.putResource(resourceId, exporter), recursively rewrites the FFI Reader leaf of the supplied plan to carry the bound resource ID, and constructs the native runtime via the factory seam.

    processElement buffers rows and drains on batch-full. processWatermark and prepareSnapshotPreBarrier drain before forwarding. close() uses nested try/finally so every resource is cleaned even on partial-init failure; the outer guard checks both exporter and nativeRuntime so a partial open() does not NPE during drain.

    injectFfiReaderLeaf accepts Project[Filter[FFIReader]], Project[FFIReader], Filter[FFIReader], and bare FFIReader. Unsupported shapes throw IllegalArgumentException with the offending type in the message.
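The four accepted shapes can be sketched as a small recursion over simplified stand-in node types. This is a hypothetical model, not Auron's API: the real helper walks the generated PhysicalPlanNode protobuf, and unlike this generic recursion it accepts only the four shapes listed above.

```java
// Hypothetical sketch of injectFfiReaderLeaf over stand-in node types
// (Plan, Kind are illustrative; the real code uses PhysicalPlanNode).
public class InjectLeafSketch {
    public enum Kind { PROJECT, FILTER, FFI_READER, OTHER }

    public static final class Plan {
        public final Kind kind;
        public final Plan child;        // null for leaf nodes
        public final String resourceId; // only meaningful on FFI_READER
        public Plan(Kind kind, Plan child, String resourceId) {
            this.kind = kind;
            this.child = child;
            this.resourceId = resourceId;
        }
    }

    // Rebuild the plan with the FFI Reader leaf bound to resourceId.
    public static Plan injectFfiReaderLeaf(Plan plan, String resourceId) {
        switch (plan.kind) {
            case FFI_READER:
                return new Plan(Kind.FFI_READER, null, resourceId);
            case PROJECT:
            case FILTER:
                if (plan.child != null) {
                    return new Plan(plan.kind,
                            injectFfiReaderLeaf(plan.child, resourceId), null);
                }
                // childless Project/Filter falls through to the error
            default:
                throw new IllegalArgumentException(
                        "Unsupported plan shape: " + plan.kind);
        }
    }
}
```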

Explicitly out of scope (per reviewer guidance): Scenario A source/operator fusion (deferred to #1865) and migrating AuronKafkaSourceFunction's inline MetricNode to FlinkMetricNode (a follow-up).

Are there any user-facing changes?

No. This PR introduces internal classes used by the future StreamExecCalc rewriter (#1853). End-user behavior is unchanged until #1853 lands.

How was this patch tested?

22 new JUnit 5 unit tests across three test classes:

  • FlinkMetricNodeTest (4) — counter creation, counter caching, v > 0 gate, child propagation.
  • FlinkArrowFFIExporterTest (5) — schema FFI round-trip, 100-row export with FlinkArrowReader-based field-by-field verification, EOI empty-buffer semantics, EOI partial-batch flush, allocator-zero on close.
  • FlinkAuronCalcOperatorTest (13) — putResource registration, resource-ID composition, FFI Reader injection (4 plan shapes + 1 throw branch), processElement buffer + batch-full drain, watermark drain ordering, pre-checkpoint barrier drain, idempotent close(), SupportsAuronNative accessors.

All tests run with junit-jupiter-api + arrow-c-data + arrow-vector only — no flink-test-utils, no libauron invocation, no Mockito. The native runtime is mocked via a @VisibleForTesting NativeRuntimeFactory seam.

./build/mvn test -Pspark-3.5,scala-2.12,flink-1.18 \
    -pl auron-flink-extension/auron-flink-runtime \
    -Dtest=FlinkMetricNodeTest,FlinkArrowFFIExporterTest,FlinkAuronCalcOperatorTest
  • All 22 new tests pass.
  • Full auron-flink-runtime module: 129/129 tests pass (no regressions).
  • ./build/mvn checkstyle:check: 0 violations.

weiqingy added 3 commits May 10, 2026 23:11
…arding

Extract a reusable FlinkMetricNode class that forwards metric updates
published by the native engine to a Flink MetricGroup. Counters are
lazily created in a ConcurrentHashMap so concurrent native-thread
callbacks via JNI are thread-safe.

Non-positive deltas are ignored to match SparkMetricNode semantics
(spark-extension/.../SparkMetricNode.scala:41-44), avoiding spurious
counter activity from native callbacks that report zero deltas for
unaffected metrics.
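The update path described above can be modeled in a few lines. Counter here is a minimal stand-in for Flink's org.apache.flink.metrics.Counter, and the method names are illustrative, not Auron's actual signatures:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the FlinkMetricNode counter-forwarding path.
public class MetricNodeSketch {
    public interface Counter {
        void inc(long n);
        long getCount();
    }

    static final class SimpleCounter implements Counter {
        private long count;
        public void inc(long n) { count += n; }
        public long getCount() { return count; }
    }

    // ConcurrentHashMap: native callbacks may arrive on tokio threads,
    // so lazy counter creation must be thread-safe.
    private final Map<String, Counter> counters = new ConcurrentHashMap<>();

    public void add(String metricName, long delta) {
        if (delta <= 0) {
            return; // non-positive deltas ignored, matching SparkMetricNode
        }
        counters.computeIfAbsent(metricName, k -> new SimpleCounter()).inc(delta);
    }

    public long value(String metricName) {
        Counter c = counters.get(metricName);
        return c == null ? 0L : c.getCount();
    }
}
```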

This is the first commit toward FlinkAuronCalcOperator (AURON apache#1857).
The existing inline anonymous MetricNode subclass in
AuronKafkaSourceFunction.run() will migrate to FlinkMetricNode as a
follow-up; intentionally not migrated here to keep this commit focused.

Includes 4 JUnit 5 unit tests covering counter creation, caching,
non-positive value handling, and child-node propagation. Tests use
hand-rolled MetricGroup/Counter fakes since the module has no Mockito
dependency.

…native row batching

Add FlinkArrowFFIExporter, a synchronous AuronArrowFFIExporter subclass
that wraps FlinkArrowWriter so the native engine can pull batches of
Flink RowData from the operator's hot path via the Arrow C-Data FFI.

The operator (next commit) calls:
  - offer(row) to buffer a row into the active VectorSchemaRoot;
  - isBatchFull() to decide when to drain;
  - noMoreInput() before close;
  - exportSchema(ptr) during open() to populate the FFI ArrowSchema;
  - exportNextBatch(arrayPtr) on the native pull to populate the FFI
    ArrowArray and rotate to a fresh root. Returns false only when the
    buffer is empty AND noMoreInput has been set; partial batches still
    flush on EOI.
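As a rough model of this contract, with a List<long[]> standing in for the active VectorSchemaRoot and a Consumer for the FFI ArrowArray export (names are illustrative, not Auron's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the exportNextBatch contract described above.
public class ExporterContractSketch {
    private final int batchSize;
    private List<long[]> root = new ArrayList<>(); // active "root"
    private boolean noMoreInput;

    public ExporterContractSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    public void offer(long[] row) { root.add(row); }
    public boolean isBatchFull() { return root.size() >= batchSize; }
    public void noMoreInput() { noMoreInput = true; }

    // False only when the buffer is empty AND end-of-input was signaled;
    // a partial batch still flushes on EOI. The root rotates only after
    // the export succeeds, so a throwing export leaks nothing.
    public boolean exportNextBatch(Consumer<List<long[]>> ffiExport) {
        if (root.isEmpty() && noMoreInput) {
            return false;
        }
        ffiExport.accept(root);   // may throw; root stays intact
        root = new ArrayList<>(); // rotate only after success
        return true;
    }
}
```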

Unlike Spark's threaded ArrowFFIExporter, this implementation runs
synchronously on the operator thread. Spark's threading exists to
absorb iterator-side IO latency from an Iterator[InternalRow] backed
by disk/shuffle. Flink Calc is push-based (processElement delivers
already-materialized rows) and Flink's runtime serializes operator
calls with in-flight JNI calls (tokio spawn_blocking), so there is no
latency to absorb and no concurrent producer to coordinate with.
See AURON-1857 DESIGN.md OQ2 for the full thread trace.

Resource lifecycle:
  - rotateRoot() closes the previous VectorSchemaRoot before allocating
    a new one;
  - exportNextBatch() rotates only after the FFI export succeeds, so
    the error path never leaks;
  - close() is idempotent and verified to release all child-allocator
    memory.

Row count is tracked on the exporter itself, not delegated to the
underlying FlinkArrowWriter (which has no public getRowCount()). This
keeps the writer untouched.

Includes 5 JUnit 5 round-trip tests covering schema export, 100-row
export with field-by-field verification via FlinkArrowReader, EOI
empty-buffer semantics, EOI partial-batch flush, and allocator-zero
verification on close. All tests are pure Arrow C-Data round-trip with
hand-rolled FFI struct allocation -- no native call, no libauron.

… plans

Add FlinkAuronCalcOperator, the Flink TableStreamOperator<RowData> that
executes a pre-composed native Calc plan (Project[Filter[FFIReader]])
by streaming RowData through the FlinkArrowFFIExporter (input bridge),
AuronCallNativeWrapper (native execution loop), and FlinkArrowReader
(output bridge).

This is the operator layer of the three-layer architecture (plan /
operator / graph-rewriter). The operator surface is intentionally free
of RexNode, StreamExecCalc, and any Flink planner type -- the
constructor takes only PhysicalPlanNode, RowType, RowType, and a string
operator ID. Graph rewriting (substituting Flink's codegen factory) is
the next PR (apache#1853); operator fusion is deferred to apache#1865.

OQ1 resolution: the runtime FFI Reader resource ID has the form
"FlinkAuronCalc-<flinkOpId>-<subtask>:<UUID>" so the operator ID rides
in the resource-ID prefix. No proto change is required. The
SupportsAuronNative.getAuronOperatorId() accessor still returns the
unsuffixed constructor value, mirroring AuronKafkaSourceFunction.
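A minimal sketch of that composition (variable names illustrative):

```java
import java.util.UUID;

// Sketch of the OQ1 scheme: the Flink operator ID and subtask index ride
// in the resource-ID prefix, so no proto change is needed.
public class ResourceIdSketch {
    public static String composeResourceId(String flinkOpId, int subtaskIndex) {
        return String.format("FlinkAuronCalc-%s-%d:%s",
                flinkOpId, subtaskIndex, UUID.randomUUID());
    }
}
```

The UUID suffix keeps each registration unique while the prefix stays stable per subtask.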

OQ2 resolution: the operator runs synchronously on the operator thread
(no producer thread). Flink's runtime serializes processElement with
in-flight loadNextBatch calls (which themselves run on tokio's
spawn_blocking pool), so no concurrent producer needs coordination.
This is a substantial simplification over Spark's threaded
ArrowFFIExporter, which exists to absorb iterator-side IO latency that
Flink Calc does not have.

The open() method builds the per-subtask resource ID, allocates a
child Arrow allocator, instantiates the FFI exporter and FlinkMetricNode,
registers the exporter in JniBridge, rewrites the FFI Reader leaf with
the bound resource ID, and constructs the native runtime via a
@VisibleForTesting factory seam. processElement buffers rows and drains
on batch-full, watermark, pre-checkpoint barrier, and close. close() uses
nested try/finally to guarantee cleanup of every resource even on
partial-init failures; the outer guard checks both exporter and
nativeRuntime so a partial open() does not NPE during drain.
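The nested try/finally shape can be sketched as follows (field names illustrative; the real operator closes its actual exporter, native runtime, and child allocator):

```java
// Sketch of the nested try/finally close(): every later resource is still
// closed when an earlier close() throws, and the null checks tolerate a
// partially completed open().
public class NestedCloseSketch {
    public AutoCloseable nativeRuntime; // any may be null after partial open()
    public AutoCloseable exporter;
    public AutoCloseable allocator;

    public void close() throws Exception {
        try {
            if (nativeRuntime != null) {
                nativeRuntime.close();
            }
        } finally {
            try {
                if (exporter != null) {
                    exporter.close();
                }
            } finally {
                if (allocator != null) {
                    allocator.close();
                }
            }
        }
    }
}
```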

The injectFfiReaderLeaf static helper accepts Project[Filter[FFIReader]],
Project[FFIReader], Filter[FFIReader], and bare FFIReader plan shapes;
unsupported shapes throw IllegalArgumentException with the offending
plan type in the message.

Includes 13 JUnit 5 tests via a NativeRuntime mock seam -- no libauron
invocation, no flink-test-utils. Tests cover putResource registration,
resource-ID composition, FFI Reader injection, processElement buffer
and drain, watermark and checkpoint barrier drain ordering, idempotent
close, SupportsAuronNative accessors, and all four accepted plan shapes
plus the throw branch of the recursion helper.
@github-actions bot added the flink label May 11, 2026
@weiqingy
Contributor Author

Hi @Tartarus0zm, this PR implements the operator layer per the design discussed in #1857. CI is green. Would appreciate your review when you have time. Thanks!

@Tartarus0zm self-requested a review May 12, 2026 03:01
Contributor

@Tartarus0zm left a comment


Thanks for your contribution!
I left a few comments, PTAL

* {@link #emitArrowBatch(VectorSchemaRoot)}.
*/
private void drainNative() {
while (nativeRuntime.loadNextBatch(this::emitArrowBatch)) {

I think that when the while loop exits here, it means the current Auron task has finished, and FlinkArrowFFIExporter and AuronCallNativeWrapper may need to be reinitialized.

We can create a test in FlinkAuronCalcOperatorTest, call processElement multiple times to make exporter.isBatchFull() == true, and then see if an exception is thrown.

// Construction helpers
// =====================================================================

private TestableOperator newOperator() {

typo? TestTableOperator

…er drain cycle

Addresses @Tartarus0zm's code review on PR apache#2263.

The Calc operator's drainNative pattern was lifted from AuronKafkaSourceFunction's
one-shot run loop. For multi-cycle streaming (per isBatchFull / watermark /
checkpoint / close), three interactions converged into a latent bug:

  1. AuronCallNativeWrapper.loadNextBatch auto-closes the native runtime on
     false return (AuronCallNativeWrapper.java:127), making the wrapper dead
     after the first drain.
  2. The native FFIReaderExec's AutoCloseableExporter::drop calls Java close()
     on the exporter (ffi_reader_exec.rs:192-198), nulling its writer.
  3. The native FFI Reader loop does not filter empty batches; the original
     exportNextBatch contract (true with empty when !noMoreInput) caused
     drainNative to spin forever.

Fix (operator-layer only, no auron-core or Rust changes):

  * FlinkArrowFFIExporter.exportNextBatch now returns false whenever rowCount
    is zero. Adds public reset() to re-prepare the exporter after the FFI
    Reader's drop closed it. Adds public isEmpty() so the operator can
    short-circuit no-op drains. noMoreInput() downgraded to a soft hint.

  * FlinkAuronCalcOperator caches nativeMemory in open() and adds a closing
    flag. drainNative() early-returns on empty (keeps wrapper alive across
    idle watermarks/checkpoints), else loops then calls a new private
    reinitExporterAndRuntime() helper when !closing. close() sets closing
    first so the final drain does not allocate replacement resources only
    to tear them down.

Per-subtask identity is preserved: the same resourceId from open() is reused
on every reinit; OQ1's per-subtask uniqueness invariant holds.
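A minimal model of that drain/reinit cycle (types and names are simplified stand-ins, not Auron's API):

```java
// Model of the fix: loadNextBatch() returning false leaves the native
// wrapper dead, so the operator rebuilds it through the factory seam
// unless it is closing.
public class ReinitSketch {
    public interface NativeRuntime {
        boolean loadNextBatch(); // false => wrapper auto-closed itself
    }

    public interface RuntimeFactory {
        NativeRuntime create();
    }

    private final RuntimeFactory factory;
    private NativeRuntime runtime;
    private boolean closing;
    public int reinitCount; // exposed so the sketch is easy to observe

    public ReinitSketch(RuntimeFactory factory) {
        this.factory = factory;
        this.runtime = factory.create();
    }

    public void drainNative(boolean exporterEmpty) {
        if (exporterEmpty) {
            return; // keep wrapper alive across idle watermarks/checkpoints
        }
        while (runtime.loadNextBatch()) {
            // emit output batches downstream
        }
        if (!closing) {
            // the real operator re-registers the same resourceId here
            runtime = factory.create();
            reinitCount++;
        }
    }

    public void close() {
        closing = true; // the final drain must not allocate replacements
        drainNative(false);
    }
}
```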

Also renames the test seam subclass TestableOperator -> TestFlinkAuronCalcOperator
to remove the typo ambiguity flagged in the same review.

Tests:
  * New testProcessElementMultiCycleDrainReinitializesWrapper with a
    OneShotNativeRuntime fake modeling production close-on-false. Asserts
    factory invocation count, per-cycle JniBridge re-registration, and
    post-cycles offer usability.
  * New testExportNextBatchReturnsFalseOnEmpty for the empty-batch contract.
  * TrackingExporter.isEmpty() override added so existing drain tests work
    with the new isEmpty short-circuit.

131/131 module tests pass. 0 checkstyle violations.