Skip to content

Commit cf8e241

Browse files
committed
Implement streaming approach
1 parent 155db16 commit cf8e241

File tree

6 files changed

+312
-16
lines changed

6 files changed

+312
-16
lines changed

ruby/lib/ci/queue/configuration.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class Configuration
66
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
77
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
88
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
9+
attr_accessor :batch_upload, :batch_size
910
attr_reader :circuit_breakers
1011
attr_writer :seed, :build_id
1112
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -46,7 +47,8 @@ def initialize(
4647
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
4748
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
4849
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
49-
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
50+
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
51+
batch_upload: false, batch_size: 100)
5052
@build_id = build_id
5153
@circuit_breakers = [CircuitBreaker::Disabled]
5254
@failure_file = failure_file
@@ -73,6 +75,8 @@ def initialize(
7375
@warnings_file = warnings_file
7476
@debug_log = debug_log
7577
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
78+
@batch_upload = batch_upload
79+
@batch_size = batch_size
7680
end
7781

7882
def queue_init_timeout

ruby/lib/ci/queue/redis/base.rb

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,33 @@ def wait_for_master(timeout: 30)
155155
return true if master?
156156
return true if queue_initialized?
157157

158-
(timeout * 10 + 1).to_i.times do
159-
if queue_initialized?
160-
return true
161-
else
162-
sleep 0.1
158+
if config.batch_upload
159+
return wait_for_streaming(timeout: timeout)
160+
else
161+
(timeout * 10 + 1).to_i.times do
162+
if queue_initialized?
163+
return true
164+
else
165+
sleep 0.1
166+
end
163167
end
168+
169+
raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
170+
end
171+
end
172+
173+
# Blocks until the master reports a status at which tests can be reserved.
#
# Polls +master_status+ at most (timeout * 10 + 1) times, sleeping 0.1s
# after every unsuccessful check.
#
# @param timeout [Numeric] approximate number of seconds to keep polling
# @return [true] as soon as the status is 'streaming', 'ready' or 'finished'
# @raise [LostMaster] when none of those statuses was observed in time
def wait_for_streaming(timeout:)
  max_polls = (timeout * 10 + 1).to_i

  max_polls.times do
    # Any of these states means tests may already be available in the queue.
    return true if %w[streaming ready finished].include?(master_status)

    # Master has not started streaming yet; back off briefly before re-checking.
    sleep 0.1
  end

  raise LostMaster, "The master worker didn't start streaming after #{timeout} seconds waiting."
end
168186

169187
def workers_count
@@ -173,7 +191,11 @@ def workers_count
173191
# Tells whether the master has made the queue usable.
#
# In batch-upload mode the 'streaming' status also counts as initialized;
# otherwise only 'ready' and 'finished' do. A truthy answer is memoized in
# @queue_initialized, a falsy one is re-evaluated on the next call.
#
# @return [Boolean]
def queue_initialized?
  @queue_initialized ||= begin
    current = master_status
    accepted = config.batch_upload ? %w[streaming ready finished] : %w[ready finished]
    accepted.include?(current)
  end
end
179201

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 186 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,25 @@ def distributed?
2626
end
2727

2828
# Indexes the given, already-loaded tests and pushes their ids to the queue.
#
# The index is always built from +tests+. In batch-upload mode the set of
# lazily loaded source files is also initialised, so ids that are not in
# the index can still be resolved on demand by the poll loop.
#
# @param tests [Array<#id>] loaded test objects
# @param random [Random] generator handed to Queue.shuffle
# @return [self]
def populate(tests, random: Random.new)
  # The objects are already in memory, so index them in every mode.
  # Dropping them would force a lazy lookup for every reserved test.
  @index = tests.each_with_object({}) { |test, by_id| by_id[test.id] = test }
  @source_files_loaded = Set.new if config.batch_upload

  tests = Queue.shuffle(tests, random)
  push(tests.map(&:id))
  self
end
3439

40+
# Populates the queue from test file paths instead of loaded test objects.
#
# Resets the test index and the set of loaded source files, stores the
# sorted paths in @file_paths and delegates to push_files_in_batches.
#
# @param file_paths [Array<String>] test files to stream
# @param random [Random] generator forwarded to push_files_in_batches
# @return [self]
def populate_from_files(file_paths, random: Random.new)
  @index = {}
  @source_files_loaded = Set.new
  # Sorted so the upload order does not depend on the order of the input.
  @file_paths = file_paths.sort

  push_files_in_batches(@file_paths, random)
  self
end
47+
3548
# Reports whether the test index has been assigned, i.e. whether one of
# the populate methods has set @index on this object.
#
# @return [Boolean]
def populated?
  instance_variable_defined?(:@index)
end
@@ -54,9 +67,17 @@ def poll
5467
wait_for_master
5568
attempt = 0
5669
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
57-
if test = reserve
70+
if test_id = reserve
5871
attempt = 0
59-
yield index.fetch(test)
72+
73+
# Lazy load test if needed (batch mode)
74+
test = if config.batch_upload && !@index.key?(test_id)
75+
@index[test_id] = build_index_entry(test_id)
76+
else
77+
index.fetch(test_id)
78+
end
79+
80+
yield test
6081
else
6182
# Adding exponential backoff to avoid hammering Redis
6283
# we just stay online here in case a test gets retried or times out so we can afford to wait
@@ -153,6 +174,120 @@ def release!
153174

154175
attr_reader :index
155176

177+
# Elects a master and, on the elected worker only, loads the test files
# batch by batch while streaming the discovered test ids to Redis.
#
# Every caller, master or not, finishes by calling +register+ and
# refreshing the TTL of the 'workers' key.
#
# @param file_paths [Array<String>] test files to require, in upload order
# @param random [Random] generator handed to Queue.shuffle for each batch
# @raise connection errors (CONNECTION_ERRORS) are re-raised on the master
#   only; on other workers they are swallowed
def push_files_in_batches(file_paths, random)
  # Elect master: try to claim 'master-status' with NX and read it back in
  # the same pipeline; we are the master if the stored value is ours.
  value = key('setup', worker_id)
  _, status = redis.pipelined do |pipeline|
    pipeline.set(key('master-status'), value, nx: true)
    pipeline.get(key('master-status'))
  end

  @master = (value == status)
  stream_tests_as_master(file_paths, random) if @master

  register
  redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
  raise if @master
end

# Master-only: flags the queue as 'streaming', uploads every batch, then
# records the total and flips the status to 'ready'.
def stream_tests_as_master(file_paths, random)
  puts "Worker elected as leader, loading and pushing tests in batches..."
  puts

  # Set status to 'streaming' to signal workers can start.
  # NOTE(review): this happens before the first batch is pushed, so workers
  # may briefly observe an empty queue — confirm `exhausted?` tolerates it.
  redis.set(key('master-status'), 'streaming')

  # batch_size is expressed in tests but batching is done by file; assume
  # roughly 10 tests per file and never go below one file per batch.
  files_per_batch = [config.batch_size / 10, 1].max

  files_loaded = 0
  tests_uploaded = 0

  duration = measure do
    file_paths.each_slice(files_per_batch).with_index do |file_batch, batch_num|
      batch_tests = Queue.shuffle(load_new_tests(file_batch), random)
      # Count real files so the progress line is exact for a short batch.
      files_loaded += file_batch.size
      next if batch_tests.empty?

      upload_test_batch(batch_tests)
      tests_uploaded += batch_tests.size

      # Progress reporting: first batch, then every tenth.
      if (batch_num + 1) % 10 == 0 || batch_num == 0
        puts "Uploaded #{tests_uploaded} tests from #{files_loaded} files..."
      end
    end
  end

  # Only non-empty batches are uploaded, so the running counter is the total.
  @total = tests_uploaded

  # Mark upload complete.
  redis.multi do |transaction|
    transaction.set(key('total'), @total)
    transaction.set(key('master-status'), 'ready')
    transaction.expire(key('total'), config.redis_ttl)
    transaction.expire(key('master-status'), config.redis_ttl)
  end

  puts
  puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s."
end

# Requires each file of the batch and returns the Minitest tests that were
# not in @index yet, registering them in @index as a side effect.
def load_new_tests(file_batch)
  file_batch.each do |file_path|
    abs_path = ::File.expand_path(file_path)
    require abs_path
    @source_files_loaded.add(abs_path)
  end

  new_tests = []
  return new_tests unless defined?(Minitest)

  # All runnables are rescanned after every batch; @index filters out the
  # tests already seen in earlier batches.
  Minitest::Test.runnables.each do |runnable|
    runnable.runnable_methods.each do |method_name|
      test = Minitest::Queue::SingleExample.new(runnable, method_name)
      next if @index.key?(test.id)

      @index[test.id] = test
      new_tests << test
    end
  end
  new_tests
end

# Pushes one batch of test ids, plus their source-file metadata, to Redis,
# retrying up to 3 times per batch on Redis errors.
def upload_test_batch(batch_tests)
  test_ids = batch_tests.map(&:id)
  metadata = {}
  batch_tests.each do |test|
    if test.respond_to?(:source_location) && (location = test.source_location)
      metadata[test.id] = location[0] # file path
    end
  end

  # The retry budget is per batch: an early hiccup must not leave later
  # batches without any retries.
  attempts = 0
  with_redis_timeout(5) do
    redis.without_reconnect do
      redis.pipelined do |pipeline|
        pipeline.lpush(key('queue'), test_ids)
        pipeline.mapped_hmset(key('test-metadata'), metadata) unless metadata.empty?
        pipeline.incr(key('batch-count'))
        pipeline.expire(key('queue'), config.redis_ttl)
        pipeline.expire(key('test-metadata'), config.redis_ttl)
        pipeline.expire(key('batch-count'), config.redis_ttl)
      end
    end
  rescue ::Redis::BaseError => error
    # NOTE(review): a retry re-sends the whole pipeline; if the failed
    # attempt was partially applied, ids could be queued twice — confirm.
    raise if attempts >= 3

    puts "Retrying batch upload... (#{error})"
    attempts += 1
    retry
  end
end
290+
156291
def reserved_tests
157292
@reserved_tests ||= Concurrent::Set.new
158293
end
@@ -161,6 +296,54 @@ def worker_id
161296
config.worker_id
162297
end
163298

299+
# Resolves a test id that is not in the local index (batch mode).
#
# Looks up the test's source file in the 'test-metadata' Redis hash,
# requires it if this worker has not loaded it yet, then searches the
# loaded runnables for the test.
#
# @param test_id [String] id reserved from the queue
# @return [Object] the test object returned by find_test_object
# @raise [KeyError] when the test cannot be found, mirroring what
#   `index.fetch` raises for an unknown id, so that nil is never returned
#   to the caller as a test
def build_index_entry(test_id)
  file_path = redis.hget(key('test-metadata'), test_id)

  if file_path
    # Normalise before the membership check: the master records expanded
    # paths in @source_files_loaded when it requires files itself.
    abs_path = ::File.expand_path(file_path)
    unless @source_files_loaded.include?(abs_path)
      require_test_file(file_path)
      @source_files_loaded.add(abs_path)
    end
  end

  find_test_object(test_id) || raise(KeyError, "key not found: #{test_id.inspect}")
end
312+
313+
# Requires a test file by path, tolerating a missing file.
#
# The path is expanded against the current working directory.
# File.expand_path returns an already-absolute path as an absolute path
# too, so no separate absolute-path check is needed.
#
# @param file_path [String] absolute or relative path to the test file
# @return [Boolean, nil] the result of +require+, or nil when the file
#   could not be loaded (a warning is printed instead of raising)
def require_test_file(file_path)
  require ::File.expand_path(file_path)
rescue LoadError => e
  # Best effort: log a warning but continue; the caller decides what to do
  # when the test cannot be found afterwards.
  warn "Warning: Could not load test file #{file_path}: #{e.message}"
end
327+
328+
# Finds the test object for +test_id+ among the loaded Minitest runnables.
#
# An id matches when it equals "#{runnable}##{method_name}". Runnables
# whose name is not a prefix of the id are skipped without asking them
# for their methods, so a lookup does not enumerate every test method of
# every class.
#
# @param test_id [String] id in the form "ClassName#method_name"
# @return [Minitest::Queue::SingleExample, nil] the test, or nil (after
#   printing a warning) when nothing matches or Minitest is not defined;
#   callers must handle the nil case themselves
def find_test_object(test_id)
  if defined?(Minitest)
    Minitest::Test.runnables.each do |runnable|
      prefix = "#{runnable}#"
      next unless test_id.start_with?(prefix)

      wanted = test_id[prefix.size..-1]
      method_name = runnable.runnable_methods.find { |name| name.to_s == wanted }
      return Minitest::Queue::SingleExample.new(runnable, method_name) if method_name
    end
  end

  warn "Warning: Test #{test_id} not found after loading file. Ensure all dependencies are explicitly required in test_helper.rb"
  nil
end
346+
164347
def raise_on_mismatching_test(test)
165348
unless reserved_tests.delete?(test)
166349
raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved"

ruby/lib/minitest/queue/runner.rb

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,10 @@ def run_command
9797
if remaining <= running
9898
puts green("Queue almost empty, exiting early...")
9999
else
100-
load_tests
101-
populate_queue
100+
load_tests_and_populate
102101
end
103102
else
104-
load_tests
105-
populate_queue
103+
load_tests_and_populate
106104
end
107105
end
108106

@@ -357,6 +355,19 @@ def reset_counters
357355
queue.build.reset_worker_error
358356
end
359357

358+
# Fills the queue using either the streaming or the traditional strategy.
#
# Streaming is chosen when batch upload is enabled in the queue config and
# the queue responds to +populate_from_files+: the file paths from +argv+
# are handed to the queue, which loads them itself. Otherwise all tests are
# loaded upfront and pushed through +populate_queue+.
#
# @return the result of whichever populate call was made
def load_tests_and_populate
  streaming = queue_config.batch_upload && queue.respond_to?(:populate_from_files)

  unless streaming
    # Traditional mode: load all tests upfront.
    load_tests
    return populate_queue
  end

  # Batch mode: pass file paths directly to the queue.
  Minitest.queue.populate_from_files(argv, random: ordering_seed)
end
370+
360371
def populate_queue
361372
Minitest.queue.populate(Minitest.loaded_tests, random: ordering_seed)
362373
end
@@ -636,6 +647,32 @@ def parser
636647
queue_config.debug_log = path
637648
end
638649

650+
help = <<~EOS
651+
Enable batch/streaming upload mode. In this mode, the master worker will load test files
652+
and push tests to the queue in batches, allowing other workers to start processing tests
653+
immediately without waiting for all tests to be uploaded. This significantly reduces
654+
startup time for large test suites.
655+
656+
IMPORTANT: When using this mode, test files are loaded lazily on-demand on workers.
657+
You MUST explicitly require all dependencies (models, helpers, etc.) in your test_helper.rb.
658+
Autoloading may not work as expected since not all test files are loaded upfront.
659+
EOS
660+
opts.separator ""
661+
opts.on('--batch-upload', help) do
662+
queue_config.batch_upload = true
663+
end
664+
665+
help = <<~EOS
666+
Specify the number of tests to upload in each batch when --batch-upload is enabled.
667+
Smaller batches allow workers to start sooner but may increase overhead.
668+
Larger batches reduce overhead but increase initial wait time.
669+
Defaults to 100.
670+
EOS
671+
opts.separator ""
672+
opts.on('--batch-size SIZE', Integer, help) do |size|
673+
queue_config.batch_size = size
674+
end
675+
639676
opts.separator ""
640677
opts.separator " retry: Replays a previous run in the same order."
641678

0 commit comments

Comments
 (0)