diff --git a/rust/crates/sift_stream/src/backup/disk/async_manager.rs b/rust/crates/sift_stream/src/backup/disk/async_manager.rs index 4d322b2fd..031d65e58 100644 --- a/rust/crates/sift_stream/src/backup/disk/async_manager.rs +++ b/rust/crates/sift_stream/src/backup/disk/async_manager.rs @@ -388,7 +388,7 @@ impl AsyncBackupsManager { // // Update the needs_reingest flag for the file based on this checkpoint. if checkpoint.range.end() < file_range.end() { - ctx.needs_reingest = checkpoint.needs_reingest; + ctx.needs_reingest |= checkpoint.needs_reingest; break; } @@ -399,7 +399,10 @@ impl AsyncBackupsManager { // // At this point, the data is fully committed either with backup files for re-ingestion // or confirmed ingested by Sift. - if ctx.last_message_id > self.committed_message_id.unwrap_or(0) { + if self + .committed_message_id + .is_none_or(|id| ctx.last_message_id > id) + { self.committed_message_id = Some(ctx.last_message_id); self.metrics .backups @@ -475,8 +478,10 @@ impl AsyncBackupsManager { { // Effectively commit the message since the checkpoint the message belongs to has been successfully completed // already. - let committed_message_id = self.committed_message_id.unwrap_or(0); - if msg.message_id > committed_message_id { + if self + .committed_message_id + .is_none_or(|id| msg.message_id > id) + { self.committed_message_id = Some(msg.message_id); self.metrics .backups @@ -634,10 +639,13 @@ impl Drop for AsyncBackupsManager { "graceful shutdown was not used -- attempting to sync backup file during drop to prevent data loss" ); + // Capture any data remaining in the BufWriter's internal buffer before + // calling into_inner(), which discards the buffer without flushing. + let remaining_buf = writer.buffer().to_vec(); let file = writer.into_inner(); - // Conver to standard file for blocking sync_all. - let std_file = match file.try_into_std() { + // Convert to standard file for blocking write + sync_all. + let mut std_file = match file.try_into_std() { Ok(std_file) => std_file, Err(_) => { #[cfg(feature = "tracing")] @@ -646,7 +654,19 @@ impl Drop for AsyncBackupsManager { } }; - // Attempt to sync the file. + // Write any buffered data that was not yet flushed to the file. + if !remaining_buf.is_empty() + && let Err(e) = std::io::Write::write_all(&mut std_file, &remaining_buf) + { + #[cfg(feature = "tracing")] + tracing::warn!( + error = %e, + "unable to write buffered data during drop, data may be lost" + ); + return; + } + + // Ensure data is persisted to disk. if let Err(e) = std_file.sync_all() { #[cfg(feature = "tracing")] tracing::warn!( @@ -1629,6 +1649,81 @@ mod test { ); } + /// Regression test BufWriter data lost during Drop. + /// + /// When AsyncBackupsManager is dropped without graceful shutdown, the Drop + /// impl must flush the BufWriter's internal buffer to disk. Previously, + /// `into_inner()` was called without capturing the buffer, silently + /// discarding any unflushed data. + #[tokio::test] + async fn test_drop_flushes_bufwriter_to_disk() { + let tmp_dir = TempDir::new("test_drop_flushes_bufwriter").unwrap(); + let tmp_dir_path = tmp_dir.path(); + let backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + // Large file size so no rotation occurs — all messages stay in one file. 
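+            // (1 MiB is far more than the handful of small messages written below
+            // will ever occupy, so the file is never rotated.)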
+ max_backup_file_size: 1024 * 1024, + rolling_file_policy: RollingFilePolicy { + max_file_count: Some(10), + }, + retain_backups: true, + }; + let (control_tx, control_rx) = broadcast::channel(1024); + let (_data_tx, data_rx) = async_channel::bounded(1024); + let metrics = Arc::new(SiftStreamMetrics::default()); + let mut backup_manager = AsyncBackupsManager::new( + true, + "test", + "test", + backup_policy, + control_tx, + control_rx, + data_rx, + metrics, + ) + .await + .unwrap(); + + // Write a few small messages. These are small enough (~50-100 bytes each) + // to stay entirely within the BufWriter's 128KB internal buffer without + // triggering an auto-flush to the OS. + let num_messages = 5; + for i in 0..num_messages { + let msg = create_test_data_message(i, false); + assert!( + backup_manager.handle_data_message(&msg).await.is_ok(), + "message {} should be handled", + i + ); + } + + // Verify the file exists and messages were written to the BufWriter. + assert!( + backup_manager.current_file.is_some(), + "current file should exist" + ); + assert_eq!( + backup_manager.current_file_ctx.message_count, num_messages as usize, + "should have written {} messages", + num_messages + ); + let file_path = backup_manager.current_file_ctx.file_path.clone(); + + // Drop without graceful shutdown — triggers the Drop impl. + drop(backup_manager); + + // Read the file back from disk and verify all messages were persisted. + let decoder = decode_backup::<_, IngestWithConfigDataStreamRequest>(&file_path) + .expect("should be able to decode backup file"); + let decoded_messages: Vec<_> = decoder.into_iter().filter_map(|r| r.ok()).collect(); + assert_eq!( + decoded_messages.len(), + num_messages as usize, + "all {} messages should have been persisted to disk during drop", + num_messages + ); + } + #[tokio::test] async fn test_async_manager_end_to_end() { let tmp_dir = TempDir::new("test_async_manager").unwrap(); @@ -2205,6 +2300,121 @@ mod test { ); } + /// Regression test for `needs_reingest` flag was overwritten instead of OR'd + /// when a checkpoint partially overlaps a file. + /// + /// Scenario: + /// - Messages 0-9 are written to a single backup file. + /// - Message 5 has `dropped_for_ingestion = true`, setting the file's `needs_reingest = true`. + /// - A successful checkpoint (needs_reingest=false) arrives for range 0..=5. + /// - The checkpoint ends in the middle of the file (file covers 0..=9). + /// - Previously, the code used `=` which would clear the flag to `false`. + /// - With the fix (`|=`), the flag correctly remains `true`. + #[tokio::test] + async fn test_dropped_message_flag_preserved_on_partial_checkpoint() { + let tmp_dir = + TempDir::new("test_dropped_message_flag_preserved_on_partial_checkpoint").unwrap(); + let tmp_dir_path = tmp_dir.path(); + let backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + // Large file size so all messages end up in one file. + max_backup_file_size: 1024, + rolling_file_policy: RollingFilePolicy { + max_file_count: Some(10), + }, + retain_backups: false, + }; + let (control_tx, mut control_rx) = broadcast::channel(1024); + let (_data_tx, data_rx) = async_channel::bounded(1024); + let metrics = Arc::new(SiftStreamMetrics::default()); + let mut backup_manager = AsyncBackupsManager::new( + true, + "test", + "test", + backup_policy, + control_tx.clone(), + control_tx.subscribe(), + data_rx, + metrics, + ) + .await + .unwrap(); + + // Write messages 0-9, with message 5 marked as dropped for ingestion. 
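+        // (The second argument to create_test_data_message marks the message as
+        // dropped_for_ingestion, which sets needs_reingest on the current file.)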
+ for i in 0..10 { + let dropped = i == 5; + let msg = create_test_data_message(i, dropped); + assert!( + backup_manager.handle_data_message(&msg).await.is_ok(), + "message {} should be handled", + i + ); + } + + // All messages should be in the current file (large max size). + assert!( + backup_manager.current_file.is_some(), + "current file should exist" + ); + assert!( + backup_manager.file_ctx_buffer.is_empty(), + "no files should have been rotated yet" + ); + assert!( + backup_manager.current_file_ctx.needs_reingest, + "file should need re-ingestion due to dropped message" + ); + + // Send a *successful* checkpoint that partially covers the file (0..=5). + // The file spans 0..=9, so the checkpoint ends in the middle. + backup_manager.checkpoint_queue.push_back(CheckpointInfo { + range: 0..=5, + needs_reingest: false, + }); + assert!( + backup_manager.process_pending_checkpoints().await.is_ok(), + "checkpoint should be processed" + ); + + // After processing, the file should have been rotated to file_ctx_buffer + // (process_checkpoint calls rotate_file). The file's needs_reingest must + // still be true because it contains message 5 which was dropped. + assert!( + !backup_manager.file_ctx_buffer.is_empty(), + "file should be in the buffer after checkpoint processing" + ); + assert!( + backup_manager + .file_ctx_buffer + .front() + .unwrap() + .needs_reingest, + "file's needs_reingest flag must be preserved (dropped message 5 is in this file)" + ); + + // Now complete the second checkpoint covering the rest of the file. + backup_manager.checkpoint_queue.push_back(CheckpointInfo { + range: 6..=9, + needs_reingest: false, + }); + assert!( + backup_manager.process_pending_checkpoints().await.is_ok(), + "second checkpoint should be processed" + ); + + // The file should have been sent for re-ingestion despite both checkpoints + // being successful, because it contained a dropped message. + assert!( + backup_manager.file_ctx_buffer.is_empty(), + "file should have been processed out of the buffer" + ); + let control_message = control_rx.try_recv(); + assert!( + matches!(control_message, Ok(ControlMessage::ReingestBackups { .. })), + "a ReingestBackups signal should have been sent because the file had a dropped message" + ); + } + #[tokio::test] async fn test_multiple_checkpoints_backup_behind_ingestion() { let tmp_dir = @@ -2529,6 +2739,65 @@ mod test { } } + /// Regression test for message_id 0 can never be committed. + /// + /// The comparison `ctx.last_message_id > self.committed_message_id.unwrap_or(0)` + /// evaluates to `0 > 0 = false` when committed_message_id is None and the + /// message_id is 0, so committed_message_id is never set to Some(0). + #[tokio::test] + async fn test_message_id_zero_can_be_committed() { + let tmp_dir = TempDir::new("test_message_id_zero_can_be_committed").unwrap(); + let tmp_dir_path = tmp_dir.path(); + let backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + max_backup_file_size: 1024, + rolling_file_policy: RollingFilePolicy { + max_file_count: Some(10), + }, + retain_backups: false, + }; + let (control_tx, _control_rx) = broadcast::channel(1024); + let (_data_tx, data_rx) = async_channel::bounded(1024); + let metrics = Arc::new(SiftStreamMetrics::default()); + let mut backup_manager = AsyncBackupsManager::new( + true, + "test", + "test", + backup_policy, + control_tx.clone(), + control_tx.subscribe(), + data_rx, + metrics, + ) + .await + .unwrap(); + + // Write only message 0. 
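+        // (Id 0 is exactly the value the old `unwrap_or(0)` comparison could never
+        // commit, since `0 > 0` is false.)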
+        let msg = create_test_data_message(0, false);
+        assert!(
+            backup_manager.handle_data_message(&msg).await.is_ok(),
+            "message 0 should be handled"
+        );
+
+        // Complete a successful checkpoint covering just message 0.
+        backup_manager.checkpoint_queue.push_back(CheckpointInfo {
+            range: 0..=0,
+            needs_reingest: false,
+        });
+        assert!(
+            backup_manager.process_pending_checkpoints().await.is_ok(),
+            "checkpoint should be processed"
+        );
+
+        // committed_message_id must be Some(0), not None.
+        // The system should distinguish "message 0 committed" from "nothing committed."
+        assert_eq!(
+            backup_manager.committed_message_id(),
+            Some(0),
+            "committed_message_id should be Some(0) after committing message 0"
+        );
+    }
+
     #[tokio::test]
     async fn test_checkpoint_with_no_messages() {
         let tmp_dir = TempDir::new("test_checkpoint_no_messages").unwrap();
@@ -3032,4 +3301,128 @@ mod test {
             "backup0 file should not have been removed"
         );
     }
+
+    /// Demonstrates a data loss scenario caused by a missing CheckpointComplete
+    /// in the IngestionTask's stream failure branch (tasks.rs lines 322-338).
+    ///
+    /// When a gRPC stream fails outside of a checkpoint timer tick, the IngestionTask
+    /// sends CheckpointNeedsReingestion but does NOT send a corresponding
+    /// CheckpointComplete. The next timer-triggered CheckpointComplete covers a
+    /// different message range, consuming the reingest signal without overlap.
+    /// The backup files for the failed stream's messages are then deleted without
+    /// re-ingestion.
+    ///
+    /// Sequence:
+    /// 1. Stream #2 processes messages 0-9, then gRPC fails
+    /// 2. IngestionTask sends CheckpointNeedsReingestion { 0, 9 } (no CheckpointComplete)
+    /// 3. Stream #3 processes messages 10-19
+    /// 4. Timer fires → CheckpointComplete { 10, 19 }
+    /// 5. Backup manager: reingest range 0..=9 doesn't overlap 10..=19 → reingest lost
+    /// 6. Files for messages 0-9 are deleted → DATA LOSS
+    #[tokio::test]
+    async fn test_reingest_signal_lost_when_stream_fails_outside_checkpoint() {
+        let tmp_dir =
+            TempDir::new("test_reingest_signal_lost_when_stream_fails_outside_checkpoint").unwrap();
+        let tmp_dir_path = tmp_dir.path();
+        let backup_policy = DiskBackupPolicy {
+            backups_dir: Some(tmp_dir_path.to_path_buf()),
+            max_backup_file_size: 64,
+            rolling_file_policy: RollingFilePolicy {
+                max_file_count: Some(10),
+            },
+            retain_backups: false,
+        };
+        let (control_tx, mut control_rx) = broadcast::channel(1024);
+        let (_data_tx, data_rx) = async_channel::bounded(1024);
+        let metrics = Arc::new(SiftStreamMetrics::default());
+        let mut backup_manager = AsyncBackupsManager::new(
+            true,
+            "test",
+            "test",
+            backup_policy,
+            control_tx.clone(),
+            control_tx.subscribe(),
+            data_rx,
+            metrics,
+        )
+        .await
+        .unwrap();
+
+        // Step 1: Stream #2 processes messages 0-9 (backed up to disk).
+        for i in 0..10 {
+            let msg = create_test_data_message(i, false);
+            backup_manager.handle_data_message(&msg).await.unwrap();
+        }
+
+        // Snapshot the backup file paths that contain messages 0-9.
+        let first_batch_files: Vec<_> = backup_manager
+            .file_ctx_buffer
+            .iter()
+            .map(|ctx| ctx.file_path.clone())
+            .collect();
+        assert!(
+            !first_batch_files.is_empty(),
+            "should have backup files for first batch"
+        );
+
+        // Step 2: Stream #2 fails (gRPC error outside timer tick).
+        // With the fix, IngestionTask now sends BOTH CheckpointNeedsReingestion and
+        // CheckpointComplete for the failed stream's range, ensuring the reingest
+        // signal is properly paired and not consumed by a later unrelated checkpoint.
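+        // (Both control messages are simulated directly against the backup manager's
+        // state below rather than by running a real IngestionTask.)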
+ // + // Simulate CheckpointNeedsReingestion { 0, 9 }: + backup_manager.next_checkpoint_reingest_range = Some(0..=9); + // Simulate the paired CheckpointComplete { 0, 9 } (the fix): + let checkpoint_range = 0..=9u64; + let needs_reingest = + if let Some(reingest_range) = backup_manager.next_checkpoint_reingest_range.take() { + ranges_overlap(&checkpoint_range, &reingest_range) + } else { + false + }; + backup_manager.checkpoint_queue.push_back(CheckpointInfo { + range: checkpoint_range, + needs_reingest, + }); + backup_manager.process_pending_checkpoints().await.unwrap(); + + // Step 3: Verify that a ReingestBackups signal was sent for the failed + // stream's files and that the files still exist on disk for re-ingestion. + let mut reingest_files = Vec::new(); + while let Ok(msg) = control_rx.try_recv() { + if let ControlMessage::ReingestBackups { backup_files } = msg { + reingest_files.extend(backup_files); + } + } + assert!( + !reingest_files.is_empty(), + "a ReingestBackups signal should have been sent for the failed stream's backup files" + ); + for file_path in &reingest_files { + assert!( + file_path.exists(), + "backup file {} should exist on disk for re-ingestion", + file_path.display() + ); + } + + // Step 4: Stream #3 processes messages 10-19 and checkpoints successfully. + for i in 10..20 { + let msg = create_test_data_message(i, false); + backup_manager.handle_data_message(&msg).await.unwrap(); + } + backup_manager.checkpoint_queue.push_back(CheckpointInfo { + range: 10..=19, + needs_reingest: false, + }); + backup_manager.process_pending_checkpoints().await.unwrap(); + + // Step 5: The failed stream's files were already handled in step 3. + // The successful checkpoint should have cleaned up messages 10-19 without + // interfering with the earlier re-ingestion. + assert!( + backup_manager.file_ctx_buffer.is_empty(), + "all backup files should be processed after both checkpoints" + ); + } } diff --git a/rust/crates/sift_stream/src/stream/tasks.rs b/rust/crates/sift_stream/src/stream/tasks.rs index 224308fca..7af4c094e 100644 --- a/rust/crates/sift_stream/src/stream/tasks.rs +++ b/rust/crates/sift_stream/src/stream/tasks.rs @@ -328,6 +328,12 @@ impl IngestionTask { } Err(e) => { current_wait = self.handle_failed_stream(&e, stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?; + + // Send CheckpointComplete to pair with the CheckpointNeedsReingestion + // sent by handle_failed_stream. Without this, the reingest signal can + // be consumed by a later non-overlapping CheckpointComplete, causing + // backup files from the failed stream to be deleted without re-ingestion. + self.control_tx.send(ControlMessage::CheckpointComplete { first_message_id: first_message_id.load(Ordering::Relaxed), last_message_id: last_message_id.load(Ordering::Relaxed) }).map_err(|e| Error::new(ErrorKind::StreamError, e))?; } } @@ -873,7 +879,9 @@ mod tests { // Verify graceful shutdown drained the data channel and sent the final checkpoint complete message. assert!(data_tx.is_empty(), "data channel should be empty"); - // Each checkpoint expiration should generate a checkpoint complete control message. + // Each stream failure now also generates a CheckpointComplete (paired with + // CheckpointNeedsReingestion), plus 1 final from shutdown. + // With 2 errors configured: 2 failure checkpoints + 1 shutdown = 3. 
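+        // Count the checkpoint-complete control messages received below.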
let mut complete_count = 0; while let Ok(msg) = control_rx.try_recv() { if matches!( @@ -886,7 +894,10 @@ mod tests { complete_count += 1; } } - assert_eq!(complete_count, 1, "should have completed 1 checkpoint"); + assert_eq!( + complete_count, 3, + "should have completed 3 checkpoints (2 from stream failures + 1 from shutdown)" + ); } #[tokio::test] diff --git a/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs b/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs index f43abcb32..64635cf0f 100644 --- a/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs +++ b/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs @@ -137,10 +137,10 @@ async fn test_retries_succeed() { tokio::time::sleep(Duration::from_millis(10)).await; } - assert_eq!( - num_messages as u32, + assert!( + num_messages_received.load(Ordering::Relaxed) >= num_messages as u32, + "expected no messages to be dropped, got {} (may include re-ingested messages from failed streams)", num_messages_received.load(Ordering::Relaxed), - "expected no messages to be dropped", ); assert!(