feat(dao): add operator_port_cache table#5967
Conversation
Adds the operator_port_cache table (texera_ddl.sql + Liquibase migration sql/updates/26.sql), keyed by (workflow_id, global_port_id, cache_key) with ON DELETE CASCADE to workflow. The cache read/write logic and its tests land with the cache service that uses it. Part of apache#5882.
Automated Reviewer SuggestionsBased on the
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #5967 +/- ##
============================================
+ Coverage 56.28% 56.63% +0.35%
- Complexity 2992 3023 +31
============================================
Files 1120 1121 +1
Lines 43217 43288 +71
Branches 4662 4667 +5
============================================
+ Hits 24326 24518 +192
+ Misses 17472 17330 -142
- Partials 1419 1440 +21
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
@carloea2 could you help review this one when you get a chance? Thanks! |
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 426 | 0.26 | 23,160/30,286/30,286 us | 🔴 +11.6% / 🔴 +101.0% |
| 🟢 | bs=100 sw=10 sl=64 | 950 | 0.58 | 105,644/119,331/119,331 us | 🟢 -10.0% / 🔴 +9.6% |
| 🔴 | bs=1000 sw=10 sl=64 | 1,085 | 0.662 | 918,400/1,018,610/1,018,610 us | 🔴 +6.7% / 🟢 -8.3% |
Baseline details
Latest main a24d1d1 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 426 tuples/sec | 447 tuples/sec | 770.82 tuples/sec | -4.7% | -44.7% |
| bs=10 sw=10 sl=64 | MB/s | 0.26 MB/s | 0.273 MB/s | 0.47 MB/s | -4.8% | -44.7% |
| bs=10 sw=10 sl=64 | p50 | 23,160 us | 20,758 us | 12,723 us | +11.6% | +82.0% |
| bs=10 sw=10 sl=64 | p95 | 30,286 us | 32,226 us | 15,070 us | -6.0% | +101.0% |
| bs=10 sw=10 sl=64 | p99 | 30,286 us | 32,226 us | 18,429 us | -6.0% | +64.3% |
| bs=100 sw=10 sl=64 | throughput | 950 tuples/sec | 885 tuples/sec | 973.75 tuples/sec | +7.3% | -2.4% |
| bs=100 sw=10 sl=64 | MB/s | 0.58 MB/s | 0.54 MB/s | 0.594 MB/s | +7.4% | -2.4% |
| bs=100 sw=10 sl=64 | p50 | 105,644 us | 111,819 us | 102,519 us | -5.5% | +3.0% |
| bs=100 sw=10 sl=64 | p95 | 119,331 us | 132,558 us | 108,855 us | -10.0% | +9.6% |
| bs=100 sw=10 sl=64 | p99 | 119,331 us | 132,558 us | 117,788 us | -10.0% | +1.3% |
| bs=1000 sw=10 sl=64 | throughput | 1,085 tuples/sec | 1,115 tuples/sec | 1,004 tuples/sec | -2.7% | +8.1% |
| bs=1000 sw=10 sl=64 | MB/s | 0.662 MB/s | 0.681 MB/s | 0.613 MB/s | -2.8% | +8.0% |
| bs=1000 sw=10 sl=64 | p50 | 918,400 us | 890,152 us | 1,001,930 us | +3.2% | -8.3% |
| bs=1000 sw=10 sl=64 | p95 | 1,018,610 us | 954,686 us | 1,042,923 us | +6.7% | -2.3% |
| bs=1000 sw=10 sl=64 | p99 | 1,018,610 us | 954,686 us | 1,074,893 us | +6.7% | -5.2% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,469.09,200,128000,426,0.260,23159.89,30286.23,30286.23
1,100,10,64,20,2105.44,2000,1280000,950,0.580,105644.20,119331.22,119331.22
2,1000,10,64,20,18426.53,20000,12800000,1085,0.662,918399.53,1018610.21,1018610.21| cache_key CHAR(64) NOT NULL, | ||
| cache_key_json TEXT NOT NULL, |
There was a problem hiding this comment.
why do we have two parts?
There was a problem hiding this comment.
Two columns on purpose: cache_key is the SHA-256 hash we look up by (and part of the PK), and cache_key_json is the JSON it was computed from. On a hash hit we compare the stored JSON to confirm the match, so a hash collision can never reuse the wrong result.
| global_port_id VARCHAR(200) NOT NULL, | ||
| cache_key CHAR(64) NOT NULL, | ||
| cache_key_json TEXT NOT NULL, | ||
| result_uri TEXT NOT NULL, |
There was a problem hiding this comment.
please change to storage_uri. result indicates direction.
There was a problem hiding this comment.
Done, renamed to storage_uri.
| cache_key CHAR(64) NOT NULL, | ||
| cache_key_json TEXT NOT NULL, | ||
| result_uri TEXT NOT NULL, | ||
| tuple_count BIGINT, |
There was a problem hiding this comment.
do we want to store statistics separately? I think iceberg catalog has this info already, is this duplicate?
There was a problem hiding this comment.
I'd keep tuple_count. A cache row is immutable (a changed computation is a new row, never an update), so it can't drift from the Iceberg result. It's populated at materialization and read by the coordinator during execution — the coordinator already queries this table for the cache lookup, so a cached region's output stats come from the same row with no extra read. Reading it from getCount() instead is a catalog/storage round-trip per cached port on the coordinator, which is the work the cache is meant to avoid.
| tuple_count BIGINT, | ||
| source_execution_id BIGINT, | ||
| updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), | ||
| PRIMARY KEY (workflow_id, global_port_id, cache_key), |
There was a problem hiding this comment.
how come execution id is not part of primary key?
There was a problem hiding this comment.
Intentional — the cache is reused across executions, keyed by cache_key (the upstream computation hash). Putting execution_id in the PK would create a row per execution and prevent reuse. source_execution_id is provenance metadata. The per-execution tables key by execution because they store per-execution results — a different concern.
|
@Xiao-zhen-Liu please link issue properly |
Address review: result implies a direction, storage_uri is clearer. tuple_count is kept (immutable per row, populated at materialization, read by the coordinator alongside the cache lookup so cached-region stats need no extra storage round-trip).
|
Thanks @Yicong-Huang — replies inline. Renamed |
What changes were proposed in this PR?
Adds the
operator_port_cachetable that records a materialized output portresult so it can be reused across executions. It is keyed by
(workflow_id, global_port_id, cache_key)and stores the JSON the cache key wascomputed from, the result location, an optional tuple count and source execution
id, and a database-managed
updated_at. The foreign key toworkflow(wid)isON DELETE CASCADE. The stored JSON (cache_key_json) lets a lookup confirm ahash match by comparing the full JSON, so a hash collision never reuses the wrong
result.
The change is additive: a new table in
sql/texera_ddl.sql(fresh installs) plusa Liquibase migration
sql/updates/26.sqlregistered insql/changelog.xml(existing deployments). No code reads or writes the table yet; the cache read/write
logic and its tests land with the cache service that uses it, following the
convention of testing a table through its consumer (as
feedbackis tested viaFeedbackResourceSpec).Any related issues, documentation, discussions?
Closes #5969. Part of the storage foundation #5882 (umbrella #5881). Design discussion: #5880.
How was this PR tested?
Verified the schema directly against Postgres: the migration applies cleanly, the
columns and primary key
(workflow_id, global_port_id, cache_key)are correct,the foreign key's delete rule is
CASCADE, the schema file and the migrationdefine identical columns/keys, and
changelog.xmlis well-formed and registers26.sql. The generated jOOQ classes build from the table. The table's runtimebehavior is exercised by the cache service tests in the follow-up PR.
Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8 (Claude Code)