Merged
Commits (26 total; diff shows changes from 24)
bdc0d26  implemented Source unit for S3. (mustansir14, Jun 26, 2025)
21d9334  use bucket as source unit (mustansir14, Jul 7, 2025)
a25afff  remove code duplication, reuse from Chunks (mustansir14, Nov 19, 2025)
ea21d02  remove unnecessary change (mustansir14, Nov 19, 2025)
9915187  remove unused functions (mustansir14, Nov 19, 2025)
c32e12c  revisit tests (mustansir14, Nov 19, 2025)
5161090  revert unnecessary change (mustansir14, Nov 19, 2025)
ef324d1  change SourceUnitKind to s3_bucket (mustansir14, Nov 20, 2025)
84e0cda  handle nil objectCount inside scanBucket (mustansir14, Nov 20, 2025)
b5a66d5  handle nil objectCount outside loop (mustansir14, Nov 20, 2025)
966007f  add bucket to resume log (mustansir14, Nov 20, 2025)
50e5a90  Merge branch 'main' into INS-104-Support-units-in-S3-source (amanfcp, Nov 20, 2025)
0faa70e  add bucket and role to error log, remove enumerating log (mustansir14, Nov 21, 2025)
474172c  Merge branch 'INS-104-Support-units-in-S3-source' of mustansir:mustan… (mustansir14, Nov 21, 2025)
10f91ff  implement sub unit resumption (mustansir14, Nov 24, 2025)
6bfbc14  add comment to checkpointer for unit scans (mustansir14, Nov 24, 2025)
1863659  Merge branch 'main' into INS-104-Support-units-in-S3-source (mustansir14, Nov 25, 2025)
5ca4151  implement SourceUnitUnmarshaller on source with the new S3SourceUnit,… (mustansir14, Nov 26, 2025)
45a133b  Merge branch 'main' into INS-104-Support-units-in-S3-source (mustansir14, Dec 1, 2025)
549e6be  add role to SourceUnitID (mustansir14, Dec 2, 2025)
b5cb928  Merge branch 'INS-104-Support-units-in-S3-source' of mustansir:mustan… (mustansir14, Dec 2, 2025)
1cee9af  Revert "add role to SourceUnitID" (mustansir14, Dec 2, 2025)
6f06776  add role to source unit ID, keep track of resumption using source uni… (mustansir14, Dec 3, 2025)
85e681b  Merge branch 'main' into INS-104-Support-units-in-S3-source (mustansir14, Dec 3, 2025)
53a91c8  rename bucket -> unitID in UnmarshalSourceUnit (mustansir14, Dec 4, 2025)
3e8e6b9  Merge branch 'main' into INS-104-Support-units-in-S3-source (mustansir14, Dec 4, 2025)
30 changes: 27 additions & 3 deletions pkg/sources/s3/checkpointer.go
@@ -11,6 +11,8 @@ import (
"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

// TODO [INS-207] Add role to legacy scan resumption info

// Checkpointer maintains resumption state for S3 bucket scanning,
// enabling resumable scans by tracking which objects have been successfully processed.
// It provides checkpoints that can be used to resume interrupted scans without missing objects.
@@ -33,6 +35,10 @@ import (
// resuming from the correct bucket. The scan will continue from the last checkpointed object
// in that bucket.
//
// Unit scans are also supported. The encoded resume info in this case tracks the last processed object
// for each unit separately by using the SetEncodedResumeInfoFor method on Progress. To use the
// checkpointer for unit scans, call SetIsUnitScan(true) before starting the scan.
//
// For example, if scanning is interrupted after processing 1500 objects across 2 pages:
// Page 1 (objects 0-999): Fully processed, checkpoint saved at object 999
// Page 2 (objects 1000-1999): Partially processed through 1600, but only consecutive through 1499
@@ -56,6 +62,8 @@ type Checkpointer struct {
// progress holds the scan's overall progress state and enables persistence.
// The EncodedResumeInfo field stores the JSON-encoded ResumeInfo checkpoint.
progress *sources.Progress // Reference to source's Progress

isUnitScan bool // Indicates if scanning is done in unit scan mode
}

const defaultMaxObjectsPerPage = 1000
@@ -153,9 +161,10 @@ func (p *Checkpointer) UpdateObjectCompletion(
ctx context.Context,
completedIdx int,
bucket string,
role string,
pageContents []s3types.Object,
) error {
ctx = context.WithValues(ctx, "bucket", bucket, "completedIdx", completedIdx)
ctx = context.WithValues(ctx, "bucket", bucket, "role", role, "completedIdx", completedIdx)
ctx.Logger().V(5).Info("Updating progress")

if completedIdx >= len(p.completedObjects) {
@@ -184,7 +193,7 @@ }
}
obj := pageContents[checkpointIdx]

return p.updateCheckpoint(bucket, *obj.Key)
return p.updateCheckpoint(bucket, role, *obj.Key)
}

// advanceLowestIncompleteIdx moves the lowest incomplete index forward to the next incomplete object.
Expand All @@ -198,7 +207,14 @@ func (p *Checkpointer) advanceLowestIncompleteIdx() {

// updateCheckpoint persists the current resumption state.
// Must be called with lock held.
func (p *Checkpointer) updateCheckpoint(bucket string, lastKey string) error {
func (p *Checkpointer) updateCheckpoint(bucket string, role string, lastKey string) error {
if p.isUnitScan {
unitID := constructS3SourceUnitID(bucket, role)
// track sub-unit resumption state
p.progress.SetEncodedResumeInfoFor(unitID, lastKey)
return nil
}

encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey})
if err != nil {
return fmt.Errorf("failed to encode resume info: %w", err)
@@ -212,3 +228,11 @@ func (p *Checkpointer) updateCheckpoint(bucket string, lastKey string) error {
)
return nil
}

// SetIsUnitScan sets whether the checkpointer is operating in unit scan mode.
func (p *Checkpointer) SetIsUnitScan(isUnitScan bool) {
p.mu.Lock()
defer p.mu.Unlock()

p.isUnitScan = isUnitScan
}
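
The unit-scan branch above delegates per-unit checkpointing to Progress.SetEncodedResumeInfoFor, keyed by the ID returned from constructS3SourceUnitID, neither of which appears in this diff. Below is a minimal, self-contained sketch of the encoding they imply, assuming the resume info is a JSON map of unit ID to last processed key and that unit IDs take the role|bucket form expected by the new test; progressSketch, setEncodedResumeInfoFor, and constructUnitID are illustrative stand-ins, not the real trufflehog types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// progressSketch mimics the unit-keyed resume info that the checkpointer
// writes via Progress.SetEncodedResumeInfoFor: a JSON object mapping
// source unit ID -> last successfully processed object key.
type progressSketch struct {
	EncodedResumeInfo string
}

// setEncodedResumeInfoFor is a stand-in for the real method on sources.Progress
// (assumed behavior: merge the new key into the existing JSON map).
func (p *progressSketch) setEncodedResumeInfoFor(unitID, lastKey string) {
	m := map[string]string{}
	if p.EncodedResumeInfo != "" {
		_ = json.Unmarshal([]byte(p.EncodedResumeInfo), &m)
	}
	m[unitID] = lastKey
	encoded, _ := json.Marshal(m)
	p.EncodedResumeInfo = string(encoded)
}

// constructUnitID mirrors the "role|bucket" unit ID format implied by the
// test's expected value "test-role|test-bucket".
func constructUnitID(bucket, role string) string {
	return role + "|" + bucket
}

func main() {
	p := &progressSketch{}
	p.setEncodedResumeInfoFor(constructUnitID("test-bucket", "test-role"), "key-0")
	p.setEncodedResumeInfoFor(constructUnitID("other-bucket", "test-role"), "photos/2024/img-42.png")
	fmt.Println(p.EncodedResumeInfo)
	// e.g. {"test-role|other-bucket":"photos/2024/img-42.png","test-role|test-bucket":"key-0"}
}
```

Under those assumptions, resuming a unit is just a map lookup on its unit ID, which is what the new test in checkpointer_test.go verifies for "test-role|test-bucket".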
36 changes: 33 additions & 3 deletions pkg/sources/s3/checkpointer_test.go
@@ -31,7 +31,7 @@ func TestCheckpointerResumption(t *testing.T) {

// Process first 6 objects.
for i := range 6 {
err := tracker.UpdateObjectCompletion(ctx, i, "test-bucket", firstPage.Contents)
err := tracker.UpdateObjectCompletion(ctx, i, "test-bucket", "", firstPage.Contents)
assert.NoError(t, err)
}

@@ -50,7 +50,7 @@ func TestCheckpointerResumption(t *testing.T) {

// Process remaining objects.
for i := range len(resumePage.Contents) {
err := resumeTracker.UpdateObjectCompletion(ctx, i, "test-bucket", resumePage.Contents)
err := resumeTracker.UpdateObjectCompletion(ctx, i, "test-bucket", "", resumePage.Contents)
assert.NoError(t, err)
}

@@ -244,7 +244,7 @@ func TestCheckpointerUpdate(t *testing.T) {
}
}

err := tracker.UpdateObjectCompletion(ctx, tt.completedIdx, "test-bucket", page.Contents)
err := tracker.UpdateObjectCompletion(ctx, tt.completedIdx, "test-bucket", "", page.Contents)
assert.NoError(t, err, "Unexpected error updating progress")

var info ResumeInfo
@@ -258,6 +258,36 @@ }
}
}

func TestCheckpointerUpdateUnitScan(t *testing.T) {
ctx := context.Background()
progress := new(sources.Progress)
tracker := NewCheckpointer(ctx, progress)
tracker.SetIsUnitScan(true)

page := &s3.ListObjectsV2Output{
Contents: make([]s3types.Object, 3),
}
for i := range 3 {
key := fmt.Sprintf("key-%d", i)
page.Contents[i] = s3types.Object{Key: &key}
}

// Complete first object.
err := tracker.UpdateObjectCompletion(ctx, 0, "test-bucket", "test-role", page.Contents)
assert.NoError(t, err, "Unexpected error updating progress")

var info map[string]string
err = json.Unmarshal([]byte(progress.EncodedResumeInfo), &info)
var gotUnitID, gotStartAfter string
for k, v := range info {
gotUnitID = k
gotStartAfter = v
}
assert.NoError(t, err, "Failed to decode resume info")
assert.Equal(t, "test-role|test-bucket", gotUnitID, "Incorrect unit ID")
assert.Equal(t, "key-0", gotStartAfter, "Incorrect resume point")
}

func TestComplete(t *testing.T) {
tests := []struct {
name string
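
Beyond the diff itself, it may help to see how a resumed unit scan could feed the checkpointed key back into an S3 listing. The sketch below is not part of this PR: it assumes the role|bucket unit-ID format and the JSON map encoding exercised by TestCheckpointerUpdateUnitScan, and uses the AWS SDK for Go v2's StartAfter list parameter (the same field name that ResumeInfo carries) to skip already-processed objects. The bucket, role, and resume JSON here are placeholder values.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()

	// Hypothetical resume info as the unit-scan checkpointer encodes it:
	// unit ID ("role|bucket") -> last successfully processed key.
	encodedResumeInfo := `{"test-role|test-bucket":"key-0"}`

	var resume map[string]string
	if err := json.Unmarshal([]byte(encodedResumeInfo), &resume); err != nil {
		log.Fatal(err)
	}

	bucket, unitID := "test-bucket", "test-role|test-bucket"
	startAfter := resume[unitID] // empty if this unit has no checkpoint yet

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	input := &s3.ListObjectsV2Input{Bucket: aws.String(bucket)}
	if startAfter != "" {
		// StartAfter makes S3 return only keys lexicographically after the
		// checkpointed key, so already-processed objects are skipped.
		input.StartAfter = aws.String(startAfter)
	}

	out, err := client.ListObjectsV2(ctx, input)
	if err != nil {
		log.Fatal(err)
	}
	for _, obj := range out.Contents {
		fmt.Println(aws.ToString(obj.Key))
	}
}
```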