diff --git a/docs/core/parallel.md b/docs/core/parallel.md
index 90b7b62..d638210 100644
--- a/docs/core/parallel.md
+++ b/docs/core/parallel.md
@@ -27,7 +27,9 @@
 **Branch** - An individual function within a parallel operation. Each branch executes independently and can succeed or fail without affecting other branches.

-**BatchResult** - The result object returned by parallel operations, containing successful results, failed results, and execution metadata.
+**BatchResult** - The result object returned by parallel operations. It includes a `BatchItem` for each branch plus counts and completion metadata.
+
+**BatchItem** - A per-branch entry with `index`, `status`, `result`, and `error` (if failed).

 **Completion strategy** - Configuration that determines when a parallel operation completes (e.g., all successful, first successful, all completed).
@@ -55,7 +57,7 @@ Use parallel operations to:
 - **Independent execution** - Each branch runs in its own child context with isolated state
 - **Flexible completion** - Configure when the operation completes (all successful, first successful, etc.)
 - **Error isolation** - One branch failing doesn't automatically fail others
-- **Result collection** - Automatic collection of successful and failed results
+- **Result collection** - Automatic collection of per-branch status, results, and errors
 - **Concurrency control** - Limit maximum concurrent branches with `max_concurrency`
 - **Checkpointing** - Results are checkpointed as branches complete
@@ -84,14 +86,14 @@ def handler(event: dict, context: DurableContext) -> list[str]:
     result: BatchResult[str] = context.parallel([task1, task2, task3])

     # Return successful results
-    return result.successful_results
+    return result.get_results()
 ```

 When this function runs:

 1. All three tasks execute concurrently
 2. Each task runs in its own child context
 3. Results are collected as tasks complete
-4. The `BatchResult` contains all successful results
+4. The `BatchResult` contains per-branch status and results; `get_results()` returns successes

 [↑ Back to top](#table-of-contents)
@@ -114,15 +116,16 @@ def parallel(
 - `config` (optional) - A `ParallelConfig` object to configure concurrency limits, completion criteria, and serialization.

 **Returns:** A `BatchResult[T]` object containing:
-- `successful_results` - List of results from branches that succeeded
-- `failed_results` - List of results from branches that failed
-- `total_count` - Total number of branches
-- `success_count` - Number of successful branches
-- `failure_count` - Number of failed branches
-- `status` - Overall status of the parallel operation
+- `all` - List of `BatchItem` entries (one per branch) with `index`, `status`, `result`, and `error`
+- `get_results()` - List of successful branch results
+- `get_errors()` - List of `ErrorObject` entries for failed branches
+- `succeeded()` / `failed()` / `started()` - `BatchItem` lists filtered by status
+- `total_count`, `success_count`, `failure_count`, `started_count` - Branch counts by status
+- `status` - Overall `BatchItemStatus` (FAILED if any branch failed)
 - `completion_reason` - Why the operation completed
+- `throw_if_error()` - Raises the first branch error, if any

-**Raises:** Exceptions are captured per branch and included in `failed_results`. The parallel operation itself doesn't raise unless all branches fail (depending on completion configuration).
+**Raises:** Branch exceptions are captured in the `BatchResult`. Call `throw_if_error()` if you want to raise the first failure.
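+
+As a quick sketch of how these pieces fit together (the branch names and bodies below are illustrative placeholders, not part of the SDK):
+
+```python
+def handler(event: dict, context: DurableContext) -> dict:
+    # Two illustrative branches; each runs in its own child context.
+    fetch_users = lambda c: c.step(lambda _: ["alice", "bob"])
+    fetch_orders = lambda c: c.step(lambda _: [101, 102])
+
+    result: BatchResult[list] = context.parallel([fetch_users, fetch_orders])
+
+    if result.failure_count > 0:
+        # Error objects for failed branches; call result.throw_if_error()
+        # instead if you prefer to raise the first failure.
+        return {"failed_branches": [item.index for item in result.failed()]}
+
+    return {
+        "results": result.get_results(),  # successful results, in input order
+        "status": result.status.value,
+    }
+```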

 [↑ Back to top](#table-of-contents)
@@ -162,7 +165,7 @@ def handler(event: dict, context: DurableContext) -> dict:
     return {
         "total": result.total_count,
         "successful": result.success_count,
-        "results": result.successful_results,
+        "results": result.get_results(),
     }
 ```
@@ -190,15 +193,16 @@ def handler(event: dict, context: DurableContext) -> dict:
     return {
         # Successful results only
-        "successful": result.successful_results,
+        "successful": result.succeeded(),

         # Failed results (if any)
-        "failed": result.failed_results,
+        "failed": result.failed(),

         # Counts
         "total_count": result.total_count,
         "success_count": result.success_count,
         "failure_count": result.failure_count,
+        "started_count": result.started_count,

         # Status information
         "status": result.status.value,
@@ -206,6 +210,8 @@
     }
 ```

+Use `result.succeeded()`, `result.failed()`, or `result.started()` for `BatchItem` lists filtered by status, and `result.throw_if_error()` to raise the first failure when you want exceptions instead of error objects.
+
 ### Accessing individual results

 Results are ordered by branch index:
@@ -232,19 +238,23 @@ def handler(event: dict, context: DurableContext) -> dict:
     result: BatchResult[str] = context.parallel([task_a, task_b, task_c])

+    results = result.get_results()
+
     # Access results by index
-    first_result = result.successful_results[0]  # "Result A"
-    second_result = result.successful_results[1]  # "Result B"
-    third_result = result.successful_results[2]  # "Result C"
+    first_result = results[0]  # "Result A"
+    second_result = results[1]  # "Result B"
+    third_result = results[2]  # "Result C"

     return {
         "first": first_result,
         "second": second_result,
         "third": third_result,
-        "all": result.successful_results,
+        "all": results,
     }
 ```

+If you need branch-indexed access even when failures occur, iterate `result.all` and match on `item.index`.
+
 [↑ Back to top](#table-of-contents)

 ## Configuration
@@ -281,11 +291,8 @@ def handler(event: dict, context: DurableContext) -> str:
     result: BatchResult[str] = context.parallel(functions, config=config)

     # Get the first successful result
-    first_result = (
-        result.successful_results[0]
-        if result.successful_results
-        else "None"
-    )
+    results = result.get_results()
+    first_result = results[0] if results else "None"

     return f"First successful result: {first_result}"
 ```
@@ -303,7 +310,7 @@ config = ParallelConfig(max_concurrency=5)
 - `CompletionConfig.all_successful()` - Requires all branches to succeed (default)
 - `CompletionConfig.first_successful()` - Completes when any branch succeeds
-- `CompletionConfig.all_completed()` - Waits for all branches to complete regardless of success/failure
+- `CompletionConfig.all_completed()` - Waits for all branches to finish unless completion criteria are met early; check `started_count` for unfinished branches
 - Custom configuration with specific success/failure thresholds

 ```python
 config = ParallelConfig(
@@ -320,6 +327,8 @@
 **item_serdes** - Custom serialization for individual branch results. If not provided, uses JSON serialization.

+Note: If completion criteria are met early (min success reached or failure tolerance exceeded), unfinished branches are marked `STARTED` in `result.all` and counted in `started_count`.
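+
+As a short sketch of checking for unfinished branches after an early completion (the branch bodies here are illustrative placeholders):
+
+```python
+def handler(event: dict, context: DurableContext) -> dict:
+    tasks = [
+        lambda c: c.step(lambda _: "fast"),
+        lambda c: c.step(lambda _: "slow"),
+    ]
+
+    # Completes as soon as one branch succeeds.
+    config = ParallelConfig(
+        completion_config=CompletionConfig.first_successful()
+    )
+    result: BatchResult[str] = context.parallel(tasks, config=config)
+
+    results = result.get_results()
+    return {
+        "first": results[0] if results else None,
+        "unfinished_branches": [item.index for item in result.started()],
+        "started_count": result.started_count,
+    }
+```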
+
 [↑ Back to top](#table-of-contents)

 ## Advanced patterns
@@ -361,8 +370,9 @@ def handler(event: dict, context: DurableContext) -> str:
         config=config,
     )

-    if result.successful_results:
-        return result.successful_results[0]
+    results = result.get_results()
+    if results:
+        return results[0]

     return {"error": "All sources failed"}
 ```
@@ -400,7 +410,7 @@ def handler(event: dict, context: DurableContext) -> dict:
     return {
         "processed": result.success_count,
         "failed": result.failure_count,
-        "results": result.successful_results,
+        "results": result.get_results(),
     }
 ```
@@ -440,7 +450,7 @@ def handler(event: dict, context: DurableContext) -> dict:
     return {
         "status": "partial_success",
-        "successful": result.successful_results,
+        "successful": result.get_results(),
         "failed_count": result.failure_count,
     }
 ```
@@ -466,7 +476,7 @@ def handler(event: dict, context: DurableContext) -> dict:
         task3 = lambda c: c.step(lambda _: "group-a-item-3")

         inner_result = ctx.parallel([task1, task2, task3])
-        return inner_result.successful_results
+        return inner_result.get_results()

     def process_group_b(ctx: DurableContext) -> list:
         # Inner parallel operation for group B
@@ -475,14 +485,14 @@ def handler(event: dict, context: DurableContext) -> dict:
         task3 = lambda c: c.step(lambda _: "group-b-item-3")

         inner_result = ctx.parallel([task1, task2, task3])
-        return inner_result.successful_results
+        return inner_result.get_results()

     # Outer parallel operation
     result: BatchResult[list] = context.parallel([process_group_a, process_group_b])

     return {
         "groups_processed": result.success_count,
-        "results": result.successful_results,
+        "results": result.get_results(),
     }
 ```
@@ -518,7 +528,7 @@ def handler(event: dict, context: DurableContext) -> dict:
     functions = [successful_task, failing_task, successful_task]

-    # Use all_completed to wait for all branches
+    # Use all_completed to collect per-branch status; check started_count for early completion
    config = ParallelConfig(
         completion_config=CompletionConfig.all_completed()
     )
@@ -526,7 +536,7 @@ def handler(event: dict, context: DurableContext) -> dict:
     result: BatchResult[str] = context.parallel(functions, config=config)

     return {
-        "successful": result.successful_results,
+        "successful": result.succeeded(),
         "failed_count": result.failure_count,
         "status": result.status.value,
     }
@@ -545,14 +555,14 @@ if result.failure_count > 0:
     # Some branches failed
     return {
         "status": "partial_failure",
-        "successful": result.successful_results,
+        "successful": result.get_results(),
         "failed_count": result.failure_count,
     }

 # All branches succeeded
 return {
     "status": "success",
-    "results": result.successful_results,
+    "results": result.get_results(),
 }
 ```
@@ -576,19 +586,19 @@ config = ParallelConfig(
 # Ignores failures until at least one succeeds
 ```

-**all_completed()** - Waits for all branches regardless of errors:
+**all_completed()** - Waits for branches to complete unless completion criteria are met early:

 ```python
 config = ParallelConfig(
     completion_config=CompletionConfig.all_completed()
 )
-# All branches complete, collect both successes and failures
+# If completion criteria are met early, remaining branches are marked STARTED
 ```

 [↑ Back to top](#table-of-contents)

 ## Result ordering

-Results in `successful_results` maintain the same order as the input functions:
+Results in `get_results()` maintain the same order as the input functions:

 ```python
 from aws_durable_execution_sdk_python import (
     DurableContext,
@@ -610,18 +620,19 @@ def handler(event: dict, context: DurableContext) -> list[str]:
     result = context.parallel(functions)

     # Results are in the same order as functions
-    assert result.successful_results[0] == "First"
-    assert result.successful_results[1] == "Second"
-    assert result.successful_results[2] == "Third"
+    results = result.get_results()
+    assert results[0] == "First"
+    assert results[1] == "Second"
+    assert results[2] == "Third"

-    return result.successful_results
+    return results
 ```

 **Important:** Even though branches execute concurrently and may complete in any order, the SDK preserves the original order in the results list. This makes it easy to correlate results with inputs.

 ### Handling partial results

-When some branches fail, `successful_results` only contains results from successful branches, but the order is still preserved relative to the input:
+When some branches fail, `succeeded()` only contains entries for the successful branches, but the order is still preserved relative to the input:

 ```python
 # If function at index 1 fails:
@@ -672,8 +683,8 @@ Choose the right completion strategy for your use case:
 - Strict consistency requirements

 **all_completed()** - Best for:
-- Best-effort operations
-- Collecting partial results
+- Workflows where you want to observe branch outcomes end-to-end
+- Collecting partial results (pair with failure-tolerance settings if failures are expected)
 - Logging or monitoring tasks

 ### Checkpointing overhead
@@ -729,7 +740,7 @@ A: No, branch functions must be synchronous. If you need to call async code, use
 **Q: What happens if all branches fail?**

-A: The behavior depends on your completion configuration. With `all_successful()`, the operation fails. With `all_completed()`, you get a `BatchResult` with all failures in `failed_results`.
+A: You always get a `BatchResult`, regardless of the completion configuration; inspect `get_errors()` or `failed()` to see the failures, or call `throw_if_error()` to raise the first error.

 **Q: Can I cancel running branches?**