fix(aws_s3 source): complete in-flight SQS message processing before shutdown #24670

Open
sanjams2 wants to merge 6 commits into vectordotdev:master from sanjams2:sanjams2/sqs-dropped-messages

@sanjams2
Contributor

Problem

During shutdown, the tokio::select! in IngestorProcess::run() can cancel run_once() mid-flight. If events have already been sent to sinks at that point, the corresponding SQS messages are left undeleted and are redelivered once visibility_timeout expires, causing duplicate delivery.

Fix

Split run_once() into two distinct phases:

  1. Cancellable receive_with_backoff() — polls SQS for messages with exponential backoff on errors. This is raced against shutdown in select! and is safe to cancel because no messages have been processed yet.

  2. Non-cancellable process_messages() — handles each received message, forwards deferred entries, and batch-deletes successfully processed messages from SQS. This runs to completion without being raced against shutdown, ensuring SQS deletes are never skipped.

This preserves all existing behavior (error metrics, deferred queue forwarding, batch delete handling, backoff logic) while eliminating the race condition.
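The two-phase shape described above can be illustrated with a minimal, dependency-free sketch. This is not the code in this PR: it swaps tokio::select! and the SQS client for a std channel and an AtomicBool shutdown flag, and `Message`, `receive_or_shutdown`, `run`, and the `handle` callback are hypothetical names chosen for the illustration. It only shows the control flow, where shutdown is observed while waiting for a batch and never while a batch is being processed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for an SQS message.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u32,
}

/// Phase 1 (cancellable): wait for a batch, but give up as soon as
/// shutdown is requested. Nothing has been taken from the queue when
/// this returns `None`, so stopping here cannot lose or duplicate work.
fn receive_or_shutdown(
    queue: &Receiver<Vec<Message>>,
    shutdown: &AtomicBool,
) -> Option<Vec<Message>> {
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return None;
        }
        match queue.recv_timeout(Duration::from_millis(10)) {
            Ok(batch) => return Some(batch),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return None,
        }
    }
}

/// Phase 2 (non-cancellable): handle every message in the batch, then
/// record the "delete". This function never looks at the shutdown flag.
fn process_messages(
    batch: Vec<Message>,
    mut handle: impl FnMut(&Message),
    deleted: &mut Vec<u32>,
) {
    for msg in &batch {
        handle(msg);
    }
    // Stand-in for the batch delete call against SQS.
    deleted.extend(batch.iter().map(|m| m.id));
}

/// Main loop: shutdown is only checked between batches.
fn run(
    queue: &Receiver<Vec<Message>>,
    shutdown: &AtomicBool,
    mut handle: impl FnMut(&Message),
) -> Vec<u32> {
    let mut deleted = Vec::new();
    while let Some(batch) = receive_or_shutdown(queue, shutdown) {
        process_messages(batch, &mut handle, &mut deleted);
    }
    deleted
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let shutdown = AtomicBool::new(false);
    tx.send(vec![Message { id: 1 }, Message { id: 2 }]).unwrap();
    tx.send(vec![Message { id: 3 }]).unwrap();

    // Shutdown fires while the first batch is mid-processing.
    let deleted = run(&rx, &shutdown, |msg| {
        if msg.id == 1 {
            shutdown.store(true, Ordering::SeqCst);
        }
    });

    // The in-flight batch is fully handled and deleted; the second
    // batch is never received, so it stays in the queue untouched.
    println!("deleted: {:?}", deleted); // prints "deleted: [1, 2]"
}
```

In this sketch the shutdown request arrives while message 1 is being handled, yet message 2 is still processed and both are deleted before the loop exits. The second batch is left in the queue, which corresponds to messages that were never received from SQS and so remain available for a later consumer.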

Tests

Added 3 new unit tests:

  • shutdown_interrupts_long_poll — verifies that shutdown interrupts the SQS receive phase cleanly
  • shutdown_interrupts_backoff_sleep — verifies that shutdown interrupts the backoff sleep between retries
  • inflight_processing_completes_despite_shutdown — verifies that in-flight message processing (including SQS delete) runs to completion even when shutdown fires mid-processing

Closes #24010

…shutdown

Refactor IngestorProcess::run() to use a two-phase approach that prevents
the shutdown race condition where tokio::select! could cancel run_once()
mid-flight, leaving SQS messages undeleted and causing duplicate delivery
after visibility_timeout.

Phase 1 (cancellable): receive_with_backoff() polls SQS for messages,
handling errors with exponential backoff. This phase is safe to cancel
via select! because no messages have been received yet.

Phase 2 (non-cancellable): process_messages() handles each message,
forwards deferred entries, and batch-deletes successfully processed
messages. This phase runs to completion without being raced against
shutdown, ensuring SQS deletes are never skipped.

The old run_once() method is replaced by receive_with_backoff() and
process_messages(), with all existing behavior preserved: error metrics,
deferred queue forwarding, batch delete handling, and backoff logic.
@sanjams2 sanjams2 requested a review from a team as a code owner February 17, 2026 15:47
@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Feb 17, 2026
Development

Successfully merging this pull request may close these issues.

Block aws_s3_source graceful shutdown until object processing completes