Skip to content

[SPARK-57052][SS] Add state row format validation to multiGet in RocksDBStateStoreProvider#56097

Open
yadavay-amzn wants to merge 1 commit into
apache:masterfrom
yadavay-amzn:fix/SPARK-57052
Open

[SPARK-57052][SS] Add state row format validation to multiGet in RocksDBStateStoreProvider#56097
yadavay-amzn wants to merge 1 commit into
apache:masterfrom
yadavay-amzn:fix/SPARK-57052

Conversation

@yadavay-amzn
Copy link
Copy Markdown
Contributor

@yadavay-amzn yadavay-amzn commented May 25, 2026

What changes were proposed in this pull request?

Add validateStateRowFormat to multiGet() in RocksDBStateStoreProvider, consistent with existing validation in get(), iterator(), prefixScan(), and rangeScan().

Why are the changes needed?

multiGet() decodes values via kvEncoder._2.decodeValue but never calls validateStateRowFormat. If a stateful operator evolves its schema between restarts, multiGet() will silently return corrupted data instead of failing fast with StateStoreValueRowFormatValidationFailure.

All other read-path methods that decode rows already perform this validation. multiGet() is the only inconsistency:

Method Decodes rows? Has validation?
get() Yes Yes
iterator() Yes Yes
prefixScan() Yes Yes (SPARK-56539)
rangeScan() Yes Yes (SPARK-56539)
multiGet() Yes (this PR)

Does this PR introduce any user-facing change?

No. The validation is gated behind the existing spark.sql.streaming.stateStore.formatValidation.enabled config (default: true in testing mode).

How was this patch tested?

Added test in RocksDBStateStoreSuite that writes data with one schema, reopens with a mismatched schema, and verifies multiGet() throws StateStoreValueRowFormatValidationFailure. The test follows the same pattern as the existing SPARK-56539 tests for prefixScan and rangeScan.

  • Without fix: multiGet() silently returns corrupted rows
  • With fix: StateStoreValueRowFormatValidationFailure thrown on first decoded value

Was this patch authored or co-authored using generative AI tooling?

Generated by: Claude Opus 4.7

…sDBStateStoreProvider

### What changes were proposed in this pull request?

Add `validateStateRowFormat` to `multiGet()` in `RocksDBStateStoreProvider`, consistent with existing validation in `get()`, `iterator()`, `prefixScan()`, and `rangeScan()`.

### Why are the changes needed?

`multiGet()` decodes values via `kvEncoder._2.decodeValue` but never calls `validateStateRowFormat`. If a stateful operator evolves its schema between restarts, `multiGet()` will silently return corrupted data instead of failing fast with `StateStoreValueRowFormatValidationFailure`.

All other read-path methods that decode rows already perform this validation. `multiGet()` is the only inconsistency.

### Does this PR introduce _any_ user-facing change?

No. The validation is gated behind the existing `spark.sql.streaming.stateStore.formatValidation.enabled` config (default: true in testing mode).

### How was this patch tested?

Added test in `RocksDBStateStoreSuite` that writes data with one schema, reopens with a mismatched schema, and verifies `multiGet()` throws `StateStoreValueRowFormatValidationFailure`. The test follows the same pattern as the existing SPARK-56539 tests for `prefixScan` and `rangeScan`.

### Was this patch authored or co-authored using generative AI tooling?

Yes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant