How to make solid_queue work well with fibers? #150

@mintuhouse

We make quite a few LLM calls in our background jobs. These jobs spend most of their time waiting for HTTP responses, so we want to use fibers to make better use of resources.

I have attempted to make this work using the async gem, but the worker sometimes gets into a deadlock (my guess) and stops processing jobs.

# config/initializers/solid_queue.rb

require "async"

module SolidQueue
  module AsyncableWorker
    extend ActiveSupport::Concern

    def initialize(**options)
      super
      @async = options.fetch(:async, false)
      # Rebuild the pool so that it receives the async flag.
      @pool = Pool.new(options[:threads], on_idle: -> { wake_up }, async: @async)
    end

    def do_start_loop
      if @async
        # Run the polling loop inside an Async reactor so that posted jobs
        # can run as fibers on it.
        Async do
          super
        end
      else
        super
      end
    end
  end

  module AsyncablePool
    extend ActiveSupport::Concern

    def initialize(size, on_idle: nil, async: false)
      super(size, on_idle: on_idle)
      @async = async
    end

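    def post(execution)
      if @async
        # Reserve a slot before starting the job in its own fiber.
        available_threads.decrement
        Async do
          wrap_in_app_executor do
            execution.perform
          rescue => error
            handle_thread_error(error)
          ensure
            # Always release the slot, then wake the worker if capacity is free.
            available_threads.increment
            mutex.synchronize { on_idle.try(:call) if idle? }
          end
        end
      else
        # Fall back to the original thread-based implementation.
        super
      end
    end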
  end
end

SolidQueue::Pool.class_eval do
  prepend SolidQueue::AsyncablePool
end

SolidQueue::Worker.class_eval do
  prepend SolidQueue::AsyncableWorker
end
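
To look at the bookkeeping separately from the rest of SolidQueue, the slot accounting in post above can be modelled with plain Ruby fibers. This is only a sketch under stated assumptions: SlotPool, idle_calls and drain are hypothetical names, plain Fiber objects stand in for Async tasks, Fiber.yield stands in for waiting on an HTTP response, and Ruby 3.1 or later is assumed. It does not reproduce the deadlock; it only shows when a slot is held and when it is returned.

```ruby
# Hypothetical model of the slot accounting in AsyncablePool#post.
# Plain Fibers stand in for Async tasks; none of this is SolidQueue code.
class SlotPool
  attr_reader :available, :idle_calls

  def initialize(size)
    @available = size   # free slots, like available_threads
    @idle_calls = 0     # how often the on_idle hook would have fired
    @fibers = []
  end

  # Reserve a slot, then start the job in its own fiber.
  def post(&job)
    @available -= 1
    fiber = Fiber.new do
      begin
        job.call
      rescue => error
        warn "job failed: #{error.message}"
      ensure
        @available += 1                          # always give the slot back
        @idle_calls += 1 if @available.positive? # mirrors `on_idle if idle?`
      end
    end
    @fibers << fiber
    fiber.resume # runs until the job finishes or hits its first Fiber.yield
  end

  # Resume suspended jobs until every one of them has finished.
  def drain
    @fibers.each { |fiber| fiber.resume while fiber.alive? }
    @fibers.clear
  end
end
```

A job that suspends (for example `pool.post { Fiber.yield }`) keeps its slot until drain resumes it, while a job that raises returns its slot immediately through the ensure branch.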

NOTE: The workers configuration below has a newly introduced async option, which enables fiber usage through the initializer code above.

# config/solid_queue.yml

# The supervisor forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.
default: &default
  # Dispatchers are in charge of selecting jobs scheduled to run in the future that are due and dispatching them,
  # which is simply moving them from the solid_queue_scheduled_executions table over to the solid_queue_ready_executions table so that workers can pick them up.
  # They also do some maintenance work related to concurrency controls.
  dispatchers:
    - polling_interval: 1 # seconds
      batch_size: 500
      concurrency_maintenance_interval: 600 # seconds before checking if blocked jobs can be unblocked
  # Workers are in charge of picking jobs ready to run from queues and processing them.
  # They work off the solid_queue_ready_executions table.
  workers:
    - queues: [critical, default, lowpriority, oneoff]
      async: <%= ENV['SOLID_QUEUE_DISABLE_ASYNC_WORKERS'] != 'true' %> # whether to use async workers (fibers) or not (threads)
      threads: 5 # thread/fiber pool max size. Also used as batch size when fetching jobs.
      processes: <%= (ENV['SOLID_QUEUE_WORKER_PROCESS_COUNT'] || Concurrent.processor_count).to_i %>
      polling_interval: 0.1 # seconds
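
The async line in the config above is plain ERB, so its result can be checked outside Rails. A minimal sketch, assuming a trimmed-down copy of the workers entry; TEMPLATE and worker_options are hypothetical names used only for illustration and are not part of SolidQueue:

```ruby
require "erb"
require "yaml"

# Hypothetical, trimmed-down copy of the workers entry from the config above.
TEMPLATE = <<~YAML
  workers:
    - queues: [critical, default]
      async: <%= ENV['SOLID_QUEUE_DISABLE_ASYNC_WORKERS'] != 'true' %>
      threads: 5
YAML

# Render the template with the given value for the environment variable
# (nil means "unset") and return the parsed options of the first worker.
def worker_options(env_value)
  previous = ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"]
  ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"] = env_value
  YAML.safe_load(ERB.new(TEMPLATE).result).fetch("workers").first
ensure
  # Restore the original environment so the check has no side effects.
  ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"] = previous
end
```

Only the exact string 'true' turns fibers off: `worker_options("true")["async"]` is false, while an unset variable or any other value leaves async enabled.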

I then start the process as usual with bundle exec rake solid_queue:start


Has anyone else attempted something similar, or does anyone know what the issue could be?
