diff --git a/docs/source/user-guide/latest/tuning.md b/docs/source/user-guide/latest/tuning.md index 5939e89ef3..ff9acee1f4 100644 --- a/docs/source/user-guide/latest/tuning.md +++ b/docs/source/user-guide/latest/tuning.md @@ -61,7 +61,12 @@ The valid pool types are: - `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set) - `greedy_unified` -The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory +Both pool types are shared across all native execution contexts within the same Spark task. When +Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for +pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined +memory usage stays within the per-task limit. + +The `fair_unified` pool prevents operators from using more than an even fraction of the available memory (i.e. `pool_size / num_reservations`). This pool works best when you know beforehand the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even when there is sufficient memory in order to leave enough memory for other operators. diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index d30126a99a..83d6c14a36 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -34,7 +34,10 @@ impl MemoryPoolType { pub(crate) fn is_task_shared(&self) -> bool { matches!( self, - MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared + MemoryPoolType::GreedyTaskShared + | MemoryPoolType::FairSpillTaskShared + | MemoryPoolType::FairUnified + | MemoryPoolType::GreedyUnified ) } } diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index d8b3473353..34f0587537 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -42,22 +42,36 @@ pub(crate) fn create_memory_pool( const NUM_TRACKED_CONSUMERS: usize = 10; match memory_pool_config.pool_type { MemoryPoolType::GreedyUnified => { - // Set Comet memory pool for native - let memory_pool = - CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id); - Arc::new(TrackConsumersPool::new( - memory_pool, - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )) + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = Arc::new(TrackConsumersPool::new( + CometUnifiedMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + task_attempt_id, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) } MemoryPoolType::FairUnified => { - // Set Comet fair memory pool for native - let memory_pool = - CometFairMemoryPool::new(comet_task_memory_manager, memory_pool_config.pool_size); - Arc::new(TrackConsumersPool::new( - memory_pool, - NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), - )) + let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap(); + let per_task_memory_pool = + memory_pool_map.entry(task_attempt_id).or_insert_with(|| { + let pool: Arc = Arc::new(TrackConsumersPool::new( + CometFairMemoryPool::new( + Arc::clone(&comet_task_memory_manager), + memory_pool_config.pool_size, + ), + NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), + )); + PerTaskMemoryPool::new(pool) + }); + per_task_memory_pool.num_plans += 1; + Arc::clone(&per_task_memory_pool.memory_pool) } MemoryPoolType::Greedy => Arc::new(TrackConsumersPool::new( GreedyMemoryPool::new(memory_pool_config.pool_size),