18 changes: 18 additions & 0 deletions Gemfile.lock
@@ -55,12 +55,22 @@ GEM
rake
thor (>= 0.14.0)
ast (2.4.2)
async (2.38.1)
console (~> 1.29)
fiber-annotation
io-event (~> 1.11)
metrics (~> 0.12)
traces (~> 0.18)
base64 (0.3.0)
benchmark (0.4.1)
bigdecimal (3.3.1)
builder (3.3.0)
concurrent-ruby (1.3.5)
connection_pool (2.5.4)
console (1.34.3)
fiber-annotation
fiber-local (~> 1.1)
json
crass (1.0.6)
date (3.4.1)
debug (1.9.2)
@@ -70,6 +80,10 @@ GEM
erubi (1.13.1)
et-orbi (1.2.11)
tzinfo
fiber-annotation (0.2.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (1.0.1)
fugit (1.11.1)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
@@ -78,6 +92,7 @@ GEM
i18n (1.14.7)
concurrent-ruby (~> 1.0)
io-console (0.8.0)
io-event (1.14.5)
irb (1.14.3)
rdoc (>= 4.0.0)
reline (>= 0.4.2)
@@ -87,6 +102,7 @@ GEM
loofah (2.23.1)
crass (~> 1.0.2)
nokogiri (>= 1.12.0)
metrics (0.15.0)
minitest (5.26.0)
mocha (2.1.0)
ruby2_keywords (>= 0.0.5)
@@ -181,6 +197,7 @@ GEM
stringio (3.1.2)
thor (1.3.2)
timeout (0.4.3)
traces (0.18.2)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unicode-display_width (3.1.3)
@@ -199,6 +216,7 @@ PLATFORMS

DEPENDENCIES
appraisal
async (>= 2.24)
debug (~> 1.9)
logger
minitest (~> 5.0)
37 changes: 30 additions & 7 deletions README.md
@@ -203,6 +203,10 @@ Or you can also set the environment variable `SOLID_QUEUE_SUPERVISOR_MODE` to `a

**The recommended and default mode is `fork`. Only use `async` if you know what you're doing and have strong reasons to do so.**

This supervisor mode is separate from a worker's `execution_mode`. Supervisor mode decides whether supervised processes live in forks or threads. Worker execution mode decides whether a worker runs claimed jobs in a thread pool or as fibers on a single async reactor thread.

Because these are separate concerns, you can combine the default `fork` supervisor mode with `execution_mode: async` on workers. In that setup, each worker process gets its own async reactor and bounded fiber capacity.

## Configuration

By default, Solid Queue will try to find your configuration under `config/queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this:
@@ -222,10 +226,12 @@ production:
batch_size: 500
concurrency_maintenance_interval: 300
workers:
- queues: "*"
threads: 3
polling_interval: 2
- queues: "llm*"
execution_mode: async
capacity: 100
polling_interval: 0.05
- queues: [ real_time, background ]
execution_mode: thread
threads: 5
polling_interval: 0.1
processes: 3
@@ -271,9 +277,14 @@ Here's an overview of the different options:

Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance).

- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch at most this number of jobs from its queue(s) and post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode).
- `execution_mode`: controls how a worker executes claimed jobs. `thread` is the default and uses the existing thread pool behavior. `async` executes jobs as fibers on a single reactor thread. `fiber` is accepted as an alias for `async`.
Async worker execution requires fiber-scoped isolated execution state. In Rails apps, set `config.active_support.isolation_level = :fiber` before using `execution_mode: async`. Solid Queue refuses to boot async workers when isolation remains thread-scoped.
- `threads`: this is the execution capacity for a worker in `thread` mode. It is the max size of the thread pool. By default, this is `3`. Only workers have this setting.
It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, because each worker uses two connections for polling and heartbeat, and thread mode uses up to one additional connection per thread for job execution.
- `capacity`: an alias for worker execution capacity. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads.
Async workers reject `threads`; use `capacity` or `fibers` instead. On Rails 7.2 and later, a practical starting point is usually `3-5` queue database connections per worker process, rather than one connection per unit of `capacity`, because ordinary Active Record query paths can release connections between async waits. On Rails 7.1, size the queue database pool more conservatively, because in-flight async jobs may still retain connections roughly in proportion to `capacity`.
- `fibers`: an alias for `capacity` when `execution_mode: async`.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. This works with both `execution_mode: thread` and `execution_mode: async` as long as the supervisor is running in the default `fork` mode. **Note**: this option is ignored only when the supervisor itself is [running in `async` mode](#fork-vs-async-mode).
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
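
The aliasing rules for `threads`, `capacity`, and `fibers` can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not Solid Queue's implementation: `resolve_capacity` and `DEFAULT_CAPACITY` are hypothetical names, and the precedence order (`capacity`, then `fibers`, then `threads`) is assumed from the `worker_capacity` helper in this change.

```ruby
# Hypothetical sketch of how worker execution capacity could be resolved.
DEFAULT_CAPACITY = 3 # matches the documented default for `threads`

def resolve_capacity(options)
  mode = options.fetch(:execution_mode, :thread).to_s
  async = %w[async fiber].include?(mode)

  # Async workers reject `threads` outright, as described above.
  if async && options.key?(:threads)
    raise ArgumentError, "Async workers do not accept `threads`. Use `capacity` or `fibers` instead."
  end

  # Assumed precedence: capacity, then fibers, then threads, then the default.
  options[:capacity] || options[:fibers] || options[:threads] || DEFAULT_CAPACITY
end

async_capacity  = resolve_capacity(execution_mode: :async, capacity: 100)
fiber_capacity  = resolve_capacity(execution_mode: :fiber, fibers: 50)
thread_capacity = resolve_capacity(threads: 5)
```

With these inputs, the async worker resolves to 100, the `fiber` alias to 50, and the thread worker to 5. Passing `threads` together with `execution_mode: :async` raises an `ArgumentError`.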


@@ -364,7 +375,15 @@ queues: back*

### Threads, processes, and signals

Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling.
By default, workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Workers can also be configured with `execution_mode: async`, in which case claimed jobs are executed as fibers on a single reactor thread and bounded by the worker's execution capacity. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling.

Async worker execution is best suited for cooperative, mostly I/O-bound jobs. Blocking or CPU-heavy work still blocks the single reactor thread, so it should not be expected to outperform thread mode for every workload.
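
The difference between cooperative and blocking work on a single thread can be illustrated with plain Ruby fibers. This sketch does not use the `async` gem or Solid Queue; `run_on_one_thread` is a hypothetical helper, and `Fiber.yield` stands in for the point where an I/O wait would hand control back to a reactor.

```ruby
# Illustration only: plain Ruby fibers, not the `async` gem's reactor.
# `run_on_one_thread` is a hypothetical helper name.
def run_on_one_thread(jobs)
  log = []

  fibers = jobs.map do |name, cooperative|
    Fiber.new do
      2.times do |step|
        log << "#{name}#{step + 1}"
        # A cooperative job hands control back after its first step,
        # the way an I/O wait would under a fiber scheduler.
        Fiber.yield if cooperative && step == 0
      end
      :done
    end
  end

  # Resume every unfinished fiber in turn until all have returned :done.
  pending = fibers
  pending = pending.reject { |fiber| fiber.resume == :done } until pending.empty?

  log
end

cooperative_order = run_on_one_thread("a" => true, "b" => true)
blocking_order    = run_on_one_thread("a" => false, "b" => true)
```

When both jobs yield, their steps interleave (`a1, b1, a2, b2`). When job `a` never yields, it runs to completion before `b` gets a turn (`a1, a2, b1, b2`), which is why blocking or CPU-heavy jobs gain nothing from running as fibers.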

Because async workers run multiple fibers on a single thread, Rails must also isolate execution state per fiber rather than per thread. If your app keeps the default thread-scoped isolation level, Solid Queue will raise a boot-time error instead of running async workers with shared Active Record state.

On Rails 7.2 and later, async workers can often use a much smaller queue database pool than an equivalent thread pool. A practical starting point is `3-5` queue database connections per worker process: one for job execution, one for polling, one for heartbeats, plus some headroom. In the default `fork` supervisor mode, that guidance applies per worker process. In supervisor `async` mode, all workers share one process, so add together the requirements for the workers running there.
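
The sizing guidance above can be turned into rough arithmetic. The following is a sketch under stated assumptions rather than the validation Solid Queue performs: `connections_for_worker` and `pool_size_for` are hypothetical helpers, the `+ 2` covers polling and heartbeats, and an async worker on Rails 7.2 or later is assumed to need a single execution connection.

```ruby
# Hypothetical helpers for estimating queue database pool size.
def connections_for_worker(execution_mode:, capacity:, query_scoped_connections: true)
  execution =
    if execution_mode == :async && query_scoped_connections
      1        # assumed: connections are released between async waits (Rails 7.2+)
    else
      capacity # thread mode, or async on Rails 7.1: up to one per in-flight job
    end

  execution + 2 # one for polling, one for heartbeats
end

def pool_size_for(workers, supervisor_mode: :fork)
  needs = workers.map { |worker| connections_for_worker(**worker) }

  # In fork mode each worker process has its own pool, so the largest need wins.
  # In supervisor async mode all workers share one process, so needs add up.
  supervisor_mode == :fork ? needs.max.to_i : needs.sum
end

workers = [
  { execution_mode: :async,  capacity: 100 },
  { execution_mode: :thread, capacity: 5 }
]

fork_pool  = pool_size_for(workers)
async_pool = pool_size_for(workers, supervisor_mode: :async)
```

For this configuration the estimate is 7 connections per process in `fork` mode and 10 in supervisor `async` mode. These are minimums without headroom, so the `3-5` starting point for async workers still applies.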

That lower-pool guidance depends on job code not holding connections open across async waits. APIs such as `ActiveRecord::Base.connection`, `lease_connection`, `connection_pool.checkout`, or long-lived `with_connection` / transaction blocks can pin connections and push async workers back toward thread-like pool usage. On Rails 7.1, plan conservatively and assume async capacity can still grow queue database connection usage.

The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode:
- `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit.
@@ -374,6 +393,10 @@ When receiving a `QUIT` signal, if workers still have jobs in-flight, these will

If processes have no chance of cleaning up before exiting (e.g. if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats. Jobs that were claimed by processes with an expired heartbeat will be marked as failed with a `SolidQueue::Processes::ProcessPrunedError`. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this.

Worker heartbeats are driven by a separate timer task, not by the worker execution backend itself. This means async workers do not rely on the reactor loop to prove liveness. However, liveness is still tracked at the worker-process level, not at the individual thread or fiber level.

This means finished and failed jobs still follow the normal Solid Queue lifecycle, but a single stuck job can remain claimed if the worker process itself is still alive. If you need stronger stuck-job detection, that requires an explicit timeout or watchdog mechanism on top of process heartbeats.
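
One possible form of an explicit timeout is a per-job deadline around the job body. This is only a sketch using Ruby's standard `Timeout` module, not something Solid Queue provides: `JobDeadlineExceeded` and `with_deadline` are hypothetical names, and `Timeout` interrupts the block wherever it happens to be, so it is best kept to code that tolerates being interrupted.

```ruby
require "timeout"

# Hypothetical error class and helper, for illustration only.
class JobDeadlineExceeded < StandardError; end

def with_deadline(seconds)
  # Raises JobDeadlineExceeded if the block runs longer than `seconds`.
  Timeout.timeout(seconds, JobDeadlineExceeded) { yield }
end

result = with_deadline(1) { :finished }
```

A job that exceeds its deadline raises and then follows the normal failed-job lifecycle, instead of staying claimed for as long as the worker process is alive.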

In a similar way, if a worker is terminated in any other way not initiated by the above signals (e.g. a worker is sent a `KILL` signal), jobs in progress will be marked as failed with a `SolidQueue::Processes::ProcessExitError` so that they can be inspected. Sometimes a particular job is responsible for this, for example, if it has a memory leak and you have a mechanism to kill processes over a certain memory threshold, so this helps identify that kind of situation.


111 changes: 99 additions & 12 deletions lib/solid_queue/configuration.rb
@@ -6,7 +6,11 @@ class Configuration

validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_correctly_sized_thread_pool
validate :ensure_correctly_sized_database_pool
validate :ensure_valid_worker_execution_modes
validate :ensure_async_workers_use_capacity_aliases
validate :ensure_async_workers_have_required_dependency
validate :ensure_async_workers_use_supported_isolation_level

class Process < Struct.new(:kind, :attributes)
def instantiate
@@ -18,7 +22,8 @@ def instantiate
queues: "*",
threads: 3,
processes: 1,
polling_interval: 0.1
polling_interval: 0.1,
execution_mode: :thread
}

DISPATCHER_DEFAULTS = {
@@ -35,6 +40,7 @@ def instantiate

DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"
ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION = Gem::Version.new("7.2.0")

def initialize(**options)
@options = options.with_defaults(default_options)
@@ -88,13 +94,46 @@ def ensure_valid_recurring_tasks
end
end

      def ensure_correctly_sized_thread_pool
        if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads
          errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " +
            "database connection pool is #{db_pool_size}. Increase it in `config/database.yml`")
      def ensure_correctly_sized_database_pool
        if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_database_pool_size
          errors.add(:base, "Solid Queue requires at least #{estimated_database_pool_size} database connections " +
            "for the configured workers, but the queue database connection pool is #{db_pool_size}. " +
            "Increase it in `config/database.yml`")
        end
      end

      def ensure_valid_worker_execution_modes
        workers_options.each do |options|
          SolidQueue::ExecutionPools.normalize_mode(options[:execution_mode] || WORKER_DEFAULTS[:execution_mode])
        rescue ArgumentError => error
          errors.add(:base, error.message)
        end
      end

      def ensure_async_workers_use_capacity_aliases
        workers_options.each do |options|
          if async_worker?(options) && options.key?(:threads)
            errors.add(:base, "Async workers do not accept `threads`. Use `capacity` or `fibers` instead.")
          end
        end
      end

      def ensure_async_workers_have_required_dependency
        return unless workers_options.any? { |options| async_worker?(options) }

        SolidQueue::ExecutionPools::AsyncPool.ensure_dependency!
      rescue LoadError => error
        errors.add(:base, error.message)
      end

      def ensure_async_workers_use_supported_isolation_level
        return unless workers_options.any? { |options| async_worker?(options) }

        SolidQueue::ExecutionPools::AsyncPool.ensure_supported_isolation_level!
      rescue ArgumentError => error
        errors.add(:base, error.message)
      end

def default_options
{
mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork,
@@ -131,7 +170,8 @@ def workers
1
end

processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
defaults = worker_defaults_for(worker_options)
processes.times.map { Process.new(:worker, worker_options.with_defaults(defaults)) }
end
end

@@ -153,7 +193,7 @@ def schedulers

def workers_options
@workers_options ||= processes_config.fetch(:workers, [])
.map { |options| options.dup.symbolize_keys }
.map { |options| normalize_worker_options(options) }
end

def dispatchers_options
@@ -226,10 +266,57 @@ def load_config_from_file(file)
end
end

      def estimated_number_of_threads
        # At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
        thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max
        (thread_count || 1) + 2
      def estimated_database_pool_size
        worker_pool_size = workers_options.map { |options| estimated_database_pool_size_for_worker(options) }.max
        worker_pool_size || 1
      end

      def estimated_database_pool_size_for_worker(options)
        estimated_execution_connections_for_worker(options) + 2
      end

      def normalize_worker_options(options)
        options = options.dup.symbolize_keys
        options[:execution_mode] = normalized_worker_execution_mode(options)
        options[:capacity] = worker_capacity(options) if options.key?(:capacity) || options.key?(:fibers)
        options[:threads] = worker_capacity(options) unless async_worker?(options) && !options.key?(:threads)
        options
      end

      def worker_capacity(options)
        options[:capacity] || options[:fibers] || options[:threads] || WORKER_DEFAULTS[:threads]
      end

      def normalized_worker_execution_mode(options)
        SolidQueue::ExecutionPools.normalize_mode(options[:execution_mode] || WORKER_DEFAULTS[:execution_mode])
      rescue ArgumentError
        options[:execution_mode] || WORKER_DEFAULTS[:execution_mode]
      end

      def estimated_execution_connections_for_worker(options)
        async_worker?(options) ? async_execution_connections_for_worker(options) : worker_capacity(options)
      end

      def async_execution_connections_for_worker(options)
        async_jobs_release_connections_between_queries? ? 1 : worker_capacity(options)
      end

      def async_jobs_release_connections_between_queries?
        ActiveRecord.gem_version >= ASYNC_QUERY_SCOPED_CONNECTIONS_VERSION
      end

      def async_worker?(options)
        normalized_worker_execution_mode(options) == :async
      end

      def worker_defaults_for(options)
        if async_worker?(options)
          WORKER_DEFAULTS.except(:threads).tap do |defaults|
            defaults[:capacity] = WORKER_DEFAULTS[:threads] unless options.key?(:threads)
          end
        else
          WORKER_DEFAULTS
        end
      end
  end
end
27 changes: 27 additions & 0 deletions lib/solid_queue/execution_pools.rb
@@ -0,0 +1,27 @@
# frozen_string_literal: true

module SolidQueue
  module ExecutionPools
    class << self
      def build(mode:, size:, on_state_change: nil)
        case normalize_mode(mode)
        when :thread
          ThreadPool.new(size, on_state_change: on_state_change)
        when :async
          AsyncPool.new(size, on_state_change: on_state_change)
        end
      end

      def normalize_mode(mode)
        case mode.to_s
        when "", "thread"
          :thread
        when "async", "fiber"
          :async
        else
          raise ArgumentError, "Unknown execution mode #{mode.inspect}. Expected one of: :thread, :async, :fiber"
        end
      end
    end
  end
end
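
To see how the mode normalization above treats different inputs, the method can be exercised in isolation. The sketch below copies `normalize_mode` into a hypothetical standalone module, `ExecutionModeSketch`, so that it runs without Solid Queue loaded. The pool classes are left out because their implementations are not part of this file.

```ruby
# Standalone copy for experimentation; `ExecutionModeSketch` is a hypothetical name.
module ExecutionModeSketch
  def self.normalize_mode(mode)
    case mode.to_s
    when "", "thread" then :thread # nil and "" fall back to thread mode
    when "async", "fiber" then :async # "fiber" is an alias for async
    else
      raise ArgumentError, "Unknown execution mode #{mode.inspect}. Expected one of: :thread, :async, :fiber"
    end
  end
end

modes = [nil, "thread", :async, "fiber"].map { |mode| ExecutionModeSketch.normalize_mode(mode) }
```

Strings and symbols are accepted interchangeably because the input goes through `to_s` first. Any other value raises an `ArgumentError`, which the configuration validations turn into an error message.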