Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ FROM solid_queue_ready_executions
WHERE queue_name LIKE 'beta%';
```

This type of `DISTINCT` query on a column that's the leftmost column in an index can be performed very fast in MySQL thanks to a technique called [Loose Index Scan](https://dev.mysql.com/doc/refman/8.0/en/group-by-optimization.html#loose-index-scan). PostgreSQL and SQLite, however, don't implement this technique, which means that if your `solid_queue_ready_executions` table is very big because your queues get very deep, this query will get slow. Normally your `solid_queue_ready_executions` table will be small, but it can happen.
This type of `DISTINCT` query on a column that's the leftmost column in an index can be performed very fast in MySQL thanks to a technique called [Loose Index Scan](https://dev.mysql.com/doc/refman/8.0/en/group-by-optimization.html#loose-index-scan). PostgreSQL doesn't implement this technique natively, so Solid Queue uses a [recursive CTE](https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-RECURSIVE) to emulate it, achieving similar performance by walking the B-tree index and jumping between distinct values. SQLite doesn't implement loose index scan either, but this is unlikely to be a problem since SQLite is typically used in development with small datasets.

Similarly to using prefixes, the same will happen if you have paused queues, because we need to get a list of all queues with a query like
```sql
Expand Down
4 changes: 1 addition & 3 deletions app/models/solid_queue/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ class Queue

class << self
def all
Job.select(:queue_name).distinct.collect do |job|
new(job.queue_name)
end
Job.distinct_values_of(:queue_name).map { |name| new(name) }
end

def find_by_name(name)
Expand Down
4 changes: 2 additions & 2 deletions app/models/solid_queue/queue_selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def include_all_queues?
end

def all_queues
relation.distinct(:queue_name).pluck(:queue_name)
relation.distinct_values_of(:queue_name)
end

def exact_names
Expand All @@ -53,7 +53,7 @@ def exact_names
def prefixed_names
if prefixes.empty? then []
else
relation.where(([ "queue_name LIKE ?" ] * prefixes.count).join(" OR "), *prefixes).distinct(:queue_name).pluck(:queue_name)
relation.distinct_values_of(:queue_name, like_conditions: prefixes)
end
end

Expand Down
2 changes: 2 additions & 0 deletions app/models/solid_queue/record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module SolidQueue
class Record < ActiveRecord::Base
self.abstract_class = true

include LooseDistinct

connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to

class << self
Expand Down
52 changes: 52 additions & 0 deletions app/models/solid_queue/record/loose_distinct.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module SolidQueue
class Record
module LooseDistinct
extend ActiveSupport::Concern

class_methods do
def distinct_values_of(column, like_conditions: [])
if postgresql?
loose_distinct_via_recursive_cte(column, like_conditions)
elsif like_conditions.any?
where(like_sql(column, like_conditions)).distinct.pluck(column)
else
distinct.pluck(column)
end
end

private
def loose_distinct_via_recursive_cte(column, like_conditions)
table = quoted_table_name
col = connection.quote_column_name(column)

like_filter = if like_conditions.any?
"AND (" + like_conditions.map { |pattern| sanitize_sql_array([ "#{col} LIKE ?", pattern ]) }.join(" OR ") + ")"
end

sql = <<~SQL.squish
WITH RECURSIVE t AS (
(SELECT #{col} FROM #{table} WHERE #{col} IS NOT NULL #{like_filter} ORDER BY #{col} LIMIT 1)
UNION ALL
SELECT (SELECT #{col} FROM #{table} WHERE #{col} > t.#{col} #{like_filter} ORDER BY #{col} LIMIT 1)
FROM t WHERE t.#{col} IS NOT NULL
)
SELECT #{col} FROM t WHERE #{col} IS NOT NULL
SQL

connection_pool.with_connection { |conn| conn.select_values(sql) }
end

def like_sql(column, patterns)
col = connection.quote_column_name(column)
([ "#{col} LIKE ?" ] * patterns.count).join(" OR ").then { |clause| [ clause, *patterns ] }
end

def postgresql?
connection_pool.with_connection { |conn| conn.adapter_name == "PostgreSQL" }
end
end
end
end
end
34 changes: 34 additions & 0 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,40 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
claimed_jobs.map(&:queue_name)
end

test "distinct_values_of returns all distinct queue names" do
AddToBufferJob.perform_later("hey") # goes to background queue

names = SolidQueue::ReadyExecution.distinct_values_of(:queue_name)
assert_includes names, "backend"
assert_includes names, "background"
assert_equal 2, names.size
end

test "distinct_values_of filters by like conditions" do
AddToBufferJob.perform_later("hey") # background queue

names = SolidQueue::ReadyExecution.distinct_values_of(:queue_name, like_conditions: [ "back%" ])
assert_includes names, "backend"
assert_includes names, "background"
assert_equal 2, names.size

names = SolidQueue::ReadyExecution.distinct_values_of(:queue_name, like_conditions: [ "backe%" ])
assert_equal [ "backend" ], names
end

test "distinct_values_of returns empty array for no matches" do
names = SolidQueue::ReadyExecution.distinct_values_of(:queue_name, like_conditions: [ "nonexistent%" ])
assert_equal [], names
end

test "distinct_values_of returns empty array on empty table" do
SolidQueue::ReadyExecution.delete_all
SolidQueue::Job.delete_all

names = SolidQueue::ReadyExecution.distinct_values_of(:queue_name)
assert_equal [], names
end

test "discard all" do
3.times { |i| AddToBufferJob.perform_later(i) }

Expand Down
Loading