@@ -11,6 +11,8 @@ import (
1111 "github.com/trufflesecurity/trufflehog/v3/pkg/sources"
1212)
1313
14+ // TODO [INS-207] Add role to legacy scan resumption info
15+
1416// Checkpointer maintains resumption state for S3 bucket scanning,
1517// enabling resumable scans by tracking which objects have been successfully processed.
1618// It provides checkpoints that can be used to resume interrupted scans without missing objects.
@@ -33,6 +35,10 @@ import (
3335// resuming from the correct bucket. The scan will continue from the last checkpointed object
3436// in that bucket.
3537//
38+ // Unit scans are also supported. In that mode the checkpointer records the last processed object key
39+ // separately for each unit (identified by its bucket and role) through the SetEncodedResumeInfoFor
40+ // method on Progress. To use the checkpointer for unit scans, call SetIsUnitScan(true) before starting the scan.
41+ //
3642// For example, if scanning is interrupted after processing 1500 objects across 2 pages:
3743// Page 1 (objects 0-999): Fully processed, checkpoint saved at object 999
3844// Page 2 (objects 1000-1999): Partially processed through 1600, but only consecutive through 1499
@@ -56,6 +62,8 @@ type Checkpointer struct {
5662 // progress holds the scan's overall progress state and enables persistence.
5763 // The EncodedResumeInfo field stores the JSON-encoded ResumeInfo checkpoint.
5864 progress * sources.Progress // Reference to source's Progress
65+
66+ isUnitScan bool // Indicates if scanning is done in unit scan mode
5967}
6068
6169const defaultMaxObjectsPerPage = 1000
@@ -153,9 +161,10 @@ func (p *Checkpointer) UpdateObjectCompletion(
153161 ctx context.Context ,
154162 completedIdx int ,
155163 bucket string ,
164+ role string ,
156165 pageContents []s3types.Object ,
157166) error {
158- ctx = context .WithValues (ctx , "bucket" , bucket , "completedIdx" , completedIdx )
167+ ctx = context .WithValues (ctx , "bucket" , bucket , "role" , role , " completedIdx" , completedIdx )
159168 ctx .Logger ().V (5 ).Info ("Updating progress" )
160169
161170 if completedIdx >= len (p .completedObjects ) {
@@ -184,7 +193,7 @@ func (p *Checkpointer) UpdateObjectCompletion(
184193 }
185194 obj := pageContents [checkpointIdx ]
186195
187- return p .updateCheckpoint (bucket , * obj .Key )
196+ return p .updateCheckpoint (bucket , role , * obj .Key )
188197}
189198
190199// advanceLowestIncompleteIdx moves the lowest incomplete index forward to the next incomplete object.
@@ -198,7 +207,14 @@ func (p *Checkpointer) advanceLowestIncompleteIdx() {
198207
199208// updateCheckpoint persists the current resumption state.
200209// Must be called with lock held.
201- func (p * Checkpointer ) updateCheckpoint (bucket string , lastKey string ) error {
210+ func (p * Checkpointer ) updateCheckpoint (bucket string , role string , lastKey string ) error {
211+ if p .isUnitScan {
212+ unitID := constructS3SourceUnitID (bucket , role )
213+ // track sub-unit resumption state
214+ p .progress .SetEncodedResumeInfoFor (unitID , lastKey )
215+ return nil
216+ }
217+
202218 encoded , err := json .Marshal (& ResumeInfo {CurrentBucket : bucket , StartAfter : lastKey })
203219 if err != nil {
204220 return fmt .Errorf ("failed to encode resume info: %w" , err )
@@ -212,3 +228,11 @@ func (p *Checkpointer) updateCheckpoint(bucket string, lastKey string) error {
212228 )
213229 return nil
214230}
231+
232+ // SetIsUnitScan enables or disables unit scan mode; when enabled, updateCheckpoint stores the last processed key per unit via SetEncodedResumeInfoFor.
233+ func (p * Checkpointer ) SetIsUnitScan (isUnitScan bool ) {
234+ p .mu .Lock () // hold the checkpointer's lock while changing the mode flag
235+ defer p .mu .Unlock ()
236+
237+ p .isUnitScan = isUnitScan // read by updateCheckpoint, which must be called with the lock held
238+ }
0 commit comments