
output/aws_dynamodb: chunk batch writes to 25 item limit#4112

Open
twmb wants to merge 1 commit into main from 992

Conversation

Contributor

@twmb twmb commented Mar 17, 2026

Summary

  • Chunks BatchWriteItem calls into groups of 25 items per the DynamoDB API limit, instead of sending all items at once and falling back to expensive one-by-one writes on failure
  • Resets backoff between chunks so each chunk gets a fresh retry budget
  • Marks items in unattempted chunks as failed in BatchError when a chunk fails, preventing silent data loss

Test plan

  • Existing unit tests pass (go test ./internal/impl/aws/dynamodb/...)
  • Manual test with batch sizes > 25 against a real DynamoDB table
  • Integration tests (require Docker)

Closes #992

🤖 Generated with Claude Code


claude bot commented Mar 17, 2026

Commits
LGTM

Three well-scoped commits, each a single logical change: chunking, backoff reset, and marking unattempted chunks as failed. Messages follow the system: message convention with imperative mood.

Review
Single file change to WriteBatch in the DynamoDB output. Chunks write requests into groups of 25 per the API limit, resets backoff per chunk, adjusts batch error indices correctly with chunkStart+j, and marks items in subsequent unattempted chunks as failed to prevent silent data loss. The unprocessed-items retry path returns a plain error (not BatchError), causing the framework to retry the entire batch — this is pre-existing behavior unchanged by this PR.

LGTM

Comment on lines +475 to 480
if len(unproc) > 0 {
	if err == nil {
		err = errors.New("ran out of request retries")
	}
	return err
}

Bug: With chunking, returning a plain error here causes the framework to retry all items in the original batch — including items from earlier chunks that were fully written — leading to duplicate writes. This path also doesn't mark items in subsequent unattempted chunks as failed (unlike the individualRequestsLoop path at lines 438-442 which correctly does both).

This should use a BatchError that marks only the unprocessed items and any subsequent unattempted chunks as failed, mirroring the approach in the individual-requests fallback path above.


claude bot commented Mar 17, 2026

Review of PR #4112

Commits
LGTM

Review
The chunking logic and backoff reset are correct. The individualRequestsLoop path properly uses BatchError with global indices and marks unattempted chunks as failed. However, the unprocessed-items retry path introduces a data correctness issue.

  1. Bug: plain error return in unprocessed-items path causes duplicate writes with chunking (output.go#L474-L481): When unprocessedLoop exhausts retries, returning a plain error (instead of a BatchError) tells the framework that all items failed. With chunking, this means items from earlier fully-successful chunks get retried, causing duplicates. This path should use BatchError to mark only the specific unprocessed items and subsequent unattempted chunks as failed — matching the approach already used in the individualRequestsLoop fallback.

DynamoDB BatchWriteItem API limits requests to 25 items. Previously,
exceeding this limit caused a fallback to individual writes. Now batches
are automatically chunked into groups of 25, with backoff reset between
chunks. Items in unattempted chunks are marked as failed in the
BatchError when a chunk fails, preventing silent data loss.

Closes #992

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment on lines +395 to 481
for chunkStart := 0; chunkStart < len(writeReqs); chunkStart += dynamoDBMaxBatchItems {
	boff.Reset()
	chunkEnd := min(chunkStart+dynamoDBMaxBatchItems, len(writeReqs))
	chunk := writeReqs[chunkStart:chunkEnd]

	batchResult, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
		RequestItems: map[string][]types.WriteRequest{
			*d.table: chunk,
		},
	})
	if err != nil {
		headlineErr := err

		// None of the messages were successful, attempt to send individually
	individualRequestsLoop:
		for err != nil {
			batchErr := service.NewBatchError(b, headlineErr)
			for j, req := range chunk {
				if req.PutRequest == nil {
					continue
				}
				if _, iErr := d.client.PutItem(ctx, &dynamodb.PutItemInput{
					TableName: d.table,
					Item:      req.PutRequest.Item,
				}); iErr != nil {
					d.log.Errorf("Put error: %v\n", iErr)
					wait := boff.NextBackOff()
					if wait == backoff.Stop {
						break individualRequestsLoop
					}
					select {
					case <-time.After(wait):
					case <-ctx.Done():
						break individualRequestsLoop
					}
					batchErr.Failed(chunkStart+j, iErr)
				} else {
					chunk[j].PutRequest = nil
				}
			}
			if batchErr.IndexedErrors() == 0 {
				err = nil
			} else {
				// Mark all items in subsequent unattempted chunks as
				// failed to prevent silent data loss.
				for k := chunkEnd; k < len(writeReqs); k++ {
					batchErr.Failed(k, headlineErr)
				}
				err = batchErr
			}
		}
		return err
	}

	unproc := batchResult.UnprocessedItems[*d.table]
unprocessedLoop:
	for len(unproc) > 0 {
		wait := boff.NextBackOff()
		if wait == backoff.Stop {
			break unprocessedLoop
		}

		select {
		case <-time.After(wait):
		case <-ctx.Done():
			break unprocessedLoop
		}
		if batchResult, err = d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
			RequestItems: map[string][]types.WriteRequest{
				*d.table: unproc,
			},
		}); err != nil {
			d.log.Errorf("Write multi error: %v\n", err)
		} else if unproc = batchResult.UnprocessedItems[*d.table]; len(unproc) > 0 {
			err = fmt.Errorf("setting %v items", len(unproc))
		} else {
			unproc = nil
		}
	}

	if len(unproc) > 0 {
		if err == nil {
			err = errors.New("ran out of request retries")
		}
		return err
	}
}

This chunking logic introduces non-trivial behavior changes — index remapping (chunkStart+j), per-chunk backoff resets, and marking subsequent chunks as failed — but no unit tests were added. Consider adding table-driven tests covering: (1) a batch <= 25 items (single chunk, no splitting), (2) a batch > 25 items (multiple chunks, all succeed), (3) a chunk where BatchWriteItem fails and individual fallback partially succeeds (verifying correct global indices in BatchError), and (4) a chunk failure that marks subsequent unattempted chunks as failed.

Per project test patterns: "Flag changed code lacking tests."


claude bot commented Mar 17, 2026

Commits
LGTM

Review
The chunking logic to respect the DynamoDB 25-item BatchWriteItem limit looks correct — index remapping, per-chunk backoff, and marking subsequent chunks as failed on error are all properly handled.

  1. Missing tests for chunking logic — The new chunking behavior (index remapping, multi-chunk splitting, partial failure handling, subsequent-chunk failure marking) has no test coverage. See inline comment on output.go#L395-L481.

@mmatczuk
Contributor

Please add an integration test.


Successfully merging this pull request may close these issues.

DynamoDB writer should limit write batches to 25 items

2 participants