[SPARK-55998][SHS] Synchronize more places on accessing SHS listing.db#54817

Open
pan3793 wants to merge 1 commit into apache:master from pan3793:SPARK-55998
Conversation


@pan3793 pan3793 commented Mar 16, 2026

What changes were proposed in this pull request?

Wrap all

  • listing.delete(classOf[LogInfo], <path>) and
  • listing.view(classOf[LogInfo])

operations in SHS's FsHistoryProvider with listing.synchronized { ... }. This is similar to SPARK-37659.
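A minimal, self-contained sketch of the pattern this PR applies (the names `SyncedListing`, `write`, `delete`, and `count` are illustrative, not the actual FsHistoryProvider code): a `java.util.TreeMap` stands in for the non-MVCC LevelDB/RocksDB-backed listing, and every read or write, including the full traversal needed to size a view, runs under the same monitor so a delete can never interleave with an in-flight iteration.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for the SHS listing.db: a non-thread-safe sorted
// map guarded by a single monitor, mirroring listing.synchronized { ... }.
object SyncedListing {
  private val listing = new java.util.TreeMap[String, String]()

  def write(path: String, info: String): Unit = listing.synchronized {
    listing.put(path, info); ()
  }

  // Analogous to listing.delete(classOf[LogInfo], <path>) under the lock.
  def delete(path: String): Unit = listing.synchronized {
    listing.remove(path); ()
  }

  // Analogous to KVUtils.size(listing.view(...)): the count requires a
  // full traversal, so the whole iteration holds the lock.
  def count(): Int = listing.synchronized {
    listing.keySet().iterator().asScala.size
  }
}
```

The point of locking `count()` as a whole, rather than each element access, is that the underlying store gives no snapshot isolation: the traversal is only safe if no delete can land between its first and last step.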

Why are the changes needed?

spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.interval 2h
spark.history.fs.cleaner.maxNum 650000
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.store.hybridStore.enabled true
spark.history.store.hybridStore.diskBackend ROCKSDB

With the above configs, we found that the actual number of preserved event logs can sometimes exceed 1 million, which hits the per-directory file count limit of HDFS and then prevents new Spark apps from being submitted (they fail because the event log file cannot be created).

After digging into the SHS logs (ours is an internal version based on OSS Spark 3.3.4, but on inspection the master branch has the same issue), we found that in most cases the scheduled cleanLogs task fails with

2026-02-14 01:05:34 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
java.util.NoSuchElementException: 1^@__main__^@+hdfs://namenode/spark2-history/application_1770077136288_1231234_1.lz4.inprogress
        at org.apache.spark.util.kvstore.RocksDB.get(RocksDB.java:167) ~[spark-kvstore_2.12-3.3.4.113.jar:3.3.4.113]
        at org.apache.spark.util.kvstore.RocksDBIterator.next(RocksDBIterator.java:128) ~[spark-kvstore_2.12-3.3.4.113.jar:3.3.4.113]
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46) ~[scala-library-2.12.20.jar:?]
        at scala.collection.Iterator.foreach(Iterator.scala:943) ~[scala-library-2.12.20.jar:?]
        at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[scala-library-2.12.20.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[scala-library-2.12.20.jar:?]
        at scala.collection.IterableLike.foreach(IterableLike.scala:74) ~[scala-library-2.12.20.jar:?]
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73) ~[scala-library-2.12.20.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56) ~[scala-library-2.12.20.jar:?]
        at scala.collection.TraversableOnce.size(TraversableOnce.scala:139) ~[scala-library-2.12.20.jar:?]
        at scala.collection.TraversableOnce.size$(TraversableOnce.scala:129) ~[scala-library-2.12.20.jar:?]
        at scala.collection.AbstractTraversable.size(Traversable.scala:108) ~[scala-library-2.12.20.jar:?]
        at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$cleanLogs$1(FsHistoryProvider.scala:1020) ~[spark-core_2.12-3.3.4.113.jar:3.3.4.113]
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.20.jar:?]
        at org.apache.spark.util.Utils$.tryLog(Utils.scala:2091) [spark-core_2.12-3.3.4.113.jar:3.3.4.113]
        at org.apache.spark.deploy.history.FsHistoryProvider.cleanLogs(FsHistoryProvider.scala:984) [spark-core_2.12-3.3.4.113.jar:3.3.4.113]
        at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$4(FsHistoryProvider.scala:326) [spark-core_2.12-3.3.4.113.jar:3.3.4.113]
        at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1436) [spark-core_2.12-3.3.4.113.jar:3.3.4.113]
        at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$getRunner$1(FsHistoryProvider.scala:228) [spark-core_2.12-3.3.4.113.jar:3.3.4.113]
        ...

and the repeated failures cause a significant backlog in the event log folder

$ zgrep 'Uncaught exception\|Try to delete' spark-history-server.log.20260214.gz
2026-02-14 01:05:34 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 03:05:40 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 05:07:04 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 07:08:12 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 09:10:02 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 11:11:20 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 13:11:51 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 15:12:33 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 17:13:37 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0
2026-02-14 19:14:55 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 55271 old event logs to keep 650000 logs in total.
2026-02-14 21:18:59 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3566 old event logs to keep 650000 logs in total.
2026-02-14 23:19:35 [ERROR] [spark-history-task-0] Utils#302 - Uncaught exception in thread spark-history-task-0

Does this PR introduce any user-facing change?

Yes, this fixes a bug where SHS may not clean up expired event logs in time.

How was this patch tested?

I rolled out the patched SHS in production a few days ago and have been monitoring the logs; there are no more failures in the cleanup phase.

$ zgrep 'Uncaught exception\|Try to delete' spark-history-server.log.20260314.gz
2026-03-14 01:58:20 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 5886 old event logs to keep 650000 logs in total.
2026-03-14 03:59:49 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 5792 old event logs to keep 650000 logs in total.
2026-03-14 06:00:33 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 6467 old event logs to keep 650000 logs in total.
2026-03-14 08:02:41 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 7531 old event logs to keep 650000 logs in total.
2026-03-14 10:03:39 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 7831 old event logs to keep 650000 logs in total.
2026-03-14 12:06:07 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 5662 old event logs to keep 650000 logs in total.
2026-03-14 14:07:55 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 4894 old event logs to keep 650000 logs in total.
2026-03-14 16:09:04 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 4476 old event logs to keep 650000 logs in total.
2026-03-14 18:11:17 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3757 old event logs to keep 650000 logs in total.
2026-03-14 20:12:05 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3477 old event logs to keep 650000 logs in total.
2026-03-14 22:13:34 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3350 old event logs to keep 650000 logs in total.

$ zgrep 'Uncaught exception\|Try to delete' spark-history-server.log.20260315.gz
2026-03-15 00:15:34 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3480 old event logs to keep 650000 logs in total.
2026-03-15 02:16:24 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 5805 old event logs to keep 650000 logs in total.
2026-03-15 04:17:56 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 6003 old event logs to keep 650000 logs in total.
2026-03-15 06:20:06 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 6576 old event logs to keep 650000 logs in total.
2026-03-15 08:20:52 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 7348 old event logs to keep 650000 logs in total.
2026-03-15 10:24:21 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 7358 old event logs to keep 650000 logs in total.
2026-03-15 12:27:48 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 5537 old event logs to keep 650000 logs in total.
2026-03-15 14:28:35 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 4922 old event logs to keep 650000 logs in total.
2026-03-15 16:29:19 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 4014 old event logs to keep 650000 logs in total.
2026-03-15 18:31:08 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3510 old event logs to keep 650000 logs in total.
2026-03-15 20:33:05 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3351 old event logs to keep 650000 logs in total.
2026-03-15 22:34:55 [INFO] [spark-history-task-0] FsHistoryProvider#185 - Try to delete 3653 old event logs to keep 650000 logs in total.

Was this patch authored or co-authored using generative AI tooling?

No.


// If the number of files is bigger than MAX_LOG_NUM,
// clean up all completed attempts per application one by one.
val num = KVUtils.size(listing.view(classOf[LogInfo]).index("lastProcessed"))
Member Author

@pan3793 pan3793 Mar 16, 2026


NoSuchElementException is thrown from here. Because the LevelDB/RocksDB-based KVStore does not support MVCC, we must use a relatively heavyweight synchronized to fix the concurrency issue.
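The failure mode can be reproduced deterministically in miniature (assumptions: `java.util.TreeMap` as a stand-in for the RocksDB-backed KVStore, and the object/method names are hypothetical; the real symptom is a `NoSuchElementException` inside `RocksDBIterator.next`, which a fail-fast Java collection surfaces as `ConcurrentModificationException` instead):

```scala
object IterDeleteRace {
  // Returns true if iteration blows up after a mid-traversal delete,
  // i.e. the analogue of cleanLogs sizing the view while another task
  // deletes a LogInfo entry.
  def demo(): Boolean = {
    val store = new java.util.TreeMap[String, String]()
    store.put("application_1_attempt_1", "inprogress")
    store.put("application_2_attempt_1", "done")

    val it = store.keySet().iterator()
    it.next()                              // traversal has started
    store.remove("application_2_attempt_1") // delete lands mid-traversal

    try { it.next(); false }
    catch { case _: java.util.ConcurrentModificationException => true }
  }
}
```

Wrapping both the delete and the whole traversal in a shared `synchronized` block, as this PR does, makes this interleaving impossible.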


pan3793 commented Mar 16, 2026

@sarutak @LuciferYang @dongjoon-hyun could you please take a look?
