[SPARK-57041][SQL] Fix deadlock between waitForSubqueries and lazy val initialization in subquery#56095
Conversation
341ef05 to
cb27107
Compare
| @transient | ||
| private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression] | ||
|
|
||
| @transient private lazy val subqueryLock: AnyRef = new AnyRef |
There was a problem hiding this comment.
@transient is required here: AnyRef is not Serializable. lazy ensures the field re-initializes after deserialization - a plain @transient val would be null if prepare() were ever called on a deserialized plan. Currently that path is not exercised (runningSubqueries has the same gap), so lazy is just defensive and could be optional.
|
Hi @cloud-fan @dongjoon-hyun or anybody else: could you take a look at this deadlock fix when you get a chance? 🙇 |
cloud-fan
left a comment
There was a problem hiding this comment.
The PR fixes a deadlock between SparkPlan.waitForSubqueries() and Scala lazy-val initialization on the same plan instance (e.g., FileSourceScanLike.metadata). Pre-fix, waitForSubqueries() used this.synchronized and held the plan's intrinsic monitor while blocking on subquery futures; meanwhile another thread (such as AQE's onUpdatePlan → SparkPlanInfo.fromSparkPlan → metadata lazy val) needs the same monitor, deadlocking.
The fix introduces a private lock object and routes both prepare() and waitForSubqueries() through it. The two methods still mutex against each other (same lock object), the prepared flag idempotence is preserved, and the safety of runningSubqueries mutation is preserved. The plan's intrinsic monitor is no longer held during blocking subquery awaits, so lazy-val initialization on the same plan can proceed concurrently. No external code depends on prepare/waitForSubqueries mutexing against plan.this.synchronized — the only Spark code that takes a plan's intrinsic monitor is Scala's generated lazy-val init, which is short-lived.
The test is a good regression check: it uses a CountDownLatch-blocked BlockingSubquery and inspects ThreadInfo to ensure a competing plan.synchronized {} thread never enters BLOCKED.
Comments below are minor — one consistency item (test naming), one Scaladoc that now points at a private member, one missing finally, and a suggestion to match the existing AdaptiveSparkPlanExec.lock lock pattern.
| @transient | ||
| private val runningSubqueries = new ArrayBuffer[ExecSubqueryExpression] | ||
|
|
||
| @transient private lazy val subqueryLock: AnyRef = new AnyRef |
There was a problem hiding this comment.
Suggest matching the existing Spark idiom for ad-hoc lock objects — AdaptiveSparkPlanExec.lock, BlockManager.{asyncReregisterLock, peerFetchLock}, QueryExecution.observedMetricsLock, Executor.lock, etc. all use @transient private val xxxLock = new Object(). AdaptiveSparkPlanExec is the closest peer (also a SparkPlan, also Serializable) and uses exactly that pattern.
| @transient private lazy val subqueryLock: AnyRef = new AnyRef | |
| @transient private val subqueryLock = new Object() |
Your point that lazy would defend against a deserialized-then-prepared plan is fair, but the very next line up — runningSubqueries — makes the same assumption non-defensively, so as long as that gap stands, this one matches it.
Separately, the name subqueryLock is a bit narrow: it now also guards the prepared flag and the user-defined doPrepare() body. prepareLock would describe the role better — but only worth changing if you're touching this line anyway.
| releaseLatch.countDown() | ||
| ThreadUtils.awaitResult(futureA, Duration(10, "seconds")) | ||
| threadB.join(5000L) | ||
| executor.shutdown() |
There was a problem hiding this comment.
Cleanup isn't in a finally. If enteredLatch.await(10, SECONDS) returns false (or any assert/exception fires before line 221), futureA's thread sits on releaseLatch.await(30s) and the executor never shuts down. Suggest wrapping the body from line 193 onward in:
try {
// existing body, ending with the !threadBBlocked assert
} finally {
releaseLatch.countDown()
executor.shutdown()
}so a failed run doesn't leak threads for 30s.
…ion in subquery thread Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
cb27107 to
89e14fc
Compare
What changes were proposed in this pull request?
Replace
synchronizedwith an explicitsubqueryLockobject inwaitForSubqueries()andprepare().Why are the changes needed?
waitForSubqueries()holds the plan's obj lock (withsynchronized) while blocking on subquery futures. A concurrent thread computing a Scala lazy val on the same plan object (e.g.FileSourceScanLike.metadatavia AQE'sonUpdatePlan→SparkPlanInfo.fromSparkPlan) also needs the plan's obj lock, causing a deadlock.See the Jira https://issues.apache.org/jira/browse/SPARK-57041 for the thread dump when the deadlock happens.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a UT case.
Was this patch authored or co-authored using generative AI tooling?
Yes. Claude Code.