Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/source/user-guide/latest/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ The valid pool types are:
- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `greedy_unified`

The `fair_unified` pool types prevents operators from using more than an even fraction of the available memory
Both pool types are shared across all native execution contexts within the same Spark task. When
Comet executes a shuffle, it runs two native execution contexts concurrently (e.g. one for
pre-shuffle operators and one for the shuffle writer). The shared pool ensures that the combined
memory usage stays within the per-task limit.

The `fair_unified` pool prevents operators from using more than an even fraction of the available memory
(i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
when there is sufficient memory in order to leave enough memory for other operators.
Expand Down
5 changes: 4 additions & 1 deletion native/core/src/execution/memory_pools/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ impl MemoryPoolType {
pub(crate) fn is_task_shared(&self) -> bool {
matches!(
self,
MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared
MemoryPoolType::GreedyTaskShared
| MemoryPoolType::FairSpillTaskShared
| MemoryPoolType::FairUnified
| MemoryPoolType::GreedyUnified
)
}
}
Expand Down
42 changes: 28 additions & 14 deletions native/core/src/execution/memory_pools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,36 @@ pub(crate) fn create_memory_pool(
const NUM_TRACKED_CONSUMERS: usize = 10;
match memory_pool_config.pool_type {
MemoryPoolType::GreedyUnified => {
// Set Comet memory pool for native
let memory_pool =
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
))
let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
let per_task_memory_pool =
memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
CometUnifiedMemoryPool::new(
Arc::clone(&comet_task_memory_manager),
task_attempt_id,
),
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
));
PerTaskMemoryPool::new(pool)
});
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
MemoryPoolType::FairUnified => {
// Set Comet fair memory pool for native
let memory_pool =
CometFairMemoryPool::new(comet_task_memory_manager, memory_pool_config.pool_size);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
))
let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
let per_task_memory_pool =
memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
CometFairMemoryPool::new(
Arc::clone(&comet_task_memory_manager),
memory_pool_config.pool_size,
),
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
));
PerTaskMemoryPool::new(pool)
});
per_task_memory_pool.num_plans += 1;
Arc::clone(&per_task_memory_pool.memory_pool)
}
MemoryPoolType::Greedy => Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(memory_pool_config.pool_size),
Expand Down
Loading