Conversation
|
Commits: Three well-scoped commits, each a single logical change — chunking, backoff reset, and marking unattempted chunks as failed. Commit messages follow the repository's conventions. Review: LGTM |
| if len(unproc) > 0 { | ||
| if err == nil { | ||
| err = errors.New("ran out of request retries") | ||
| } | ||
| return err | ||
| } |
There was a problem hiding this comment.
Bug: With chunking, returning a plain error here causes the framework to retry all items in the original batch — including items from earlier chunks that were fully written — leading to duplicate writes. This path also doesn't mark items in subsequent unattempted chunks as failed (unlike the individualRequestsLoop path at lines 438-442 which correctly does both).
This should use a BatchError that marks only the unprocessed items and any subsequent unattempted chunks as failed, mirroring the approach in the individual-requests fallback path above.
Review of PR #4112 — Commits Review
|
DynamoDB BatchWriteItem API limits requests to 25 items. Previously, exceeding this limit caused a fallback to individual writes. Now batches are automatically chunked into groups of 25, with backoff reset between chunks. Items in unattempted chunks are marked as failed in the BatchError when a chunk fails, preventing silent data loss. Closes #992 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| for chunkStart := 0; chunkStart < len(writeReqs); chunkStart += dynamoDBMaxBatchItems { | ||
| boff.Reset() | ||
| chunkEnd := min(chunkStart+dynamoDBMaxBatchItems, len(writeReqs)) | ||
| chunk := writeReqs[chunkStart:chunkEnd] | ||
|
|
||
| batchResult, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ | ||
| RequestItems: map[string][]types.WriteRequest{ | ||
| *d.table: chunk, | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| headlineErr := err | ||
|
|
||
| // None of the messages were successful, attempt to send individually | ||
| individualRequestsLoop: | ||
| for err != nil { | ||
| batchErr := service.NewBatchError(b, headlineErr) | ||
| for j, req := range chunk { | ||
| if req.PutRequest == nil { | ||
| continue | ||
| } | ||
| select { | ||
| case <-time.After(wait): | ||
| case <-ctx.Done(): | ||
| break individualRequestsLoop | ||
| if _, iErr := d.client.PutItem(ctx, &dynamodb.PutItemInput{ | ||
| TableName: d.table, | ||
| Item: req.PutRequest.Item, | ||
| }); iErr != nil { | ||
| d.log.Errorf("Put error: %v\n", iErr) | ||
| wait := boff.NextBackOff() | ||
| if wait == backoff.Stop { | ||
| break individualRequestsLoop | ||
| } | ||
| select { | ||
| case <-time.After(wait): | ||
| case <-ctx.Done(): | ||
| break individualRequestsLoop | ||
| } | ||
| batchErr.Failed(chunkStart+j, iErr) | ||
| } else { | ||
| chunk[j].PutRequest = nil | ||
| } | ||
| batchErr.Failed(i, iErr) | ||
| } | ||
| if batchErr.IndexedErrors() == 0 { | ||
| err = nil | ||
| } else { | ||
| writeReqs[i].PutRequest = nil | ||
| // Mark all items in subsequent unattempted chunks as | ||
| // failed to prevent silent data loss. | ||
| for k := chunkEnd; k < len(writeReqs); k++ { | ||
| batchErr.Failed(k, headlineErr) | ||
| } | ||
| err = batchErr | ||
| } | ||
| } | ||
| if batchErr.IndexedErrors() == 0 { | ||
| err = nil | ||
| } else { | ||
| err = batchErr | ||
| } | ||
| return err | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| unproc := batchResult.UnprocessedItems[*d.table] | ||
| unprocessedLoop: | ||
| for len(unproc) > 0 { | ||
| wait := boff.NextBackOff() | ||
| if wait == backoff.Stop { | ||
| break unprocessedLoop | ||
| } | ||
| unproc := batchResult.UnprocessedItems[*d.table] | ||
| unprocessedLoop: | ||
| for len(unproc) > 0 { | ||
| wait := boff.NextBackOff() | ||
| if wait == backoff.Stop { | ||
| break unprocessedLoop | ||
| } | ||
|
|
||
| select { | ||
| case <-time.After(wait): | ||
| case <-ctx.Done(): | ||
| break unprocessedLoop | ||
| } | ||
| if batchResult, err = d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ | ||
| RequestItems: map[string][]types.WriteRequest{ | ||
| *d.table: unproc, | ||
| }, | ||
| }); err != nil { | ||
| d.log.Errorf("Write multi error: %v\n", err) | ||
| } else if unproc = batchResult.UnprocessedItems[*d.table]; len(unproc) > 0 { | ||
| err = fmt.Errorf("setting %v items", len(unproc)) | ||
| } else { | ||
| unproc = nil | ||
| select { | ||
| case <-time.After(wait): | ||
| case <-ctx.Done(): | ||
| break unprocessedLoop | ||
| } | ||
| if batchResult, err = d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{ | ||
| RequestItems: map[string][]types.WriteRequest{ | ||
| *d.table: unproc, | ||
| }, | ||
| }); err != nil { | ||
| d.log.Errorf("Write multi error: %v\n", err) | ||
| } else if unproc = batchResult.UnprocessedItems[*d.table]; len(unproc) > 0 { | ||
| err = fmt.Errorf("setting %v items", len(unproc)) | ||
| } else { | ||
| unproc = nil | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if len(unproc) > 0 { | ||
| if err == nil { | ||
| err = errors.New("ran out of request retries") | ||
| if len(unproc) > 0 { | ||
| if err == nil { | ||
| err = errors.New("ran out of request retries") | ||
| } | ||
| return err | ||
| } | ||
| } |
There was a problem hiding this comment.
This chunking logic introduces non-trivial behavior changes — index remapping (chunkStart+j), per-chunk backoff resets, and marking subsequent chunks as failed — but no unit tests were added. Consider adding table-driven tests covering: (1) a batch <= 25 items (single chunk, no splitting), (2) a batch > 25 items (multiple chunks, all succeed), (3) a chunk where BatchWriteItem fails and individual fallback partially succeeds (verifying correct global indices in BatchError), and (4) a chunk failure that marks subsequent unattempted chunks as failed.
Per project test patterns: "Flag changed code lacking tests."
|
Commits Review
|
|
Please add an integration test. |
Summary
- Splits `BatchWriteItem` calls into groups of 25 items per the DynamoDB API limit, instead of sending all items at once and falling back to expensive one-by-one writes on failure
- Marks items in unattempted chunks as failed in the `BatchError` when a chunk fails, preventing silent data loss

Test plan
- Ran `go test ./internal/impl/aws/dynamodb/...`

Closes #992
🤖 Generated with Claude Code