Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 57 additions & 46 deletions docs/core/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

**Branch** - An individual function within a parallel operation. Each branch executes independently and can succeed or fail without affecting other branches.

**BatchResult** - The result object returned by parallel operations, containing successful results, failed results, and execution metadata.
**BatchResult** - The result object returned by parallel operations. It includes a `BatchItem` for each branch plus counts and completion metadata.

**BatchItem** - A per-branch entry with `index`, `status`, `result`, and `error` (if failed).

**Completion strategy** - Configuration that determines when a parallel operation completes (e.g., all successful, first successful, all completed).

Expand Down Expand Up @@ -55,7 +57,7 @@ Use parallel operations to:
- **Independent execution** - Each branch runs in its own child context with isolated state
- **Flexible completion** - Configure when the operation completes (all successful, first successful, etc.)
- **Error isolation** - One branch failing doesn't automatically fail others
- **Result collection** - Automatic collection of successful and failed results
- **Result collection** - Automatic collection of per-branch status, results, and errors
- **Concurrency control** - Limit maximum concurrent branches with `max_concurrency`
- **Checkpointing** - Results are checkpointed as branches complete

Expand Down Expand Up @@ -84,14 +86,14 @@ def handler(event: dict, context: DurableContext) -> list[str]:
result: BatchResult[str] = context.parallel([task1, task2, task3])

# Return successful results
return result.successful_results
return result.get_results()
```

When this function runs:
1. All three tasks execute concurrently
2. Each task runs in its own child context
3. Results are collected as tasks complete
4. The `BatchResult` contains all successful results
4. The `BatchResult` contains per-branch status and results; `get_results()` returns successes

[↑ Back to top](#table-of-contents)

Expand All @@ -114,15 +116,16 @@ def parallel(
- `config` (optional) - A `ParallelConfig` object to configure concurrency limits, completion criteria, and serialization.

**Returns:** A `BatchResult[T]` object containing:
- `successful_results` - List of results from branches that succeeded
- `failed_results` - List of results from branches that failed
- `total_count` - Total number of branches
- `success_count` - Number of successful branches
- `failure_count` - Number of failed branches
- `status` - Overall status of the parallel operation
- `all` - List of `BatchItem` entries (one per branch) with `index`, `status`, `result`, and `error`
- `get_results()` - List of successful branch results
- `get_errors()` - List of `ErrorObject` entries for failed branches
- `succeeded()` / `failed()` / `started()` - `BatchItem` lists filtered by status
- `total_count`, `success_count`, `failure_count`, `started_count` - Branch counts by status
- `status` - Overall `BatchItemStatus` (FAILED if any branch failed)
- `completion_reason` - Why the operation completed
- `throw_if_error()` - Raises the first branch error, if any

**Raises:** Exceptions are captured per branch and included in `failed_results`. The parallel operation itself doesn't raise unless all branches fail (depending on completion configuration).
**Raises:** Branch exceptions are captured in the `BatchResult`. Call `throw_if_error()` if you want to raise the first failure.

[↑ Back to top](#table-of-contents)

Expand Down Expand Up @@ -162,7 +165,7 @@ def handler(event: dict, context: DurableContext) -> dict:
return {
"total": result.total_count,
"successful": result.success_count,
"results": result.successful_results,
"results": result.get_results(),
}
```

Expand Down Expand Up @@ -190,22 +193,25 @@ def handler(event: dict, context: DurableContext) -> dict:

return {
# Successful results only
"successful": result.successful_results,
"successful": result.succeeded(),

# Failed results (if any)
"failed": result.failed_results,
"failed": result.failed(),

# Counts
"total_count": result.total_count,
"success_count": result.success_count,
"failure_count": result.failure_count,
"started_count": result.started_count,

# Status information
"status": result.status.value,
"completion_reason": result.completion_reason.value,
}
```

Use `result.succeeded()`, `result.failed()`, or `result.started()` for `BatchItem` lists filtered by status, and `result.throw_if_error()` to raise the first failure when you want exceptions instead of error objects.

### Accessing individual results

Results are ordered by branch index:
Expand All @@ -232,19 +238,23 @@ def handler(event: dict, context: DurableContext) -> dict:

result: BatchResult[str] = context.parallel([task_a, task_b, task_c])

results = result.get_results()

# Access results by index
first_result = result.successful_results[0] # "Result A"
second_result = result.successful_results[1] # "Result B"
third_result = result.successful_results[2] # "Result C"
first_result = results[0] # "Result A"
second_result = results[1] # "Result B"
third_result = results[2] # "Result C"

return {
"first": first_result,
"second": second_result,
"third": third_result,
"all": result.successful_results,
"all": results,
}
```

If you need branch-indexed access even when failures occur, iterate `result.all` and match on `item.index`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is true. you can also access result.all[index] directly to directly to branch of interest, without iterating.


[↑ Back to top](#table-of-contents)

## Configuration
Expand Down Expand Up @@ -281,11 +291,8 @@ def handler(event: dict, context: DurableContext) -> str:
result: BatchResult[str] = context.parallel(functions, config=config)

# Get the first successful result
first_result = (
result.successful_results[0]
if result.successful_results
else "None"
)
results = result.succeeded()
first_result = results[0] if results else "None"

return f"First successful result: {first_result}"
```
Expand All @@ -303,7 +310,7 @@ config = ParallelConfig(max_concurrency=5)

- `CompletionConfig.all_successful()` - Requires all branches to succeed (default)
- `CompletionConfig.first_successful()` - Completes when any branch succeeds
- `CompletionConfig.all_completed()` - Waits for all branches to complete regardless of success/failure
- `CompletionConfig.all_completed()` - Completes when branches finish; check `started_count` if completion criteria are met early
- Custom configuration with specific success/failure thresholds

```python
Expand All @@ -320,6 +327,8 @@ config = ParallelConfig(

**item_serdes** - Custom serialization for individual branch results. If not provided, uses JSON serialization.

Note: If completion criteria are met early (min success reached or failure tolerance exceeded), unfinished branches are marked `STARTED` in `result.all` and counted in `started_count`.

[↑ Back to top](#table-of-contents)

## Advanced patterns
Expand Down Expand Up @@ -361,8 +370,9 @@ def handler(event: dict, context: DurableContext) -> str:
config=config,
)

if result.successful_results:
return result.successful_results[0]
results = result.get_results()
if results:
return results[0]

return {"error": "All sources failed"}
```
Expand Down Expand Up @@ -400,7 +410,7 @@ def handler(event: dict, context: DurableContext) -> dict:
return {
"processed": result.success_count,
"failed": result.failure_count,
"results": result.successful_results,
"results": result.get_results(),
}
```

Expand Down Expand Up @@ -440,7 +450,7 @@ def handler(event: dict, context: DurableContext) -> dict:

return {
"status": "partial_success",
"successful": result.successful_results,
"successful": result.get_results(),
"failed_count": result.failure_count,
}
```
Expand All @@ -466,7 +476,7 @@ def handler(event: dict, context: DurableContext) -> dict:
task3 = lambda c: c.step(lambda _: "group-a-item-3")

inner_result = ctx.parallel([task1, task2, task3])
return inner_result.successful_results
return inner_result.get_results()

def process_group_b(ctx: DurableContext) -> list:
# Inner parallel operation for group B
Expand All @@ -475,14 +485,14 @@ def handler(event: dict, context: DurableContext) -> dict:
task3 = lambda c: c.step(lambda _: "group-b-item-3")

inner_result = ctx.parallel([task1, task2, task3])
return inner_result.successful_results
return inner_result.get_results()

# Outer parallel operation
result: BatchResult[list] = context.parallel([process_group_a, process_group_b])

return {
"groups_processed": result.success_count,
"results": result.successful_results,
"results": result.get_results(),
}
```

Expand Down Expand Up @@ -518,15 +528,15 @@ def handler(event: dict, context: DurableContext) -> dict:

functions = [successful_task, failing_task, successful_task]

# Use all_completed to wait for all branches
# Use all_completed to collect per-branch status; check started_count for early completion
config = ParallelConfig(
completion_config=CompletionConfig.all_completed()
)

result: BatchResult[str] = context.parallel(functions, config=config)

return {
"successful": result.successful_results,
"successful": result.succeeded(),
"failed_count": result.failure_count,
"status": result.status.value,
}
Expand All @@ -545,14 +555,14 @@ if result.failure_count > 0:
# Some branches failed
return {
"status": "partial_failure",
"successful": result.successful_results,
"successful": result.get_results(),
"failed_count": result.failure_count,
}

# All branches succeeded
return {
"status": "success",
"results": result.successful_results,
"results": result.get_results(),
}
```

Expand All @@ -576,19 +586,19 @@ config = ParallelConfig(
# Ignores failures until at least one succeeds
```

**all_completed()** - Waits for all branches regardless of errors:
**all_completed()** - Waits for branches to complete unless completion criteria are met early:
```python
config = ParallelConfig(
completion_config=CompletionConfig.all_completed()
)
# All branches complete, collect both successes and failures
# If completion criteria are met early, remaining branches are marked STARTED
```

[↑ Back to top](#table-of-contents)

## Result ordering

Results in `successful_results` maintain the same order as the input functions:
Results in `get_results()` maintain the same order as the input functions:

```python
from aws_durable_execution_sdk_python import (
Expand All @@ -610,18 +620,19 @@ def handler(event: dict, context: DurableContext) -> list[str]:
result = context.parallel(functions)

# Results are in the same order as functions
assert result.successful_results[0] == "First"
assert result.successful_results[1] == "Second"
assert result.successful_results[2] == "Third"
results = result.get_results()
assert results[0] == "First"
assert results[1] == "Second"
assert results[2] == "Third"

return result.successful_results
return results
```

**Important:** Even though branches execute concurrently and may complete in any order, the SDK preserves the original order in the results list. This makes it easy to correlate results with inputs.

### Handling partial results

When some branches fail, `successful_results` only contains results from successful branches, but the order is still preserved relative to the input:
When some branches fail, `succeeded()` only contains results from successful branches, but the order is still preserved relative to the input:

```python
# If function at index 1 fails:
Expand Down Expand Up @@ -672,8 +683,8 @@ Choose the right completion strategy for your use case:
- Strict consistency requirements

**all_completed()** - Best for:
- Best-effort operations
- Collecting partial results
- Workflows where you want to observe branch outcomes end-to-end
- Collecting partial results (pair with tolerated failure settings if failures are expected)
- Logging or monitoring tasks

### Checkpointing overhead
Expand Down Expand Up @@ -729,7 +740,7 @@ A: No, branch functions must be synchronous. If you need to call async code, use

**Q: What happens if all branches fail?**

A: The behavior depends on your completion configuration. With `all_successful()`, the operation fails. With `all_completed()`, you get a `BatchResult` with all failures in `failed_results`.
A: The behavior depends on your completion configuration. You always get a `BatchResult`; inspect `get_errors()` or `failed()` to see failures, or call `throw_if_error()` to raise the first error.

**Q: Can I cancel running branches?**

Expand Down
Loading