[SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val initialization in subquery #56095

Open
jiwen624 wants to merge 1 commit into apache:master from jiwen624:SPARK-waitForSubqueries-deadlock-fix

@jiwen624 jiwen624 commented May 24, 2026

What changes were proposed in this pull request?

Replace synchronized with an explicit subqueryLock object in waitForSubqueries() and prepare().

Why are the changes needed?

waitForSubqueries() holds the plan's intrinsic monitor (via synchronized) while blocking on subquery futures. A concurrent thread computing a Scala lazy val on the same plan object (e.g. FileSourceScanLike.metadata, reached via AQE's onUpdatePlan -> SparkPlanInfo.fromSparkPlan) also needs that monitor, causing a deadlock.

See the Jira https://issues.apache.org/jira/browse/SPARK-57041 for the thread dump when the deadlock happens.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test case.

Was this patch authored or co-authored using generative AI tooling?

Yes. Claude Code.

@jiwen624 jiwen624 changed the title [WIP][SQL] Fix deadlock between waitForSubqueries and lazy val initialization in subquery [WIP][SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val initialization in subquery May 24, 2026
@jiwen624 jiwen624 force-pushed the SPARK-waitForSubqueries-deadlock-fix branch 2 times, most recently from 341ef05 to cb27107 May 25, 2026 05:16
@jiwen624 jiwen624 changed the title [WIP][SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val initialization in subquery [SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val initialization in subquery May 25, 2026
@transient
private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression]

@transient private lazy val subqueryLock: AnyRef = new AnyRef

@jiwen624 jiwen624 May 25, 2026

@transient is required here: AnyRef is not Serializable. lazy ensures the field re-initializes after deserialization - a plain @transient val would be null if prepare() were ever called on a deserialized plan. Currently that path is not exercised (runningSubqueries has the same gap), so lazy is just defensive and could be optional.
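
The difference between the two declarations can be checked in isolation. The following is a minimal sketch, not Spark code: `Holder`, `eagerLock`, `lazyLock`, and `roundTrip` are hypothetical names, and plain Java serialization is assumed as the round-trip mechanism.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a Serializable plan node (not a Spark class).
class Holder extends Serializable {
  // Skipped by Java serialization; the constructor does not run on
  // deserialization, so this field comes back as null.
  @transient val eagerLock: AnyRef = new AnyRef

  // Also skipped, but the initializer runs again on first access
  // after deserialization.
  @transient lazy val lazyLock: AnyRef = new AnyRef
}

object TransientLockDemo {
  // Serialize and deserialize a value through an in-memory buffer.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Calling `lazyLock.synchronized { ... }` on a round-tripped `Holder` works, while `eagerLock.synchronized { ... }` would throw a NullPointerException.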

@jiwen624

Hi @cloud-fan @dongjoon-hyun or anybody else: could you take a look at this deadlock fix when you get a chance? 🙇

@jiwen624 jiwen624 marked this pull request as ready for review May 25, 2026 22:57

@cloud-fan cloud-fan left a comment

The PR fixes a deadlock between SparkPlan.waitForSubqueries() and Scala lazy-val initialization on the same plan instance (e.g., FileSourceScanLike.metadata). Pre-fix, waitForSubqueries() used this.synchronized and held the plan's intrinsic monitor while blocking on subquery futures; meanwhile another thread (such as AQE's onUpdatePlan -> SparkPlanInfo.fromSparkPlan -> metadata lazy val) needs the same monitor, deadlocking.

The fix introduces a private lock object and routes both prepare() and waitForSubqueries() through it. The two methods still mutex against each other (same lock object), the prepared flag idempotence is preserved, and the safety of runningSubqueries mutation is preserved. The plan's intrinsic monitor is no longer held during blocking subquery awaits, so lazy-val initialization on the same plan can proceed concurrently. No external code depends on prepare/waitForSubqueries mutexing against plan.this.synchronized — the only Spark code that takes a plan's intrinsic monitor is Scala's generated lazy-val init, which is short-lived.
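
The shape of the fix can be sketched outside Spark. This is an illustrative reduction, not the actual SparkPlan code: `PlanLike`, `waitWhileLocked`, and `PrivateLockDemo` are hypothetical names, and a CountDownLatch stands in for the subquery futures. It assumes the Scala 2 encoding in which lazy-val initialization synchronizes on the enclosing instance.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical reduction of a plan node (not the real SparkPlan).
class PlanLike {
  // Dedicated lock object: blocking while holding it leaves the
  // instance's intrinsic monitor free.
  private val subqueryLock = new Object()

  // Under the assumed encoding, initializing this takes `this` as a monitor.
  lazy val metadata: String = "meta"

  // Stand-in for waitForSubqueries(): blocks while holding subqueryLock.
  def waitWhileLocked(entered: CountDownLatch, release: CountDownLatch): Unit =
    subqueryLock.synchronized {
      entered.countDown()
      release.await(10, TimeUnit.SECONDS)
      ()
    }
}

object PrivateLockDemo {
  // Returns true if `metadata` could be initialized while another
  // thread was blocked inside waitWhileLocked.
  def run(): Boolean = {
    val plan = new PlanLike
    val entered = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val waiter = new Thread(() => plan.waitWhileLocked(entered, release))
    waiter.setDaemon(true)
    waiter.start()
    try {
      entered.await(5, TimeUnit.SECONDS) && plan.metadata == "meta"
    } finally {
      release.countDown()
      waiter.join(5000L)
    }
  }
}
```

If `waitWhileLocked` used `this.synchronized` instead, the read of `plan.metadata` would, under the same assumption, stall until the latch wait ended.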

The test is a good regression check: it uses a CountDownLatch-blocked BlockingSubquery and inspects ThreadInfo to ensure a competing plan.synchronized {} thread never enters BLOCKED.
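
The BLOCKED-state check can be illustrated with a small probe. This is a hedged sketch rather than the PR's test: `BlockedStateProbe`, `observedBlocked`, and `contenderBlocks` are hypothetical names, and `Thread.getState` is used here as a simpler stand-in for the ThreadInfo inspection mentioned above.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object BlockedStateProbe {
  // Polls until `t` is seen BLOCKED, it terminates, or the timeout expires.
  def observedBlocked(t: Thread, timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    var blocked = false
    while (!blocked && t.isAlive && System.nanoTime() < deadline) {
      blocked = t.getState == Thread.State.BLOCKED
      if (!blocked) Thread.sleep(10)
    }
    blocked
  }

  // One thread holds `heldLock` while waiting on a latch; a second thread
  // tries to enter `wantedLock`. Returns whether the second thread blocked.
  def contenderBlocks(heldLock: AnyRef, wantedLock: AnyRef): Boolean = {
    val entered = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val holder = new Thread(() => heldLock.synchronized {
      entered.countDown()
      release.await(10, TimeUnit.SECONDS)
      ()
    })
    val contender = new Thread(() => wantedLock.synchronized { () })
    holder.setDaemon(true)
    contender.setDaemon(true)
    holder.start()
    try {
      val holderReady = entered.await(5, TimeUnit.SECONDS)
      contender.start()
      holderReady && observedBlocked(contender, 2000L)
    } finally {
      // Cleanup runs even if something above fails, so no thread is leaked.
      release.countDown()
      holder.join(5000L)
      contender.join(5000L)
    }
  }
}
```

With the same object passed for both arguments the contender is reported as blocked; with two distinct objects it finishes immediately and is not.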

Comments below are minor — one consistency item (test naming), one Scaladoc that now points at a private member, one missing finally, and a suggestion to match the existing AdaptiveSparkPlanExec.lock lock pattern.

@transient
private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression]

@transient private lazy val subqueryLock: AnyRef = new AnyRef

Suggest matching the existing Spark idiom for ad-hoc lock objects — AdaptiveSparkPlanExec.lock, BlockManager.{asyncReregisterLock, peerFetchLock}, QueryExecution.observedMetricsLock, Executor.lock, etc. all use @transient private val xxxLock = new Object(). AdaptiveSparkPlanExec is the closest peer (also a SparkPlan, also Serializable) and uses exactly that pattern.

Suggested change
- @transient private lazy val subqueryLock: AnyRef = new AnyRef
+ @transient private val subqueryLock = new Object()

Your point that lazy would defend against a deserialized-then-prepared plan is fair, but the very next line up — runningSubqueries — makes the same assumption non-defensively, so as long as that gap stands, this one matches it.

Separately, the name subqueryLock is a bit narrow: it now also guards the prepared flag and the user-defined doPrepare() body. prepareLock would describe the role better — but only worth changing if you're touching this line anyway.

Comment thread sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala Outdated
releaseLatch.countDown()
ThreadUtils.awaitResult(futureA, Duration(10, "seconds"))
threadB.join(5000L)
executor.shutdown()

Cleanup isn't in a finally. If enteredLatch.await(10, SECONDS) returns false (or any assert/exception fires before line 221), futureA's thread sits on releaseLatch.await(30s) and the executor never shuts down. Suggest wrapping the body from line 193 onward in:

try {
  // existing body, ending with the !threadBBlocked assert
} finally {
  releaseLatch.countDown()
  executor.shutdown()
}

so a failed run doesn't leak threads for 30s.

Contributor Author

Updated.

…ion in subquery thread

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@jiwen624 jiwen624 force-pushed the SPARK-waitForSubqueries-deadlock-fix branch from cb27107 to 89e14fc May 26, 2026 03:41