Conversation
…for faster iteration when developing workflows
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID:
📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
📝 Walkthrough

Adds two local Docker-based executors (standalone and Docker Compose), CLI subcommands to invoke them, Bazel build/test targets and test data, extensive unit/integration tests, documentation entries, and .gitignore additions for local development artifacts.
Sequence Diagram(s)

sequenceDiagram
participant User as User/CLI
participant CLI as CLI Parser
participant SE as StandaloneExecutor
participant DAG as DAG Builder
participant Docker as Docker Engine
User->>CLI: osmo standalone run (spec)
CLI->>SE: run_workflow_standalone(spec_path, ...)
SE->>SE: Load & validate spec
SE->>DAG: Build dependency DAG
DAG-->>SE: Task graph
loop For each ready task
SE->>SE: Build token map & write inline files
SE->>Docker: docker run (volumes, GPU, shm)
Docker-->>SE: Exit code / logs
SE->>SE: Persist state, cancel downstream on fatal failures
end
SE-->>CLI: Return success/failure
CLI-->>User: Exit code / message
sequenceDiagram
participant User as User/CLI
participant CLI as CLI Parser
participant CE as ComposeExecutor
participant DAG as DAG Builder
participant Compose as Docker Compose
User->>CLI: osmo docker-compose run (spec)
CLI->>CE: run_workflow_compose(spec_path, ...)
CE->>CE: Load & validate spec
CE->>DAG: Build dependency DAG
CE->>CE: Generate docker-compose.yml & networks
loop While ready waves exist
CE->>CE: Find next ready wave
CE->>Compose: docker compose up --no-deps (wave)
Compose-->>CE: Service exit codes (ps)
CE->>CE: Record TaskResults, apply policies
CE->>Compose: docker compose rm -f (wave)
end
CE->>Compose: docker compose down --remove-orphans
CE-->>CLI: Return success/failure
CLI-->>User: Exit code / message
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 52-66: The parsed args.docker_cmd is not forwarded; update the
call in _run_local to pass docker_cmd=args.docker_cmd into
local_executor.run_workflow_locally, add a docker_cmd parameter to
run_workflow_locally in src/utils/local_executor.py, and thread that parameter
into the LocalExecutor instantiation/use so LocalExecutor receives and uses the
docker command (e.g., add docker_cmd to LocalExecutor.__init__ or the specific
method that launches containers and wire it through all intermediate calls).
In `@src/utils/local_executor.py`:
- Around line 273-295: run_workflow_locally currently always removes work_dir on
success which can delete a caller-supplied directory; change it to only remove
the directory when it was created inside the function. Introduce a flag (e.g.,
created_temp_dir) before calling tempfile.mkdtemp and set it true when you
create the temp directory; then only call shutil.rmtree(work_dir, ...) when
success is True, keep_work_dir is False, and created_temp_dir is True. Update
references in run_workflow_locally and any related cleanup logic so
user-provided work_dir is never removed.
- Around line 228-247: The debug log prints the fully expanded docker_args
(including '-e KEY=value' entries), exposing secrets from task_spec.environment;
update the logic that builds or logs docker_args (references: docker_args,
task_spec.environment, _substitute_tokens, logger.debug) to redact environment
values before logging—e.g., replace values for any '-e KEY=...' entries with a
placeholder like 'REDACTED' (or construct a sanitized copy of docker_args that
masks environment values) and log that sanitized list instead of the original;
ensure actual docker_args passed to subprocess remain unchanged so execution is
unaffected.
- Around line 90-113: The method currently treats an empty run as success when
no tasks were executed (e.g., DAG cycle). After the ready loop, check if no
tasks were executed (self._results is empty or a counter is zero) while there
are tasks defined (self._task_nodes not empty); if so, log an error indicating
the workflow is stalled/contains a cycle (include spec.name) and return False.
Use the existing symbols _find_ready_tasks, _results, _task_nodes and spec.name
to implement this early failure path so stalled workflows are reported as
failures.
In `@src/utils/tests/BUILD`:
- Around line 34-42: The py_test target named "test_local_executor" is marked
manual which hides its coverage from CI; remove the tags = ["manual"] attribute
from the "test_local_executor" target so the suite runs in CI, and if you must
keep Docker-only cases manual, split them out of test_local_executor.py into a
separate test file and add a new py_test target (e.g.,
"test_local_executor_docker") that depends on the Docker-only tests and retains
tags = ["manual"] instead of marking the whole "test_local_executor" target
manual.
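One possible shape for that split, with the Docker-only cases moved to their own target. Target names, source files, and dep labels below are illustrative, not the repository's actual BUILD contents:

```starlark
# Hermetic tests run in CI by default (no tags).
py_test(
    name = "test_local_executor",
    srcs = ["test_local_executor.py"],
    deps = ["//src/utils:local_executor"],
)

# Cases that need a running Docker daemon stay opt-in.
py_test(
    name = "test_local_executor_docker",
    srcs = ["test_local_executor_docker.py"],
    deps = ["//src/utils:local_executor"],
    tags = ["manual"],  # excluded from wildcard builds; run explicitly
)
```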
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ff387238-7afc-4e5c-8e25-06298591e48d
📒 Files selected for processing (7)
- src/cli/BUILD
- src/cli/local.py
- src/cli/main_parser.py
- src/utils/BUILD
- src/utils/local_executor.py
- src/utils/tests/BUILD
- src/utils/tests/test_local_executor.py
- Implemented GPU resource handling in LocalExecutor to allow tasks to request GPU resources.
- Added a new method to determine the GPU count for tasks based on their resource specifications.
- Updated Docker run command to include GPU options when applicable.
Actionable comments posted: 4
♻️ Duplicate comments (3)
src/utils/local_executor.py (3)
91-114: ⚠️ Potential issue | 🔴 Critical
Report stalled DAGs as failures.
If `ready` becomes empty before every node has a result, this falls through to the success log even though some tasks never ran. A cycle is the obvious case, but any permanently blocked dependency hits the same path.
Suggested fix:

```diff
 while ready:
     for task_name in ready:
         node = self._task_nodes[task_name]
         logger.info('--- Running task: %s (image: %s) ---', task_name, node.spec.image)
         result = self._run_task(node, spec)
@@
     ready = self._find_ready_tasks()

+unresolved_tasks = sorted(set(self._task_nodes) - set(self._results))
+if unresolved_tasks:
+    logger.error(
+        'Workflow "%s" stalled with unresolved tasks: %s',
+        spec.name,
+        ', '.join(unresolved_tasks),
+    )
+    return False
+
 failed = [name for name, r in self._results.items() if r.exit_code != 0]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 91 - 114, The loop exits as success when no ready tasks remain even if some nodes never ran; update the run loop after ready = self._find_ready_tasks() to detect a stalled DAG by comparing self._results keys against all task names in self._task_nodes (or using a helper) and if there are unreached tasks (e.g., due to a cycle or blocked dependency) log an error listing those task names and return False instead of proceeding to success; reference the methods/attributes _find_ready_tasks, _results, _task_nodes, and leave existing failure handling via _cancel_downstream/_run_task intact.
286-290: ⚠️ Potential issue | 🔴 Critical
Only delete work directories created inside this helper.
A successful run currently `rmtree()`s any `work_dir`, including a caller-supplied path. `--work-dir` should never wipe an existing folder on the normal success path.
Suggested fix:

```diff
 def run_workflow_locally(spec_path: str, work_dir: str | None = None, keep_work_dir: bool = False) -> bool:
-    if work_dir is None:
+    created_temp_work_dir = work_dir is None
+    if created_temp_work_dir:
         work_dir = tempfile.mkdtemp(prefix='osmo-local-')
         logger.info('Using temporary work directory: %s', work_dir)
@@
-    if not keep_work_dir and success:
+    if created_temp_work_dir and not keep_work_dir and success:
         logger.info('Cleaning up work directory: %s', work_dir)
         shutil.rmtree(work_dir, ignore_errors=True)
```

Also applies to: 306-308
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 286 - 290, The current run_workflow_locally function always removes work_dir on success which can delete caller-supplied folders; modify run_workflow_locally to track whether the helper created the directory (e.g., set created_temp_dir = True when assigning work_dir = tempfile.mkdtemp(...)) and only call shutil.rmtree(work_dir) at cleanup when created_temp_dir is True and keep_work_dir is False; ensure all cleanup branches (including the success path and exception paths around the existing logic that references work_dir and keep_work_dir) use this flag so externally provided work_dir is never deleted by the helper.
241-260: ⚠️ Potential issue | 🟠 Major
Redact env values before logging the Docker command.
By the time this debug log runs, `docker_args` already contains resolved `-e KEY=value` pairs, so any secret passed through `environment:` is written to logs verbatim.
Suggested fix:

```diff
-logger.debug('Docker command: %s', ' '.join(docker_args))
+redacted_docker_args = list(docker_args)
+for index, value in enumerate(redacted_docker_args[:-1]):
+    if value == '-e':
+        key = redacted_docker_args[index + 1].split('=', 1)[0]
+        redacted_docker_args[index + 1] = f'{key}=***'
+logger.debug('Docker command: %s', ' '.join(redacted_docker_args))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 241 - 260, The docker_args list currently includes resolved environment pairs from task_spec.environment and is logged verbatim by logger.debug('Docker command: %s', ' '.join(docker_args)); change this so secrets are redacted before logging by creating a sanitized copy of docker_args that replaces any '-e KEY=value' entries (or items following '-e') with '-e KEY=REDACTED' (or only '-e KEY') and log that sanitized list instead; locate the block that builds docker_args (references: task_spec.environment, docker_args, resolved_value, logger.debug) and use the sanitized copy in the debug call rather than the original docker_args.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 162-173: Local execution currently ignores TaskSpec.privileged and
TaskSpec.hostNetwork causing workflows to run with different semantics; update
the local executor to reject these flags like other unsupported features by
checking task_spec.privileged and task_spec.hostNetwork and appending
descriptive messages to the unsupported_features list (use f'Task
"{task_spec.name}": privileged containers are not supported by local executor'
and f'Task "{task_spec.name}": hostNetwork is not supported by local executor')
so callers see and block workflows that rely on those settings; locate checks
around unsupported_features and task_spec in the local_executor code and add
these two checks.
- Around line 295-300: The current Jinja guard in local_executor.py uses the
wrong block marker '{%%' so blocks like '{% ... %}' slip past; update the
template_markers tuple used where spec_text is checked (the template_markers
variable and the any(marker in spec_text for marker in template_markers) check)
to look for the correct Jinja start markers (use '{%' and '{#' and keep
'default-values'), then run the existing any(...) check and keep the same
ValueError raise path so templated specs are rejected earlier with the clear
message.
- Line 2: Add a local pylint suppression for the long SPDX copyright header by
inserting a single-line comment "# pylint: disable=line-too-long" immediately
after the SPDX header (the long copyright line at the top of
src/utils/local_executor.py) so the file-level long-line rule is silenced for
that header only; ensure the comment is placed directly below the header and
does not alter any other lines or logic.
- Around line 208-213: The _task_gpu_count function currently looks up resources
via spec.resources[task_spec.resource] but TaskSpec exposes resources directly;
change _task_gpu_count to use task_spec.resources (the ResourceSpec object) to
read the gpu attribute (keep returning 0 if missing). Also ensure the Docker GPU
CLI argument construction used in subprocess.run keeps the literal quotes around
the device string (pass a single argv element containing the quoted string,
e.g., '"device=0,1"', so Docker receives the quoted value for --gpus).
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 91-114: The loop exits as success when no ready tasks remain even
if some nodes never ran; update the run loop after ready =
self._find_ready_tasks() to detect a stalled DAG by comparing self._results keys
against all task names in self._task_nodes (or using a helper) and if there are
unreached tasks (e.g., due to a cycle or blocked dependency) log an error
listing those task names and return False instead of proceeding to success;
reference the methods/attributes _find_ready_tasks, _results, _task_nodes, and
leave existing failure handling via _cancel_downstream/_run_task intact.
- Around line 286-290: The current run_workflow_locally function always removes
work_dir on success which can delete caller-supplied folders; modify
run_workflow_locally to track whether the helper created the directory (e.g.,
set created_temp_dir = True when assigning work_dir = tempfile.mkdtemp(...)) and
only call shutil.rmtree(work_dir) at cleanup when created_temp_dir is True and
keep_work_dir is False; ensure all cleanup branches (including the success path
and exception paths around the existing logic that references work_dir and
keep_work_dir) use this flag so externally provided work_dir is never deleted by
the helper.
- Around line 241-260: The docker_args list currently includes resolved
environment pairs from task_spec.environment and is logged verbatim by
logger.debug('Docker command: %s', ' '.join(docker_args)); change this so
secrets are redacted before logging by creating a sanitized copy of docker_args
that replaces any '-e KEY=value' entries (or items following '-e') with '-e
KEY=REDACTED' (or only '-e KEY') and log that sanitized list instead; locate the
block that builds docker_args (references: task_spec.environment, docker_args,
resolved_value, logger.debug) and use the sanitized copy in the debug call
rather than the original docker_args.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: e830cb69-d490-4557-9c39-1898c8b721c2
📒 Files selected for processing (1)
src/utils/local_executor.py
- Adjusted the handling of the resolved_command to correctly set the entrypoint and append arguments.
- Ensured that the first element of resolved_command is used as the entrypoint while the rest are appended to the Docker command.
Actionable comments posted: 1
♻️ Duplicate comments (5)
src/utils/local_executor.py (5)
91-114: ⚠️ Potential issue | 🔴 Critical
Stalled/cyclic workflows incorrectly report success.
If the DAG contains a cycle or no tasks are initially ready, `ready` is empty on the first pass, the while loop never executes, `_results` stays empty, and the method falls through to the success log at line 113 even though no tasks ran.
Proposed fix to detect stalled workflows:

```diff
 while ready:
     for task_name in ready:
         node = self._task_nodes[task_name]
         logger.info('--- Running task: %s (image: %s) ---', task_name, node.spec.image)
         result = self._run_task(node, spec)
         self._results[task_name] = result
         if result.exit_code != 0:
             logger.error('Task "%s" failed with exit code %d', task_name, result.exit_code)
             self._cancel_downstream(task_name)
             return False
         logger.info('Task "%s" completed successfully', task_name)
     ready = self._find_ready_tasks()

+unresolved_tasks = sorted(set(self._task_nodes) - set(self._results))
+if unresolved_tasks:
+    logger.error(
+        'Workflow "%s" stalled with unresolved tasks (possible cycle): %s',
+        spec.name,
+        ', '.join(unresolved_tasks),
+    )
+    return False
+
 failed = [name for name, r in self._results.items() if r.exit_code != 0]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 91 - 114, The run loop can silently succeed when no tasks ever become ready (e.g., cycles) because ready is empty and _results stays empty; modify the logic in the method using _find_ready_tasks, _task_nodes and _results to detect stalled workflows: if ready is empty but there remain tasks not in _results (i.e., pending tasks exist), log an error indicating a stalled/cyclic workflow (include spec.name and the list of pending task names) and return False instead of falling through to success; alternatively detect lack of progress between iterations (no newly completed tasks) and treat that as failure, calling the same error path (and keep existing _cancel_downstream usage where appropriate).
297-302: ⚠️ Potential issue | 🟡 Minor
Fix the Jinja block marker check.
`'{%%'` never matches a normal `{% ... %}` block, so templated specs skip this guard and fail later with unclear errors.

```diff
-template_markers = ('{%%', '{#', 'default-values')
+template_markers = ('{%', '{#', 'default-values')
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 297 - 302, The guard that detects Jinja templates is using the wrong marker '{%%' so it never matches normal Jinja block syntax and lets templated specs through; update the template_markers used when scanning spec_text (the tuple named template_markers near the ValueError raise) to include the correct block marker '{%' (and also add '{{' for expression markers and keep '{#' and 'default-values') so the any(marker in spec_text for marker in template_markers) check correctly detects Jinja templates and triggers the existing ValueError path.
162-173: ⚠️ Potential issue | 🟠 Major
Missing validation for `privileged` and `hostNetwork` flags.
`TaskSpec` exposes `privileged` and `hostNetwork` fields, but local execution ignores them. Workflows relying on either setting will be accepted but run with different container semantics than the cluster path.
Proposed fix:

```diff
 if task_spec.volumeMounts:
     unsupported_features.append(
         f'Task "{task_spec.name}": volumeMounts require cluster-level host paths')
+
+if task_spec.privileged:
+    unsupported_features.append(
+        f'Task "{task_spec.name}": privileged mode is not supported in local execution')
+
+if task_spec.hostNetwork:
+    unsupported_features.append(
+        f'Task "{task_spec.name}": hostNetwork is not supported in local execution')
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 162 - 173, local_executor currently ignores TaskSpec.privileged and TaskSpec.hostNetwork, so workflows with those flags are accepted but executed with different semantics; update the validation in local_executor.py where unsupported_features is built (the block handling task_spec.credentials/checkpoint/volumeMounts) to also detect task_spec.privileged and task_spec.hostNetwork and append clear messages like f'Task "{task_spec.name}": privileged requires cluster-level privileges' and f'Task "{task_spec.name}": hostNetwork requires cluster networking' so these workflows are rejected for local execution the same way other unsupported features are.
288-313: ⚠️ Potential issue | 🔴 Critical
Don't delete caller-supplied work directories.
A successful run currently `rmtree()`s any `work_dir`, not just the temporary directory created inside this helper. If the user points `--work-dir` at an existing folder, it will be wiped after a normal success path.
Proposed fix:

```diff
 def run_workflow_locally(spec_path: str,
                          work_dir: str | None = None,
                          keep_work_dir: bool = False) -> bool:
+    created_temp_work_dir = work_dir is None
     if work_dir is None:
         work_dir = tempfile.mkdtemp(prefix='osmo-local-')
         logger.info('Using temporary work directory: %s', work_dir)

     with open(spec_path) as f:
         spec_text = f.read()

     template_markers = ('{%', '{#', 'default-values')
     if any(marker in spec_text for marker in template_markers):
         raise ValueError(
             'This spec uses Jinja templates which require server-side expansion.\n'
             'Run "osmo workflow submit --dry-run -f <spec>" first to get the expanded spec,\n'
             'then save that output and run it locally.')

     executor = LocalExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir)
     spec = executor.load_spec(spec_text)
     success = executor.execute(spec)

-    if not keep_work_dir and success:
+    if created_temp_work_dir and not keep_work_dir and success:
         logger.info('Cleaning up work directory: %s', work_dir)
         shutil.rmtree(work_dir, ignore_errors=True)
     elif not success:
         logger.info('Work directory preserved for debugging: %s', work_dir)

     return success
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 288 - 313, The helper run_workflow_locally currently always deletes work_dir on success which can wipe a caller-supplied directory; change it to only remove the directory when it was created inside this function (i.e., when you called tempfile.mkdtemp). Add a flag (e.g., created_temp = True/False) set to True right after tempfile.mkdtemp and only call shutil.rmtree(work_dir, ...) when created_temp is True and keep_work_dir is False and execution succeeded; leave existing logging behavior for preserved dirs and ensure created_temp is referenced around the shutil.rmtree and logging so externally provided work_dir is never removed.
262: ⚠️ Potential issue | 🟠 Major
Avoid logging environment variable values that may contain secrets.
`docker_args` contains fully expanded `-e KEY=value` pairs from the workflow spec's `environment` field. The debug log prints them verbatim, potentially exposing tokens or passwords.
Proposed fix to redact environment values:

```diff
-logger.debug('Docker command: %s', ' '.join(docker_args))
+redacted_args = list(docker_args)
+for i, arg in enumerate(redacted_args[:-1]):
+    if arg == '-e':
+        key = redacted_args[i + 1].split('=', 1)[0]
+        redacted_args[i + 1] = f'{key}=***'
+logger.debug('Docker command: %s', ' '.join(redacted_args))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` at line 262, The debug log prints the full docker_args (including expanded '-e KEY=value' entries) which may expose secrets; update the logger.debug call to log a redacted copy instead: create a redacted_args from docker_args by replacing environment-value parts (e.g. tokens matching '-e...=value' or strings containing '-e' and '=') with just the key and a placeholder (e.g. '-e KEY=REDACTED') and then log ' '.join(redacted_args) instead of the original; use the existing docker_args variable and a small regex or token-based transform (and add import re if needed) where logger.debug('Docker command: %s', ' '.join(docker_args)) is currently called.
🧹 Nitpick comments (1)
src/utils/local_executor.py (1)
264-269: Subprocess call with spec-derived arguments is acceptable for local execution.
Static analysis flags S603 (subprocess with untrusted input). However, this is a local execution tool where the user explicitly provides the workflow spec. The trust boundary is appropriate: users running `osmo local run` are intentionally executing containers defined in their own spec files.
Consider adding a brief comment noting this design decision for future maintainers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 264 - 269, Add a short comment above the subprocess.run call in the local_executor (around the docker_args / subprocess.run usage) explaining that docker_args are derived from the user-provided workflow spec and thus this is an intentional local execution design decision (trusted boundary), and append a linter suppression token (e.g., "# nosec") to the subprocess.run line to document/quiet static analysis S603; reference the docker_args variable, the subprocess.run call, and the surrounding try/except that returns TaskResult.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 236-239: The GPU device string is being built with literal double
quotes which become part of the argument passed to Docker; in the GPU handling
branch (see docker_args, _task_gpu_count and logger usage) remove the extra
quotes so the second element appended is device=0,1 (no surrounding ") by
changing the formatted string from including quotes to a plain
f'device={",".join(...) }' so subprocess.run receives the unquoted device list.
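The fix described above can be sketched as follows, assuming the argv list goes straight to `subprocess.run` with no shell, so no quoting layer strips anything. The helper names are illustrative, not the actual executor API:

```python
# Sketch: build the --gpus argument as a single unquoted argv element.
# Without a shell in between, Docker should receive device=0,1 literally;
# embedding quotation marks would make them part of the device string.

def gpu_args(gpu_count: int) -> list:
    """Return Docker CLI args requesting the first gpu_count devices."""
    if gpu_count <= 0:
        return []
    devices = ','.join(str(i) for i in range(gpu_count))
    return ['--gpus', f'device={devices}']


def build_docker_args(image: str, gpu_count: int) -> list:
    # The real command also carries volumes, env vars, shm size, etc.
    return ['docker', 'run', '--rm', *gpu_args(gpu_count), image]
```

Passing this list to `subprocess.run(build_docker_args(...))` hands Docker exactly `--gpus device=0,1` with no stray quotes.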
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 91-114: The run loop can silently succeed when no tasks ever
become ready (e.g., cycles) because ready is empty and _results stays empty;
modify the logic in the method using _find_ready_tasks, _task_nodes and _results
to detect stalled workflows: if ready is empty but there remain tasks not in
_results (i.e., pending tasks exist), log an error indicating a stalled/cyclic
workflow (include spec.name and the list of pending task names) and return False
instead of falling through to success; alternatively detect lack of progress
between iterations (no newly completed tasks) and treat that as failure, calling
the same error path (and keep existing _cancel_downstream usage where
appropriate).
- Around line 297-302: The guard that detects Jinja templates is using the wrong
marker '{%%' so it never matches normal Jinja block syntax and lets templated
specs through; update the template_markers used when scanning spec_text (the
tuple named template_markers near the ValueError raise) to include the correct
block marker '{%' (and also add '{{' for expression markers and keep '{#' and
'default-values') so the any(marker in spec_text for marker in template_markers)
check correctly detects Jinja templates and triggers the existing ValueError
path.
- Around line 162-173: local_executor currently ignores TaskSpec.privileged and
TaskSpec.hostNetwork, so workflows with those flags are accepted but executed
with different semantics; update the validation in local_executor.py where
unsupported_features is built (the block handling
task_spec.credentials/checkpoint/volumeMounts) to also detect
task_spec.privileged and task_spec.hostNetwork and append clear messages like
f'Task "{task_spec.name}": privileged requires cluster-level privileges' and
f'Task "{task_spec.name}": hostNetwork requires cluster networking' so these
workflows are rejected for local execution the same way other unsupported
features are.
- Around line 288-313: The helper run_workflow_locally currently always deletes
work_dir on success which can wipe a caller-supplied directory; change it to
only remove the directory when it was created inside this function (i.e., when
you called tempfile.mkdtemp). Add a flag (e.g., created_temp = True/False) set
to True right after tempfile.mkdtemp and only call shutil.rmtree(work_dir, ...)
when created_temp is True and keep_work_dir is False and execution succeeded;
leave existing logging behavior for preserved dirs and ensure created_temp is
referenced around the shutil.rmtree and logging so externally provided work_dir
is never removed.
- Line 262: The debug log prints the full docker_args (including expanded '-e
KEY=value' entries) which may expose secrets; update the logger.debug call to
log a redacted copy instead: create a redacted_args from docker_args by
replacing environment-value parts (e.g. tokens matching '-e...=value' or strings
containing '-e' and '=') with just the key and a placeholder (e.g. '-e
KEY=REDACTED') and then log ' '.join(redacted_args) instead of the original; use
the existing docker_args variable and a small regex or token-based transform
(and add import re if needed) where logger.debug('Docker command: %s', '
'.join(docker_args)) is currently called.
---
Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 264-269: Add a short comment above the subprocess.run call in the
local_executor (around the docker_args / subprocess.run usage) explaining that
docker_args are derived from the user-provided workflow spec and thus this is an
intentional local execution design decision (trusted boundary), and append a
linter suppression token (e.g., "# nosec") to the subprocess.run line to
document/quiet static analysis S603; reference the docker_args variable, the
subprocess.run call, and the surrounding try/except that returns TaskResult.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: bf7af37f-080d-422d-91a5-301615578ab8
📒 Files selected for processing (1)
src/utils/local_executor.py
- Introduced `--resume` and `--from-step` options in the CLI to allow resuming previous runs.
- Implemented state management in `LocalExecutor` to save and restore task results.
- Enhanced logging to provide feedback on skipped tasks and remaining tasks during resumption.
- Added GPU detection improvements to handle scenarios where requested GPUs are unavailable.
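The save/restore behavior described above could look roughly like this. The `state.json` file name and the task-name-to-exit-code mapping are assumptions for illustration, not the actual `LocalExecutor` state format:

```python
# Sketch: persist task results between runs so --resume can skip
# already-successful tasks. Format and location are illustrative.
import json
import os


def save_state(work_dir: str, results: dict) -> None:
    """Persist task name -> exit code after each task completes."""
    with open(os.path.join(work_dir, 'state.json'), 'w') as f:
        json.dump(results, f)


def load_completed(work_dir: str) -> set:
    """Return tasks that already succeeded (exit code 0) in a prior run."""
    path = os.path.join(work_dir, 'state.json')
    if not os.path.exists(path):
        return set()  # fresh run: nothing to skip
    with open(path) as f:
        results = json.load(f)
    return {name for name, code in results.items() if code == 0}
```

On resume, the executor would subtract `load_completed(work_dir)` from the ready set and log each skipped task.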
- Added `docker_cmd` parameter to `run_workflow_locally` for customizable Docker command execution.
- Improved logging to redact sensitive information in Docker command arguments.
- Implemented error handling for unexecuted tasks in `LocalExecutor` to detect potential workflow stalls.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/cli/local.py (1)
73-88: Consider marking `service_client` as unused.
The `_run_local` function receives `service_client` per the CLI dispatcher contract (see src/cli/cli.py:118) but doesn't use it since local execution bypasses the OSMO service. Consider adding an underscore prefix or a brief comment to signal this is intentional.
Suggested change:

```diff
-def _run_local(service_client, args: argparse.Namespace):
+def _run_local(_service_client, args: argparse.Namespace):  # service_client unused for local runs
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/local.py` around lines 73 - 88, The _run_local function accepts a service_client parameter that is intentionally unused; mark it as unused by renaming it to _service_client or _ (or add an inline comment like "# service_client unused for local execution") so linters and readers understand this is deliberate; update the function signature for _run_local and any internal references accordingly to avoid linter warnings while preserving the CLI dispatcher contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 1-17: The SPDX header docstring at the top of src/cli/local.py
contains lines exceeding 100 characters; add a module-level pylint suppression
for line-too-long to the file so the SPDX lines are allowed unwrapped. Insert a
single "# pylint: disable=line-too-long" comment near the top of the module
(e.g., immediately above or below the existing triple-quoted SPDX docstring) so
the header is exempt from the linter without modifying the license text.
---
Nitpick comments:
In `@src/cli/local.py`:
- Around line 73-88: The _run_local function accepts a service_client parameter
that is intentionally unused; mark it as unused by renaming it to
_service_client or _ (or add an inline comment like "# service_client unused for
local execution") so linters and readers understand this is deliberate; update
the function signature for _run_local and any internal references accordingly to
avoid linter warnings while preserving the CLI dispatcher contract.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d904529c-e0cd-4ea4-b9f7-ac507838a76d
📒 Files selected for processing (4)
- .gitignore
- src/cli/local.py
- src/utils/local_executor.py
- src/utils/tests/BUILD
✅ Files skipped from review due to trivial changes (2)
- src/utils/tests/BUILD
- .gitignore
- Added detailed docstrings to functions and classes in `local.py` and `local_executor.py` to improve code readability and maintainability.
- Updated test cases in `test_local_executor.py` with descriptive comments to clarify the purpose of each test.
- Ensured consistency in documentation style across the codebase.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/utils/tests/test_local_executor.py (2)
159-160: Prefer specific exception types over bare `Exception`. The tests at lines 159, 179, and 189 catch bare `Exception`. While this works, asserting a more specific exception type (e.g., `ValueError` or `pydantic.ValidationError`) would make test failures more informative and verify the correct error path. This is a minor improvement for test quality.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 159 - 160, Replace the three tests that use self.assertRaises(Exception) to assert a more specific error type when calling executor.load_spec; locate the assertions in the test file where executor.load_spec(spec_text) is expected to fail and change the raised type to the appropriate specific exception (e.g., ValueError or pydantic.ValidationError) that the load_spec implementation actually raises so the tests validate the correct error path.
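The pattern the comment asks for can be sketched as follows. This is a minimal illustration, not the real test file: `load_spec` here is a hypothetical stand-in that raises `ValueError` on bad input; the actual implementation may raise `pydantic.ValidationError` instead, and the test should match whatever it really raises.

```python
import unittest


def load_spec(spec_text: str) -> dict:
    """Hypothetical stand-in for executor.load_spec: rejects empty specs."""
    if not spec_text.strip():
        raise ValueError('spec is empty')
    return {'spec': spec_text}


class TestLoadSpecErrors(unittest.TestCase):
    def test_empty_spec_raises_value_error(self):
        # Asserting ValueError rather than bare Exception verifies the exact
        # error path and gives a clearer failure if a different error escapes.
        with self.assertRaises(ValueError) as ctx:
            load_spec('   ')
        self.assertIn('empty', str(ctx.exception))
```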
1-17: Consider adding pylint suppression for long copyright header. Similar to the production file, line 2 exceeds 100 characters. If the same linting rules apply to test files, add the suppression comment.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1 - 17, Add the same pylint suppression for long lines at the top of this test module as in production: insert a module-level pylint disable for "line-too-long" (e.g., the same "# pylint: disable=line-too-long" comment used in the production file) so the long copyright/header string in test_local_executor.py won't trigger the linter.

src/utils/local_executor.py (1)
182-183: Specify explicit encoding for file operations. Several `open()` calls lack an explicit encoding parameter. While Python 3 defaults to the locale encoding, explicitly specifying `encoding='utf-8'` ensures consistent behavior across systems. Affected locations: lines 182, 189, 351, 450.
Example fix
```diff
-        with open(self._state_file_path, 'w') as f:
+        with open(self._state_file_path, 'w', encoding='utf-8') as f:
             json.dump(state, f, indent=2)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 182 - 183, Update all open() calls in src/utils/local_executor.py to specify encoding='utf-8' to ensure consistent file encoding; for example, in the state writer inside the LocalExecutor class where you call with open(self._state_file_path, 'w') as f: json.dump(...), add encoding='utf-8', and similarly update the other open() calls referenced (the ones around the blocks using self._state_file_path, the read/write at the region near line ~351, and the open call near line ~450) so all reads and writes use open(..., encoding='utf-8').
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 759-774: The test_jinja_block_detected test currently uses the
nonstandard marker '{%% ... %%}' which mirrors an existing bug; update the spec
string to use real Jinja block syntax '{% ... %}' so the test detects standard
Jinja tags; locate the test_jinja_block_detected function and the temporary spec
created for run_workflow_locally and replace occurrences of '{%%' and '%%}' with
'{%' and '%}' respectively, keeping the rest of the assertion flow unchanged.
---
Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 182-183: Update all open() calls in src/utils/local_executor.py to
specify encoding='utf-8' to ensure consistent file encoding; for example, in the
state writer inside the LocalExecutor class where you call with
open(self._state_file_path, 'w') as f: json.dump(...), add encoding='utf-8', and
similarly update the other open() calls referenced (the ones around the blocks
using self._state_file_path, the read/write at the region near line ~351, and
the open call near line ~450) so all reads and writes use open(...,
encoding='utf-8').
In `@src/utils/tests/test_local_executor.py`:
- Around line 159-160: Replace the three tests that use
self.assertRaises(Exception) to assert a more specific error type when calling
executor.load_spec; locate the assertions in the test file where
executor.load_spec(spec_text) is expected to fail and change the raised type to
the appropriate specific exception (e.g., ValueError or
pydantic.ValidationError) that the load_spec implementation actually raises so
the tests validate the correct error path.
- Around line 1-17: Add the same pylint suppression for long lines at the top of
this test module as in production: insert a module-level pylint disable for
"line-too-long" (e.g., the same "# pylint: disable=line-too-long" comment used
in the production file) so the long copyright/header string in
test_local_executor.py won't trigger the linter.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 6eefeced-a5c1-410f-95df-3ce162b20399
📒 Files selected for processing (3)
- src/cli/local.py
- src/utils/local_executor.py
- src/utils/tests/test_local_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli/local.py
- Updated file operations in `local_executor.py` to explicitly use UTF-8 encoding when reading and writing files, ensuring better compatibility with various text formats.
- Adjusted exception handling in `test_local_executor.py` to raise `ValueError` instead of a generic `Exception` for clearer error reporting.
- Modified test documentation to reflect the correct Jinja block syntax in error messages.
@coderabbitai resume

✅ Actions performed

Reviews resumed.
Actionable comments posted: 3
♻️ Duplicate comments (4)
src/utils/local_executor.py (3)
280-290: ⚠️ Potential issue | 🟠 Major

Reject `privileged` and `hostNetwork` in local mode. Local execution currently accepts these flags but ignores them, so the same workflow can be accepted here with materially different container semantics than on the cluster. They should be added to `unsupported_features` alongside `volumeMounts`.

Suggested fix

```diff
         if task_spec.volumeMounts:
             unsupported_features.append(
                 f'Task "{task_spec.name}": volumeMounts require cluster-level host paths')
+        if task_spec.privileged:
+            unsupported_features.append(
+                f'Task "{task_spec.name}": privileged containers are not supported by local executor')
+        if task_spec.hostNetwork:
+            unsupported_features.append(
+                f'Task "{task_spec.name}": hostNetwork is not supported by local executor')
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 280 - 290, Add checks that reject task_spec.privileged and task_spec.hostNetwork in local mode by appending explanatory strings to the existing unsupported_features list (same style as the existing checks for volumeMounts, checkpoint, and credentials). Locate the block that inspects task_spec (the code that appends to unsupported_features for task_spec.volumeMounts / task_spec.checkpoint / task_spec.credentials) and add two entries: one like f'Task "{task_spec.name}": privileged containers are not supported in local mode' for task_spec.privileged, and one like f'Task "{task_spec.name}": hostNetwork is not supported in local mode' for task_spec.hostNetwork, so these flags are explicitly rejected rather than ignored.
446-468: ⚠️ Potential issue | 🔴 Critical

Don't delete caller-supplied `work_dir` on success. This helper still `rmtree()`s any successful `work_dir`, even when it came from `--work-dir`. That can wipe user data after a normal run; only the temporary directory created inside this function should be auto-cleaned. This also means the current success-cleanup test is asserting the wrong contract.

Suggested fix

```diff
-    if work_dir is None:
+    created_temp_work_dir = work_dir is None
+    if created_temp_work_dir:
         work_dir = tempfile.mkdtemp(prefix='osmo-local-')
         logger.info('Using temporary work directory: %s', work_dir)
@@
-    if not keep_work_dir and success:
+    if created_temp_work_dir and not keep_work_dir and success:
         logger.info('Cleaning up work directory: %s', work_dir)
         shutil.rmtree(work_dir, ignore_errors=True)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 446 - 468, The code currently always deletes work_dir on success even when the caller supplied it; modify the logic that creates the temp dir to record ownership (e.g., set a flag like created_work_dir = True only when tempfile.mkdtemp(...) is called), and change the cleanup condition to only rmtree when created_work_dir is True (and not keep_work_dir and success). Update references around LocalExecutor(...), spec = executor.load_spec(...), and the final cleanup block to use that ownership flag and adjust the success-cleanup test to assert that caller-supplied work_dir is preserved.
1-17: ⚠️ Potential issue | 🟡 Minor

Add the local `pylint` suppression for the SPDX header. Line 2 is still over the repo's 100-character limit, so this new module needs the file-level `line-too-long` suppression instead of reflowing the copyright text.

As per coding guidelines, "If copyright lines exceed 100 characters, add `# pylint: disable=line-too-long` comment instead of breaking into multiple lines".
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 17, Add a file-level pylint suppression for the SPDX header in this module by inserting a "# pylint: disable=line-too-long" comment at the top of local_executor.py (as a module-level suppression, e.g., immediately before or immediately after the module docstring) so the long copyright line is exempt from the 100-character rule.

src/cli/local.py (1)
1-17: ⚠️ Potential issue | 🟡 Minor

Add the local `pylint` suppression for the SPDX header. Line 2 is still over the repo's 100-character limit, so this new CLI module needs the file-level `line-too-long` suppression.

As per coding guidelines, "If copyright lines exceed 100 characters, add `# pylint: disable=line-too-long` comment instead of breaking into multiple lines".
Verify each finding against the current code and only fix it if needed. In `@src/cli/local.py` around lines 1 - 17, The SPDX header exceeds the 100-character pylint limit; add a file-level pylint suppression for line-too-long in this module by placing a "# pylint: disable=line-too-long" comment at the top of src/cli/local.py (immediately after the module docstring or as the first non-docstring line) so the long copyright/SPDX lines are ignored by pylint; ensure the comment targets "line-too-long" specifically and avoid modifying the existing SPDX text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 76-87: The try/except should also catch filesystem-related errors
from local_executor.run_workflow_locally (e.g., FileNotFoundError and
PermissionError) and handle them the same way as ValueError: print the error
message to stderr and sys.exit(1). Update the except block around the call to
local_executor.run_workflow_locally to catch FileNotFoundError and
PermissionError (either as separate except clauses or combined with the existing
ValueError) and ensure the handler prints f'Error: {error}' to sys.stderr and
exits with code 1.
In `@src/utils/local_executor.py`:
- Around line 116-124: Reset the executor state and ensure task output dirs are
cleaned before scheduling: in execute() clear self._results at the start, then
update _setup_directories() (or call a small helper before scheduling) to remove
and re-create per-task output directories when not doing a full resume; when
resume or from_step is set, call _restore_completed_tasks(from_step) first and
then delete output directories for tasks that are not in the restored completed
set so stale files can't be reused. Reference execute(), _setup_directories(),
_restore_completed_tasks(), and the self._results member when making these
changes.
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: Add a file-level pylint suppression for the SPDX header in
the test module test_local_executor.py by inserting a module-level comment to
disable the "line-too-long" lint (i.e. the pylint disable for line-too-long)
near the top of the file so the long copyright line is allowed; ensure the
directive is present before or immediately after the file docstring so pylint
skips the long SPDX lines.
---
Duplicate comments:
In `@src/cli/local.py`:
- Around line 1-17: The SPDX header exceeds the 100-character pylint limit; add
a file-level pylint suppression for line-too-long in this module by placing a "#
pylint: disable=line-too-long" comment at the top of src/cli/local.py
(immediately after the module docstring or as the first non-docstring line) so
the long copyright/SPDX lines are ignored by pylint; ensure the comment targets
"line-too-long" specifically and avoid modifying the existing SPDX text.
In `@src/utils/local_executor.py`:
- Around line 280-290: Add checks that reject task_spec.privileged and
task_spec.hostNetwork in local mode by appending explanatory strings to the
existing unsupported_features list (same style as the existing checks for
volumeMounts, checkpoint, and credentials). Locate the block that inspects
task_spec (the code that appends to unsupported_features for
task_spec.volumeMounts / task_spec.checkpoint / task_spec.credentials) and add
two entries: one like f'Task "{task_spec.name}": privileged containers are not
supported in local mode' for task_spec.privileged, and one like f'Task
"{task_spec.name}": hostNetwork is not supported in local mode' for
task_spec.hostNetwork, so these flags are explicitly rejected rather than
ignored.
- Around line 446-468: The code currently always deletes work_dir on success
even when the caller supplied it; modify the logic that creates the temp dir to
record ownership (e.g., set a flag like created_work_dir = True only when
tempfile.mkdtemp(...) is called), and change the cleanup condition to only
rmtree when created_work_dir is True (and not keep_work_dir and success). Update
references around LocalExecutor(...), spec = executor.load_spec(...), and the
final cleanup block to use that ownership flag and adjust the success-cleanup
test to assert that caller-supplied work_dir is preserved.
- Around line 1-17: Add a file-level pylint suppression for the SPDX header in
this module by inserting a "# pylint: disable=line-too-long" comment at the top
of local_executor.py (as a module-level suppression, e.g., immediately before or
immediately after the module docstring) so the long copyright line is exempt
from the 100-character rule.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cde946a5-8785-4b48-9d4f-4f11c1022077
📒 Files selected for processing (3)
- src/cli/local.py
- src/utils/local_executor.py
- src/utils/tests/test_local_executor.py
…dules
- Expanded exception handling in `local.py` to include `FileNotFoundError` and `PermissionError` for improved robustness.
- Updated comments in `local_executor.py` to clarify unsupported features in local mode, specifically regarding privileged containers and host networking.
- Modified test case in `test_local_executor.py` to ensure caller-supplied work directories are preserved on success, enhancing test accuracy.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/utils/local_executor.py (1)
116-124: ⚠️ Potential issue | 🟠 Major

Clear `self._results` at the start of `execute()` to prevent stale state. `execute()` builds the DAG and clears `_task_nodes`, but `_results` is never reset. If an executor instance is reused (e.g., for different specs or repeated runs), stale results from a previous execution could incorrectly mark tasks as completed, especially for tasks with the same name across different workflow specs.

Proposed fix

```diff
     def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False,
                 from_step: str | None = None) -> bool:
         """Run all tasks in topological order, returning True if the entire workflow succeeds."""
+        self._results.clear()
         self._build_dag(spec)
         self._validate_for_local(spec)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 116 - 124, The execute() method can reuse stale state in the instance because self._results is not reset; modify execute (the method named execute in this class) to clear self._results (e.g., set to an empty dict) at the start of the method before DAG execution/restoration logic runs so prior run results don't mark tasks as completed, ensuring this reset happens before calling _build_dag, _validate_for_local, _setup_directories and before _restore_completed_tasks/from_step logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: The SPDX header block in the test file has lines over 100
characters and needs the pylint suppression; add a "# pylint:
disable=line-too-long" comment associated with the SPDX header (e.g.,
immediately after the header block or on the SPDX-License-Identifier line) so
pylint ignores the long copyright lines; target the existing SPDX header /
"SPDX-License-Identifier" lines in src/utils/tests/test_local_executor.py when
applying this change.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 116-124: The execute() method can reuse stale state in the
instance because self._results is not reset; modify execute (the method named
execute in this class) to clear self._results (e.g., set to an empty dict) at
the start of the method before DAG execution/restoration logic runs so prior run
results don't mark tasks as completed, ensuring this reset happens before
calling _build_dag, _validate_for_local, _setup_directories and before
_restore_completed_tasks/from_step logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 6ff38215-d7e5-45ad-a464-77e6c436e296
📒 Files selected for processing (3)
- src/cli/local.py
- src/utils/local_executor.py
- src/utils/tests/test_local_executor.py
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)
580-746: Cover the remaining `_validate_for_local()` rejection branches. `src/utils/local_executor.py:262-303` also rejects `credentials`, `checkpoint`, `volumeMounts`, `privileged`, and `hostNetwork`, but this suite only exercises dataset/URL inputs/outputs plus the allowed `files`/`environment` paths. A small `subTest` table for the remaining fields would keep the unsupported-feature matrix from drifting untested.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 580 - 746, Add unit tests that exercise the remaining rejection branches in LocalExecutor._validate_for_local by creating small spec variants that include each unsupported field: credentials, checkpoint, volumeMounts, privileged, and hostNetwork; for each field use a subTest row that loads the spec via LocalExecutor._make_executor()/load_spec(), calls _build_dag(spec) and asserts _validate_for_local(spec) raises ValueError, and verify the raised message mentions the offending feature (e.g., assertIn('credentials', str(exc))) so all branches in _validate_for_local are covered.
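A `subTest` table of the kind suggested above might look like this. The validator here is a hypothetical stand-in that rejects the five cluster-only fields named in the review; the real test would call `LocalExecutor._validate_for_local` on a loaded spec instead.

```python
import unittest


def validate_for_local(task: dict) -> None:
    """Hypothetical stand-in for LocalExecutor._validate_for_local."""
    unsupported = ('credentials', 'checkpoint', 'volumeMounts', 'privileged', 'hostNetwork')
    problems = [field for field in unsupported if task.get(field)]
    if problems:
        raise ValueError(f'Task "{task["name"]}" uses unsupported features: {problems}')


class TestUnsupportedFeatureMatrix(unittest.TestCase):
    def test_each_unsupported_field_is_rejected(self):
        # One subTest row per cluster-only field keeps the matrix from drifting untested.
        for field in ('credentials', 'checkpoint', 'volumeMounts', 'privileged', 'hostNetwork'):
            with self.subTest(field=field):
                task = {'name': 'demo', field: True}
                with self.assertRaises(ValueError) as ctx:
                    validate_for_local(task)
                self.assertIn(field, str(ctx.exception))
```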
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 817-818: Move the negative-path tests out from under the
class-level `@unittest.skipUnless(DOCKER_AVAILABLE, SKIP_DOCKER_MSG)` decorator so
they run even when Docker is unavailable: locate the TestDockerExecution class
and extract the test methods test_docker_not_found_graceful_failure,
test_unsupported_spec_*, test_template_hello_world, and
test_nonexistent_file_raises into a non-Docker-gated test class (or place
method-level skips only on tests that actually call containers), since these
tests exercise early logic in run_workflow_locally() and _validate_for_local()
(invoked during executor.execute()) and do not require Docker; ensure the moved
tests still import/instantiate the same helper utilities and keep any
Docker-specific assertions confined to tests that truly start containers.
- Around line 1321-1326: The helper _run_cookbook_spec currently calls
self.skipTest when the cookbook spec file is missing; change this to a fail-fast
assertion by replacing the skip with self.assertTrue(os.path.exists(spec_path),
f'Cookbook file not found: {spec_path}') so missing data files cause a test
failure rather than silently skipping; update the logic inside
_run_cookbook_spec (referenced symbol) to assert existence before calling
run_workflow_locally.
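The two test-structure fixes above can be sketched together. Everything here is a reduced stand-in: the class names, `DOCKER_AVAILABLE` probe, and the `_run_cookbook_spec` helper mirror the review's description rather than the real module, which would call `run_workflow_locally` inside the helper.

```python
import os
import shutil
import unittest

DOCKER_AVAILABLE = shutil.which('docker') is not None
SKIP_DOCKER_MSG = 'docker binary not found'


class TestSpecValidationNoDocker(unittest.TestCase):
    """Negative-path tests that never start a container, so they are not Docker-gated."""

    def _run_cookbook_spec(self, spec_path: str) -> None:
        # Fail fast instead of silently skipping when a data file is missing.
        self.assertTrue(os.path.exists(spec_path),
                        f'Cookbook file not found: {spec_path}')
        # ...the real helper would go on to run the workflow from spec_path...

    def test_missing_spec_is_a_failure_not_a_skip(self):
        with self.assertRaises(AssertionError):
            self._run_cookbook_spec('/no/such/cookbook.yaml')


@unittest.skipUnless(DOCKER_AVAILABLE, SKIP_DOCKER_MSG)
class TestDockerExecution(unittest.TestCase):
    """Only tests that actually start containers stay behind the Docker gate."""

    def test_gate_placeholder(self):
        self.assertTrue(DOCKER_AVAILABLE)
```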
---
Nitpick comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 580-746: Add unit tests that exercise the remaining rejection
branches in LocalExecutor._validate_for_local by creating small spec variants
that include each unsupported field: credentials, checkpoint, volumeMounts,
privileged, and hostNetwork; for each field use a subTest row that loads the
spec via LocalExecutor._make_executor()/load_spec(), calls _build_dag(spec) and
asserts _validate_for_local(spec) raises ValueError, and verify the raised
message mentions the offending feature (e.g., assertIn('credentials', str(exc)))
so all branches in _validate_for_local are covered.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a22922da-486c-48bc-9539-e19174804a2d
📒 Files selected for processing (1)
src/utils/tests/test_local_executor.py
- Introduced `--shm-size` argument in the CLI for specifying shared memory size for GPU containers, defaulting to 16g.
- Updated `LocalExecutor` to accept and utilize the shared memory size during Docker command construction.
- Added unit tests to verify correct handling of shared memory size for both default and custom values in GPU tasks, ensuring no shared memory argument is included for non-GPU tasks.
- Created a new `tutorial_specs` filegroup in the `cookbook/tutorials/BUILD` to include YAML specifications.
- Updated the `BUILD` file in `src/utils/tests` to include the new `tutorial_specs` as data for local tests.
- Added a new test class in `test_local_executor.py` to validate unsupported features in cookbook specifications, ensuring proper error handling for unsupported fields.
- Implemented additional tests to verify that specific unsupported features are correctly rejected during local execution.
Actionable comments posted: 2
♻️ Duplicate comments (4)
src/cli/local.py (1)
1-3: ⚠️ Potential issue | 🟡 Minor

Move the pylint directive outside the docstring. `# pylint: disable=line-too-long` is currently inside the triple-quoted string, so pylint will not treat it as a directive.

Suggested fix

```diff
+# pylint: disable=line-too-long
 """
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```

As per coding guidelines, "If copyright lines exceed 100 characters, add `# pylint: disable=line-too-long` comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/local.py` around lines 1 - 3, The pylint directive is currently inside the module docstring (the top triple-quoted string) so it is ignored; move the `# pylint: disable=line-too-long` comment outside the docstring (immediately after the closing triple quotes or at the top of the file) so pylint recognizes it, leaving the copyright text unchanged and keeping the directive adjacent to the module docstring for clarity.

src/utils/tests/test_local_executor.py (1)
1-3: ⚠️ Potential issue | 🟡 Minor

Place the pylint long-line suppression as a real comment, not inside the docstring.
Current placement won’t be interpreted by pylint.
Suggested fix
```diff
+# pylint: disable=line-too-long
 """
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```

As per coding guidelines, "If copyright lines exceed 100 characters, add `# pylint: disable=line-too-long` comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1 - 3, The copyright header currently embeds "# pylint: disable=line-too-long" inside the module docstring, which pylint will ignore; remove that token from the docstring and place a real inline comment (e.g., "# pylint: disable=line-too-long") on its own line immediately above or below the docstring so the suppression is recognized; update the module-level string in src/utils/tests/test_local_executor.py accordingly.

src/utils/local_executor.py (2)
120-129: ⚠️ Potential issue | 🟠 Major

Reset execution state and clean pending task outputs before scheduling. `execute()` currently reuses `self._results` across runs and does not purge stale output directories for tasks that will run again. This can incorrectly skip tasks or consume stale artifacts.

Suggested fix

```diff
     def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False,
                 from_step: str | None = None) -> bool:
         """Run all tasks in topological order, returning True if the entire workflow succeeds."""
+        self._results.clear()
         self._build_dag(spec)
         self._validate_for_local(spec)
-        self._setup_directories()
         if resume or from_step:
             self._restore_completed_tasks(from_step)
+
+        self._setup_directories()
+        for task_name in self._task_nodes:
+            if task_name in self._results:
+                continue
+            task_dir = os.path.join(self._work_dir, task_name)
+            shutil.rmtree(task_dir, ignore_errors=True)
+            os.makedirs(os.path.join(task_dir, 'output'), exist_ok=True)
```

Also applies to: 309-314
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 129, The execute() method is reusing self._results and leaving stale task output directories, causing skipped or stale artifacts; fix by resetting execution state and cleaning pending outputs before scheduling: in execute (and the analogous flow used around the resume/from_step logic), clear or reinitialize self._results, clear any per-run state used to mark tasks as completed, and remove/clean output directories for tasks that will be re-run (i.e., tasks not restored as completed by _restore_completed_tasks) before creating the scheduling dag; ensure these changes integrate with _build_dag, _setup_directories and _restore_completed_tasks so restored tasks keep their outputs but any pending/rerun tasks have their outputs removed.
1-3: ⚠️ Potential issue | 🟡 Minor

Make the pylint suppression effective by moving it outside the module docstring.
The current inline text is part of the string literal and won’t suppress linting.
Suggested fix
```diff
+# pylint: disable=line-too-long
 """
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```

As per coding guidelines, "If copyright lines exceed 100 characters, add `# pylint: disable=line-too-long` comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 3, The pylint suppression is currently inside the module docstring and therefore not effective; move the "# pylint: disable=line-too-long" token out of the triple-quoted string so it is a real comment. Specifically, edit the top of the module around the module docstring (the triple-quoted string containing the SPDX copyright text) to place the pylint disable comment on its own line either immediately above or immediately below the docstring so that the suppression applies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 27-38: Add entries to the Codebase Structure section of AGENTS.md
documenting the new local-execution components: describe the new CLI subcommand
implemented by setup_parser in src/cli/local.py (the 'local' subcommand with its
nested 'run' action) and the utility module src/utils/local_executor.py (explain
its purpose, e.g., orchestrating local Docker workflow execution and any primary
class or function such as LocalExecutor or execute_locally). Keep the
description concise, include where to find usage examples or flags (mention
'run' subcommand), and note any high-level responsibilities so readers know
these files manage local workflow execution.
In `@src/utils/local_executor.py`:
- Around line 359-364: The code joins file_spec.path into files_dir without
sanitizing, allowing path traversal (e.g., ../../) to escape files_dir; update
the materialization logic (the loop over task_spec.files in local_executor.py
that calls self._substitute_tokens and computes host_path) to: 1) normalize and
resolve the target path using os.path.abspath/os.path.realpath for both
files_dir and the final host_path, 2) ensure the resolved host_path is inside
the resolved files_dir using os.path.commonpath (or os.path.commonprefix
carefully) and reject or raise an exception if it would escape, and 3) only
create/write files when the containment check passes; apply the same
sanitization/containment check to the other inline-file materialization site
(the similar block handling file_spec.path later in the file).
---
Duplicate comments:
In `@src/cli/local.py`:
- Around line 1-3: The pylint directive is currently inside the module docstring
(the top triple-quoted string) so it is ignored; move the `# pylint:
disable=line-too-long` comment outside the docstring (immediately after the
closing triple quotes or at the top of the file) so pylint recognizes it,
leaving the copyright text unchanged and keeping the directive adjacent to the
module docstring for clarity.
In `@src/utils/local_executor.py`:
- Around line 120-129: The execute() method is reusing self._results and leaving
stale task output directories, causing skipped or stale artifacts; fix by
resetting execution state and cleaning pending outputs before scheduling: in
execute (and the analogous flow used around the resume/from_step logic), clear
or reinitialize self._results, clear any per-run state used to mark tasks as
completed, and remove/clean output directories for tasks that will be re-run
(i.e., tasks not restored as completed by _restore_completed_tasks) before
creating the scheduling dag; ensure these changes integrate with _build_dag,
_setup_directories and _restore_completed_tasks so restored tasks keep their
outputs but any pending/rerun tasks have their outputs removed.
- Around line 1-3: The pylint suppression is currently inside the module
docstring and therefore not effective; move the "# pylint:
disable=line-too-long" token out of the triple-quoted string so it is a real
comment. Specifically, edit the top of the module around the module docstring
(the triple-quoted string containing the SPDX copyright text) to place the
pylint disable comment on its own line either immediately above or immediately
below the docstring so that the suppression applies.
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-3: The copyright header currently embeds "# pylint:
disable=line-too-long" inside the module docstring, which pylint will ignore;
remove that token from the docstring and place a real inline comment (e.g., "#
pylint: disable=line-too-long") on its own line immediately above or below the
docstring so the suppression is recognized; update the module-level string in
src/utils/tests/test_local_executor.py accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 15fe56f2-ae20-4bf5-a292-03af00b2f491
📒 Files selected for processing (5)
- cookbook/tutorials/BUILD
- src/cli/local.py
- src/utils/local_executor.py
- src/utils/tests/BUILD
- src/utils/tests/test_local_executor.py
✅ Files skipped from review due to trivial changes (1)
- cookbook/tutorials/BUILD
🚧 Files skipped from review as they are similar to previous changes (1)
- src/utils/tests/BUILD
…traversal
- Enhanced the `LocalExecutor` class to validate file paths, ensuring they do not escape the task directory. This prevents potential security risks associated with directory traversal attacks.
- Added unit tests in `test_local_executor.py` to verify that invalid file paths raise appropriate exceptions, while valid paths are accepted without errors.
- Updated documentation in `AGENTS.md` to include the new local executor functionality for Docker-based workflow execution.
There was a problem hiding this comment.
♻️ Duplicate comments (2)
src/utils/local_executor.py (2)
120-128: ⚠️ Potential issue | 🟠 Major
Clear `_results` at the start of `execute()` to support executor reuse.
If `execute()` is called multiple times on the same `LocalExecutor` instance (e.g., running different specs or retrying), stale results from the previous run persist in `_results`. This can cause tasks to be incorrectly skipped or produce wrong scheduling decisions. The `_build_dag()` method clears `_task_nodes` but `_results` is never reset.
Proposed fix

```diff
 def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False,
             from_step: str | None = None) -> bool:
     """Run all tasks in topological order, returning True if the entire workflow succeeds."""
+    self._results.clear()
     self._build_dag(spec)
     self._validate_for_local(spec)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 128, The executor retains stale task outcomes because _results is not reset when execute() runs again; modify LocalExecutor.execute to clear the instance result store (reset self._results) at the start of the method before calling _build_dag/_validate_for_local so previous-run results won't affect scheduling or skipping; ensure this reset happens prior to the resume/from_step logic so _restore_completed_tasks can repopulate _results correctly when resuming.
385-388: ⚠️ Potential issue | 🟠 Major
Remove literal quotes from GPU device argument.
When using `subprocess.run` with a list (no `shell=True`), arguments are passed directly without shell interpretation. The literal double quotes in the f-string become part of the argument value passed to Docker, e.g., `"device=0,1"` instead of `device=0,1`. Docker's `--gpus` flag expects the unquoted value when receiving arguments via direct invocation.
Proposed fix

```diff
 elif gpu_count > available:
     logger.warning(
         'Task "%s" requests %d GPU(s) but only %d available — running with %d GPU(s)',
         node.name, gpu_count, available, available)
-    docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(available))}"']
+    docker_args += ['--gpus', f'device={",".join(str(i) for i in range(available))}']
 else:
-    docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(gpu_count))}"']
+    docker_args += ['--gpus', f'device={",".join(str(i) for i in range(gpu_count))}']
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 385 - 388, The GPU device string is being built with literal double-quotes so docker_args gets entries like '"device=0,1"' which pass the quotes into Docker; update the two places that append to docker_args (the branches that use available and gpu_count) to remove the surrounding double-quotes in the f-strings so they produce device=0,1 (e.g., change f'"device={...}"' to f'device={...}'), keeping the rest of the list-based subprocess invocation and logger.info(node.name, gpu_count, available) unchanged.
🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)
751-817: Consider making the class attribute immutable or documenting that it's intentionally shared.
The `_UNSUPPORTED_SPECS` dictionary is a mutable class attribute (RUF012). While this is safe since the tests only read from it, making it a module-level constant or using `types.MappingProxyType` would better express the intent.
Optional: Move to module level as a constant

```diff
+# Test data for unsupported feature validation
+_UNSUPPORTED_SPECS = {
+    'credentials': {
+        'yaml': textwrap.dedent('''\
+            ...
+            '''),
+        'expected_substring': 'credentials',
+    },
+    # ... rest of specs
+}
+
+
 class TestValidateForLocalRemainingBranches(unittest.TestCase):
     """Verify that _validate_for_local rejects credentials, checkpoint, volumeMounts, privileged, and hostNetwork."""

-    _UNSUPPORTED_SPECS = {
-        ...
-    }
-
     def test_unsupported_fields_rejected(self):
         """Each unsupported task-level field is detected and rejected with a descriptive error."""
-        for feature, case in self._UNSUPPORTED_SPECS.items():
+        for feature, case in _UNSUPPORTED_SPECS.items():
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 751 - 817, The _UNSUPPORTED_SPECS class attribute is a mutable dict; make its immutability explicit by either moving it to module-level as a constant (e.g., UNSUPPORTED_SPECS) or wrapping it with types.MappingProxyType before assigning back to _UNSUPPORTED_SPECS so consumers cannot mutate it; update any references to the attribute accordingly and/or add a short comment documenting that this is intentionally shared/read-only.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 120-128: The executor retains stale task outcomes because _results
is not reset when execute() runs again; modify LocalExecutor.execute to clear
the instance result store (reset self._results) at the start of the method
before calling _build_dag/_validate_for_local so previous-run results won't
affect scheduling or skipping; ensure this reset happens prior to the
resume/from_step logic so _restore_completed_tasks can repopulate _results
correctly when resuming.
- Around line 385-388: The GPU device string is being built with literal
double-quotes so docker_args gets entries like '"device=0,1"' which pass the
quotes into Docker; update the two places that append to docker_args (the
branches that use available and gpu_count) to remove the surrounding
double-quotes in the f-strings so they produce device=0,1 (e.g., change
f'"device={...}"' to f'device={...}'), keeping the rest of the list-based
subprocess invocation and logger.info(node.name, gpu_count, available)
unchanged.
---
Nitpick comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 751-817: The _UNSUPPORTED_SPECS class attribute is a mutable dict;
make its immutability explicit by either moving it to module-level as a constant
(e.g., UNSUPPORTED_SPECS) or wrapping it with types.MappingProxyType before
assigning back to _UNSUPPORTED_SPECS so consumers cannot mutate it; update any
references to the attribute accordingly and/or add a short comment documenting
that this is intentionally shared/read-only.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 08e56e8c-6b0c-47d6-a623-97e46ae520b2
📒 Files selected for processing (3)
- AGENTS.md
- src/utils/local_executor.py
- src/utils/tests/test_local_executor.py
- Updated the `LocalExecutor` class to remove unnecessary quotes around GPU device specifications in Docker command arguments, ensuring correct formatting.
- Cleared previous results at the start of the `execute` method to prevent data carryover between executions.
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/utils/local_executor.py (2)
120-129: ⚠️ Potential issue | 🟠 Major
Restore state before creating task output directories.
`_setup_directories()` recreates every output directory before `_restore_completed_tasks()` checks `os.path.isdir(info['output_dir'])`, so resume can skip tasks against brand-new empty dirs. It also leaves stale outputs in place for rerun / `from_step` tasks.
Also applies to: 204-209, 310-314
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 129, The code currently calls _setup_directories() before _restore_completed_tasks(), causing freshly recreated output dirs to prevent detecting completed tasks; change the order in execute (and in the other places noted) so that _restore_completed_tasks(from_step) is invoked before _setup_directories(); search for all call sites where _setup_directories() precedes _restore_completed_tasks() and reorder them (e.g., inside the execute method and the other similar run/restore entrypoints) to ensure restoration of completed-task state happens prior to directory creation.
1-2: ⚠️ Potential issue | 🟡 Minor
Move the pylint pragma out of the module docstring.
`# pylint: disable=line-too-long` is currently part of the docstring text, so the linter never sees it.
Suggested fix

```diff
-"""
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long
+# pylint: disable=line-too-long
+"""
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```

As per coding guidelines, "If copyright lines exceed 100 characters, add a `# pylint: disable=line-too-long` comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 2, The module-level docstring currently contains the pylint pragma "# pylint: disable=line-too-long" as plain text; remove that pragma from inside the triple-quoted string and add it as a standalone comment immediately after the closing docstring (or at top of the file) so pylint will recognize it; locate the module docstring in local_executor.py and move the pragma out of the string into a top-level comment (keeping the copyright text unchanged).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 466-489: The temp work_dir is created before validating spec_text,
causing leaked dirs on early failures; either move the work_dir creation (the
created_work_dir/work_dir logic) to after the Jinja/template check (i.e., after
reading spec_text and verifying template_markers) or wrap the creation +
subsequent processing in a try/finally that will call shutil.rmtree(work_dir,
ignore_errors=True) when created_work_dir is true and keep_work_dir is false,
ensuring cleanup on exceptions; update the code paths around created_work_dir,
work_dir, LocalExecutor(...) and executor.load_spec/executor.execute to use the
new placement or the finally cleanup.
- Around line 180-185: The current state serialization in local_executor.py only
writes task names, exit codes and output dirs into the state dict (constructed
from self._results), which allows stale outputs to be reused across different
workflow revisions; update the state written to .osmo-state.json to include a
workflow-level hash (or per-task signature) derived from the current workflow
spec (e.g., spec hash or task signatures) when creating the state dict, and add
logic in the resume/load path that reads that stored hash and compares it to the
current workflow hash, refusing to accept/resume from the saved state if the
hashes differ and returning a clear error; ensure the change touches the state
construction (where state = {...}) and the corresponding state load/validation
code paths referenced around lines 204-221 so mismatched workflows are rejected.
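A minimal sketch of the spec-hash guard described above, assuming the state is a plain JSON-serializable dict; `spec_hash` and `check_resumable` are illustrative names, not code from the PR:

```python
import hashlib


def spec_hash(spec_text: str) -> str:
    """Stable fingerprint of a workflow spec's raw YAML text."""
    return hashlib.sha256(spec_text.encode('utf-8')).hexdigest()


def check_resumable(state: dict, current_spec_text: str) -> None:
    """Refuse to resume when the saved state came from a different spec.

    The hash would be written into .osmo-state.json alongside the per-task
    results, then compared here on the resume/load path.
    """
    saved = state.get('spec_hash')
    if saved != spec_hash(current_spec_text):
        raise ValueError('Saved state does not match the current workflow spec; '
                         'rerun without --resume')
```

Hashing the raw text is the simplest option; hashing a canonicalized form (e.g., the parsed spec re-serialized with sorted keys) would tolerate whitespace-only edits.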
- Around line 373-392: The current logic only appends '--shm-size' when GPUs are
requested, so a user-specified self._shm_size is ignored for CPU-only tasks;
move the shm size handling out of the GPU branch in LocalExecutor (referencing
docker_args, _task_gpu_count, _detect_available_gpus, self._shm_size and
DEFAULT_SHM_SIZE) so that after GPU detection/argument construction you always
compute shm_size = self._shm_size or self.DEFAULT_SHM_SIZE and append
['--shm-size', shm_size] to docker_args; ensure the value is string-compatible
with the docker args.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 120-129: The code currently calls _setup_directories() before
_restore_completed_tasks(), causing freshly recreated output dirs to prevent
detecting completed tasks; change the order in execute (and in the other places
noted) so that _restore_completed_tasks(from_step) is invoked before
_setup_directories(); search for all call sites where _setup_directories()
precedes _restore_completed_tasks() and reorder them (e.g., inside the execute
method and the other similar run/restore entrypoints) to ensure restoration of
completed-task state happens prior to directory creation.
- Around line 1-2: The module-level docstring currently contains the pylint
pragma "# pylint: disable=line-too-long" as plain text; remove that pragma from
inside the triple-quoted string and add it as a standalone comment immediately
after the closing docstring (or at top of the file) so pylint will recognize it;
locate the module docstring in local_executor.py and move the pragma out of the
string into a top-level comment (keeping the copyright text unchanged).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b1e87e8f-5942-4678-9825-714d120cd138
📒 Files selected for processing (1)
src/utils/local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cli/BUILD (1)
30-50: ⚠️ Potential issue | 🟠 Major
Wire the new Docker Compose CLI into `cli_lib`.
This BUILD target includes `standalone.py`, but the new `src/cli/docker_compose.py` module from this PR is still missing from `srcs`, and `//src/utils:compose_executor` is missing from `deps`. If `main_parser.py` imports that subcommand, Bazel builds/package assembly will fail or ship a CLI without the new command wired in.
Proposed fix

```diff
 srcs = [
     "access_token.py",
     "app.py",
     "bucket.py",
     "config.py",
     "credential.py",
     "data.py",
     "dataset.py",
+    "docker_compose.py",
     "editor.py",
     "formatters.py",
     "standalone.py",
     "login.py",
     "main_parser.py",
@@
     "//src/lib/utils:validation",
     "//src/lib/utils:version",
     "//src/lib/utils:workflow",
+    "//src/utils:compose_executor",
     "//src/utils:standalone_executor",
 ],
 )
```

Also applies to: 51-78
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/BUILD` around lines 30 - 50, The BUILD srcs list is missing the new CLI module and its runtime dependency; add "docker_compose.py" alongside "standalone.py" in the srcs array and add the dependency "//src/utils:compose_executor" to the deps for this target so the new Docker Compose subcommand imported by main_parser.py is compiled and packaged correctly; ensure any other identical srcs/deps blocks (lines 51-78 equivalent) are updated the same way to wire the new docker_compose module into the cli_lib target.
♻️ Duplicate comments (2)
src/utils/standalone_executor.py (2)
118-122: ⚠️ Potential issue | 🟠 Major
Reject non-mapping YAML before unpacking it.
`yaml.safe_load()` returns `None` for an empty file and can also return scalars/lists. In those cases, `VersionedWorkflowSpec(**raw)` throws `TypeError`, which skips the normal `ValueError`/stderr path in the CLI and surfaces an internal traceback instead of a clean validation error.
Proposed fix

```diff
 def load_spec(self, spec_text: str) -> workflow_module.WorkflowSpec:
     """Parse raw YAML text into a validated WorkflowSpec via the versioned spec model."""
     raw = yaml.safe_load(spec_text)
+    if not isinstance(raw, dict):
+        raise ValueError('Workflow spec must be a YAML mapping with a top-level "workflow" key')
     versioned = workflow_module.VersionedWorkflowSpec(**raw)
     return versioned.workflow
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 118 - 122, The load_spec function should validate the YAML result before unpacking into VersionedWorkflowSpec: check the local variable raw returned by yaml.safe_load(spec_text) and if it is None or not an instance of dict/mapping, raise a ValueError with a clear message about expecting a mapping workflow spec; then only call workflow_module.VersionedWorkflowSpec(**raw) and return versioned.workflow. This ensures load_spec rejects empty/scalar/list YAML inputs instead of letting VersionedWorkflowSpec raise a TypeError.
567-583: ⚠️ Potential issue | 🟡 Minor
Clean up auto-created work dirs when setup or execution raises.
Once `mkdtemp()` succeeds, any exception from `StandaloneExecutor(...)`, `load_spec(...)`, or `execute(...)` leaks `osmo-standalone-*` when `keep_work_dir` is false, because cleanup only runs on the normal success/failure path.
Proposed fix

```diff
 created_work_dir = work_dir is None
 if work_dir is None:
     work_dir = tempfile.mkdtemp(prefix='osmo-standalone-')
     logger.info('Using temporary work directory: %s', work_dir)
-executor = StandaloneExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir,
-                              docker_cmd=docker_cmd, shm_size=shm_size)
-spec = executor.load_spec(spec_text)
-success = executor.execute(spec, resume=resume or from_step is not None,
-                           from_step=from_step)
+try:
+    executor = StandaloneExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir,
+                                  docker_cmd=docker_cmd, shm_size=shm_size)
+    spec = executor.load_spec(spec_text)
+    success = executor.execute(spec, resume=resume or from_step is not None,
+                               from_step=from_step)
+except Exception:
+    if created_work_dir and not keep_work_dir:
+        shutil.rmtree(work_dir, ignore_errors=True)
+    raise
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 567 - 583, The code currently creates a temp dir (mkdtemp) and then calls StandaloneExecutor(...), executor.load_spec(...), and executor.execute(...), but only cleans the dir on the normal success/failure path; wrap the executor creation, load_spec, and execute calls in a try/finally so that any exception triggers cleanup: after creating work_dir (when created_work_dir is True) ensure in the finally block that if not keep_work_dir the directory is removed with shutil.rmtree(work_dir, ignore_errors=True); keep the existing logging (info about using temp dir and preserved dir) but move the final cleanup logic into the finally to guarantee no leak even when StandaloneExecutor, load_spec, or execute raise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 511-526: After creating a temporary work dir (created_work_dir)
but before constructing ComposeExecutor and calling ComposeExecutor.load_spec /
ComposeExecutor.execute, wrap the executor creation, spec loading and execution
in a try/finally so that if any exception occurs you still remove the
auto-created work dir when keep_work_dir is false; in the finally block check
created_work_dir and not keep_work_dir and then call shutil.rmtree(work_dir,
ignore_errors=True). Ensure variables referenced (created_work_dir, work_dir,
keep_work_dir, ComposeExecutor, load_spec, execute) are in scope and that
logging for preserved/cleaned up dirs is preserved or moved appropriately.
- Around line 86-95: The ComposeExecutor.execute override currently ignores the
resume and from_step flags from StandaloneExecutor.execute causing silent
behavior changes; update ComposeExecutor.execute(spec, resume=False,
from_step=None) to either implement resume semantics or explicitly reject
unsupported flags — e.g., at the top of the method check if resume or from_step
is set and raise a clear exception (ValueError or NotImplementedError)
indicating compose mode does not support resume/from_step yet, or implement the
resume logic by wiring resume/from_step into the DAG traversal in
_build_dag/_validate_for_compose/_generate_compose_file so execution begins at
the requested step; reference the execute method in ComposeExecutor and the
_build_dag/_generate_compose_file helpers to locate where to apply the change.
---
Outside diff comments:
In `@src/cli/BUILD`:
- Around line 30-50: The BUILD srcs list is missing the new CLI module and its
runtime dependency; add "docker_compose.py" alongside "standalone.py" in the
srcs array and add the dependency "//src/utils:compose_executor" to the deps for
this target so the new Docker Compose subcommand imported by main_parser.py is
compiled and packaged correctly; ensure any other identical srcs/deps blocks
(lines 51-78 equivalent) are updated the same way to wire the new docker_compose
module into the cli_lib target.
---
Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 118-122: The load_spec function should validate the YAML result
before unpacking into VersionedWorkflowSpec: check the local variable raw
returned by yaml.safe_load(spec_text) and if it is None or not an instance of
dict/mapping, raise a ValueError with a clear message about expecting a mapping
workflow spec; then only call workflow_module.VersionedWorkflowSpec(**raw) and
return versioned.workflow. This ensures load_spec rejects empty/scalar/list YAML
inputs instead of letting VersionedWorkflowSpec raise a TypeError.
- Around line 567-583: The code currently creates a temp dir (mkdtemp) and then
calls StandaloneExecutor(...), executor.load_spec(...), and
executor.execute(...), but only cleans the dir on the normal success/failure
path; wrap the executor creation, load_spec, and execute calls in a try/finally
so that any exception triggers cleanup: after creating work_dir (when
created_work_dir is True) ensure in the finally block that if not keep_work_dir
the directory is removed with shutil.rmtree(work_dir, ignore_errors=True); keep
the existing logging (info about using temp dir and preserved dir) but move the
final cleanup logic into the finally to guarantee no leak even when
StandaloneExecutor, load_spec, or execute raise.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 96a9a10d-7d76-4af0-8773-526ae76dc345
📒 Files selected for processing (11)
- AGENTS.md
- src/cli/BUILD
- src/cli/docker_compose.py
- src/cli/main_parser.py
- src/cli/standalone.py
- src/utils/BUILD
- src/utils/compose_executor.py
- src/utils/standalone_executor.py
- src/utils/tests/BUILD
- src/utils/tests/test_compose_executor.py
- src/utils/tests/test_standalone_executor.py
✅ Files skipped from review due to trivial changes (3)
- src/cli/main_parser.py
- src/utils/tests/BUILD
- AGENTS.md
…nd cleanup
- Added error handling in `ComposeExecutor` to raise `NotImplementedError` for unsupported resume functionality.
- Refactored `run_workflow_compose` and `run_workflow_standalone` to ensure proper cleanup of temporary work directories using a `try-finally` block.
- Introduced validation in `StandaloneExecutor` to check for correct YAML mapping in workflow specifications, raising a `ValueError` for invalid formats.
- Updated build configurations to include the new `docker_compose.py` file and the `compose_executor` library.
There was a problem hiding this comment.
♻️ Duplicate comments (3)
src/utils/standalone_executor.py (3)
462-468: ⚠️ Potential issue | 🟠 Major
Quote multi-GPU selectors before passing `--gpus` to Docker.
Official NVIDIA/Docker examples use a quoted selector for specific devices, and the unquoted `device=0,1` form is documented as failing with `cannot set both Count and DeviceIDs`. Here the argv element is emitted as plain `device=0,1`, so multi-GPU tasks can fail to start. (docs.nvidia.com)
Suggested fix

```diff
-    docker_args += ['--gpus', f'device={",".join(str(i) for i in range(available))}']
+    docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(available))}"']
 else:
-    docker_args += ['--gpus', f'device={",".join(str(i) for i in range(gpu_count))}']
+    docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(gpu_count))}"']
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 462 - 468, The multi-GPU selector passed to Docker is unquoted and can trigger "cannot set both Count and DeviceIDs" errors; update both places where docker_args is appended (the branch handling gpu_count > available and the else branch) so the selector value is wrapped in quotes (i.e., pass a quoted "device=0,1" string instead of device=0,1) when adding ['--gpus', ...], ensuring node.name/gpu_count/available logic remains the same.
417-423: ⚠️ Potential issue | 🟠 Major
Honor inline `resources.gpu` in `_task_gpu_count()`.
This helper only consults `spec.resources[task_spec.resource]`, so tasks that declare GPUs inline under `resources:` are treated as CPU-only. `ComposeExecutor._build_compose_service()` inherits the same bug through this helper.
Suggested fix

```diff
 def _task_gpu_count(self, task_spec: task_module.TaskSpec,
                     spec: workflow_module.WorkflowSpec) -> int:
     """Return the number of GPUs requested by a task's resource spec, defaulting to 0."""
-    resource_spec = spec.resources.get(task_spec.resource)
+    if task_spec.resources and task_spec.resources.gpu:
+        return task_spec.resources.gpu
+    resource_spec = spec.resources.get(task_spec.resource)
     if resource_spec and resource_spec.gpu:
         return resource_spec.gpu
     return 0
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 417 - 423, _task_gpu_count currently only looks up GPUs via spec.resources.get(task_spec.resource) and ignores inline declarations on the TaskSpec; update _task_gpu_count to first inspect task_spec.resources (e.g., task_spec.resources.gpu or task_spec.resources.get("gpu") depending on structure) and return that value if present, then fall back to resolving spec.resources.get(task_spec.resource) as before; also ensure ComposeExecutor._build_compose_service (which uses _task_gpu_count) will then honor inline task-level GPU requests without further changes.
131-137: ⚠️ Potential issue | 🟠 Major
Clean output directories for tasks that will be re-executed.
`_setup_directories()` only creates missing folders. On `--resume`/`--from-step`, any task not restored into `self._results` keeps its previous `<work_dir>/<task>/output` contents, so a rerun can read stale artifacts from `{{output}}` before it writes new ones.
Suggested fix

```diff
 self._validate_for_standalone(spec)
 self._setup_directories()
 if resume or from_step:
     self._restore_completed_tasks(from_step)
+    for task_name in self._task_nodes:
+        if task_name in self._results:
+            continue
+        task_output_dir = os.path.join(self._work_dir, task_name, 'output')
+        shutil.rmtree(task_output_dir, ignore_errors=True)
+        os.makedirs(task_output_dir, exist_ok=True)
```

Also applies to: 372-376
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 131 - 137, When resuming/from-step, output directories for tasks that will be re-executed are not cleaned because _setup_directories only creates missing folders; after calling _restore_completed_tasks(...) you must identify tasks that remain to run (e.g., all task ids in the DAG minus keys in self._results) and remove or empty their <work_dir>/<task>/output directories so no stale artifacts are read; implement this cleanup right after _restore_completed_tasks(...) in the block that calls _build_dag, _validate_for_standalone, _setup_directories (and replicate the same cleanup in the analogous block around lines 372-376) to ensure tasks selected for re-execution start with a clean output directory.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 462-468: The multi-GPU selector passed to Docker is unquoted and
can trigger "cannot set both Count and DeviceIDs" errors; update both places
where docker_args is appended (the branch handling gpu_count > available and the
else branch) so the selector value is wrapped in quotes (i.e., pass a quoted
"device=0,1" string instead of device=0,1) when adding ['--gpus', ...], ensuring
node.name/gpu_count/available logic remains the same.
- Around line 417-423: _task_gpu_count currently only looks up GPUs via
spec.resources.get(task_spec.resource) and ignores inline declarations on the
TaskSpec; update _task_gpu_count to first inspect task_spec.resources (e.g.,
task_spec.resources.gpu or task_spec.resources.get("gpu") depending on
structure) and return that value if present, then fall back to resolving
spec.resources.get(task_spec.resource) as before; also ensure
ComposeExecutor._build_compose_service (which uses _task_gpu_count) will then
honor inline task-level GPU requests without further changes.
- Around line 131-137: When resuming/from-step, output directories for tasks
that will be re-executed are not cleaned because _setup_directories only creates
missing folders; after calling _restore_completed_tasks(...) you must identify
tasks that remain to run (e.g., all task ids in the DAG minus keys in
self._results) and remove or empty their <work_dir>/<task>/output directories so
no stale artifacts are read; implement this cleanup right after
_restore_completed_tasks(...) in the block that calls _build_dag,
_validate_for_standalone, _setup_directories (and replicate the same cleanup in
the analogous block around lines 372-376) to ensure tasks selected for
re-execution start with a clean output directory.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: af05a982-9fd9-494e-8e5f-7bf2cc94808d
📒 Files selected for processing (3)
src/cli/BUILD
src/utils/compose_executor.py
src/utils/standalone_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli/BUILD
- Introduced a new method `_clean_rerun_output_dirs` to remove output directories for tasks that will be re-executed, ensuring no stale artifacts remain.
- Updated the `run_workflow_standalone` method to call the cleanup function when resuming or starting from a specific step.
- Adjusted GPU device specification formatting in Docker arguments to ensure correct syntax with quotes around device lists.
- Added support for `--credential`, `--set`, and `--set-string` arguments in both `docker_compose.py` and `standalone.py` to allow users to map credential names to local directories and override default values in workflow specifications.
- Implemented `_parse_credentials` function to validate and parse credential inputs.
- Updated `ComposeExecutor` and `StandaloneExecutor` to handle new credential and variable parameters, ensuring proper binding of directories and values during execution.
- Enhanced error handling for unsupported credential configurations in workflow specifications.
@coderabbitai resume

✅ Actions performed: Reviews resumed.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 193-197: Validation currently accepts any declared credential name
by checking task_spec.credentials against self._credentials, but
_build_compose_service() only mounts credentials whose value is a string, so
dict-style credential mappings silently get dropped; update the validator in
compose mode (the loop referencing task_spec.credentials and self._credentials)
to also check that self._credentials[cred_name] is a str and reject/raise an
error if it's not (e.g., is a dict), and make the same change in the other
validator block referenced around lines 368-371 so both validation sites enforce
string-only credential values.
- Around line 542-546: The finally block currently removes auto-created work
dirs even on failure; update the cleanup logic in compose_executor.py so that
shutil.rmtree(work_dir, ...) only runs when created_work_dir is true,
keep_work_dir is false, AND success is true (i.e., delete only on successful
runs), and ensure when not success you log/preserve the work_dir via
logger.info('Work directory preserved for debugging: %s', work_dir); adjust the
conditionals around created_work_dir, keep_work_dir, success, and the logger
call accordingly to preserve generated docker-compose.yml on failure.
In `@src/utils/standalone_executor.py`:
- Around line 78-81: The docstring listing unsupported features is out of date:
it still claims "Credentials" are unsupported even though StandaloneExecutor
accepts --credential mounts and binds them in the _run_task() method; update the
text in the unsupported-features block to remove "Credentials" (or explicitly
mark them as supported via --credential) and, if helpful, add a short note
referencing the --credential flag and that binding happens in
StandaloneExecutor._run_task() so the comment matches the implementation.
- Around line 357-361: The loop that validates TaskSpec.credentials currently
treats each credential as a string and silently ignores dict-style credentials;
update the validation in the credentials-checking loop (iterating over
task_spec.credentials) to detect dict items (isinstance(item, dict)) and add a
clear rejection message to unsupported_features (e.g., 'Task "{task_spec.name}":
credential "{key}" uses dict-style mapping which the standalone executor does
not support; provide credentials as NAME=/path or flatten the mapping') instead
of treating them as missing, and apply the same explicit rejection check where
credentials are mounted (the block around the mount code that references
self._credentials) so dict credential entries are rejected consistently rather
than skipped.
- Around line 673-677: The finally block currently deletes auto-created work
dirs whenever created_work_dir is True and keep_work_dir is False, even on
failure; change the logic so auto-created dirs are removed only when the run
succeeded and keep_work_dir is False. Specifically, update the condition around
shutil.rmtree in the finally block (referencing created_work_dir, keep_work_dir,
success, work_dir) to run only when success is True (e.g., if created_work_dir
and not keep_work_dir and success) and ensure the branch that logs preservation
(logger.info('Work directory preserved for debugging: %s', work_dir)) runs when
the run failed (not success) so that auto-created dirs are always kept on
failure.
- Around line 580-636: _spec_has_templates currently only looks for '{%', '{#',
and 'default-values' so specs that only use '{{ ... }}' never trigger local
expansion; additionally _OSMO_TOKEN_PATTERN only matches unspaced tokens like
'{{output}}' so spaced tokens like '{{ output }}' are not protected and Jinja
raises StrictUndefined. Update _OSMO_TOKEN_PATTERN to allow optional whitespace
inside the braces and to match the full token (e.g.
r'\{\{\s*(uuid|workflow_id|output|input:[^}]+|host:[^}]+)\s*\}\}'), and in
_expand_jinja_locally use the match.group(0) (the full token including spaces)
as original_token when building placeholder_map so replacements preserve
spacing; also update _spec_has_templates to return True if '{{' appears (in
addition to '{%' and '{#' and 'default-values') so plain '{{ var }}' triggers
local Jinja expansion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 0c881831-6032-4c86-b602-7a0c4af46766
📒 Files selected for processing (8)
.gitignore
AGENTS.md
src/cli/BUILD
src/cli/docker_compose.py
src/cli/standalone.py
src/utils/BUILD
src/utils/compose_executor.py
src/utils/standalone_executor.py
✅ Files skipped from review due to trivial changes (1)
- .gitignore
🚧 Files skipped from review as they are similar to previous changes (3)
- src/cli/BUILD
- src/utils/BUILD
- AGENTS.md
- Updated credential processing to support dict-style mappings, raising clear errors for unsupported configurations.
- Improved error messages for missing credentials, guiding users on the correct format for credential specification.
- Adjusted cleanup logic in `run_workflow_compose` and `run_workflow_standalone` to ensure work directories are only removed on successful execution.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/utils/compose_executor.py (1)
77-78: Project name may be invalid if workflow name starts with non-alphanumeric. If the workflow name starts with a special character (e.g., `_myworkflow`), the resulting project name `osmo--myworkflow` has a double hyphen but is valid. However, if a workflow name is all special characters (unlikely but possible), the result could be problematic. This is a minor edge case, but consider validating the workflow name earlier or stripping leading/trailing hyphens:
  def _compose_project_name(self, spec: workflow_module.WorkflowSpec) -> str:
-     return f'osmo-{re.sub(r"[^a-z0-9-]", "-", spec.name.lower())}'
+     sanitized = re.sub(r"[^a-z0-9-]", "-", spec.name.lower()).strip('-')
+     return f'osmo-{sanitized or "workflow"}'

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/compose_executor.py` around lines 77 - 78, The _compose_project_name function can produce leading/trailing or repeated hyphens (or an empty name) when spec.name contains only non-alphanumeric chars; update the logic in _compose_project_name to first replace non [a-z0-9] with hyphens, collapse consecutive hyphens into a single hyphen, strip leading/trailing hyphens, and if the result is empty fallback to a safe default like "osmo-default" (all while keeping the "osmo-" prefix and using spec.name.lower() as the input); locate and update the _compose_project_name method in compose_executor.py to implement these transformations and the fallback.
src/utils/standalone_executor.py (2)
508-509: Consider adding `strict=True` to `zip()` for defensive coding. While `task_spec.environment.keys()` and `resolved_env_values` are guaranteed to have the same length (built from the same dict), adding `strict=True` makes this invariant explicit and would catch any future refactoring bugs:

- for env_key, resolved_value in zip(task_spec.environment.keys(), resolved_env_values):
+ for env_key, resolved_value in zip(task_spec.environment.keys(), resolved_env_values, strict=True):

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 508 - 509, The loop that builds docker_args uses zip(task_spec.environment.keys(), resolved_env_values) — change it to zip(task_spec.environment.keys(), resolved_env_values, strict=True) to enforce the invariant that both iterables have the same length; update the for-loop over (env_key, resolved_value) accordingly (still appending to docker_args), and run tests/CI to ensure Python runtime supports zip(..., strict=True).
203-214: State file does not include workflow version/hash. The state file only persists task results without any workflow fingerprint. If the spec changes between runs (different images, args, etc.) but task names stay the same, `--resume` will skip incompatible tasks and reuse stale outputs.

The PR discussion indicates this was a deliberate design decision, but consider at minimum logging a warning when resuming that the user is responsible for ensuring spec consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 203 - 214, The state file currently saved by _save_state lacks any workflow fingerprint, so add a deterministic workflow/version hash to the saved JSON (e.g., compute from the workflow spec, image names, args -- implement a helper like _compute_workflow_fingerprint and include its result under a top-level key such as "workflow_fingerprint" alongside "tasks"), update _save_state to write that fingerprint to self._state_file_path, and then update the resume/load path (the method that reads the state, e.g., _load_state or the resume logic that calls it) to compare the saved fingerprint with the current computed fingerprint and emit a clear processLogger.warning (or similar logger used in this class) when fingerprints are missing or do not match so users are warned before reusing stale outputs.
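A rough sketch of the fingerprint-and-warn idea, with the spec shape and all names invented for illustration:

```python
import hashlib
import json

def workflow_fingerprint(task_specs):
    """Deterministic SHA-256 over a dict of task specs (illustrative)."""
    payload = json.dumps(task_specs, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()

specs = {'train': {'image': 'pytorch:2.1', 'args': ['--epochs', '10']}}
fp = workflow_fingerprint(specs)
state = {'workflow_fingerprint': fp, 'tasks': {'train': 'COMPLETED'}}

# On resume, compare the saved fingerprint against a freshly computed one.
changed = dict(specs, train={'image': 'pytorch:2.2', 'args': ['--epochs', '10']})
if state['workflow_fingerprint'] != workflow_fingerprint(changed):
    print('warning: spec changed since last run; cached outputs may be stale')
```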
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 499-501: The docker_args construction in standalone_executor.py is
adding literal double-quote characters into the GPU spec (in the f-strings used
to build the '--gpus' argument), which breaks Docker when invoking
subprocess.run() with list mode; update both branches that set docker_args (the
branch using available and the branch using gpu_count) to remove the surrounding
quotes so the value is e.g. device=0,1 instead of "device=0,1". Locate the code
that appends to docker_args (the two f'"device=..."' occurrences) and change
them to unquoted f'device=...'; keep using subprocess.run() list invocation
unchanged.
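A small sketch of why the quotes are harmful: with `subprocess.run()` in list mode no shell parses the arguments, so embedded double quotes would reach Docker literally and the selector should be passed bare. The helper below is illustrative, not the PR's code:

```python
def gpu_args(device_ids):
    """Build the '--gpus' argument pair for a subprocess list invocation.

    The selector is left unquoted: subprocess list mode passes each
    element verbatim, so literal quotes would break device selection.
    """
    selector = 'device=' + ','.join(str(i) for i in device_ids)
    return ['--gpus', selector]

print(gpu_args([0, 1]))  # ['--gpus', 'device=0,1']
```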
---
Nitpick comments:
In `@src/utils/compose_executor.py`:
- Around line 77-78: The _compose_project_name function can produce
leading/trailing or repeated hyphens (or an empty name) when spec.name contains
only non-alphanumeric chars; update the logic in _compose_project_name to first
replace non [a-z0-9] with hyphens, collapse consecutive hyphens into a single
hyphen, strip leading/trailing hyphens, and if the result is empty fallback to a
safe default like "osmo-default" (all while keeping the "osmo-" prefix and using
spec.name.lower() as the input); locate and update the _compose_project_name
method in compose_executor.py to implement these transformations and the
fallback.
In `@src/utils/standalone_executor.py`:
- Around line 508-509: The loop that builds docker_args uses
zip(task_spec.environment.keys(), resolved_env_values) — change it to
zip(task_spec.environment.keys(), resolved_env_values, strict=True) to enforce
the invariant that both iterables have the same length; update the for-loop over
(env_key, resolved_value) accordingly (still appending to docker_args), and run
tests/CI to ensure Python runtime supports zip(..., strict=True).
- Around line 203-214: The state file currently saved by _save_state lacks any
workflow fingerprint, so add a deterministic workflow/version hash to the saved
JSON (e.g., compute from the workflow spec, image names, args -- implement a
helper like _compute_workflow_fingerprint and include its result under a
top-level key such as "workflow_fingerprint" alongside "tasks"), update
_save_state to write that fingerprint to self._state_file_path, and then update
the resume/load path (the method that reads the state, e.g., _load_state or the
resume logic that calls it) to compare the saved fingerprint with the current
computed fingerprint and emit a clear processLogger.warning (or similar logger
used in this class) when fingerprints are missing or do not match so users are
warned before reusing stale outputs.
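The `_compose_project_name` sanitization described above (replace, collapse, strip, fallback) can be sketched as a standalone function; the `"workflow"` fallback name is an assumption:

```python
import re

def compose_project_name(name: str) -> str:
    """Sanitize a workflow name into a Compose-safe project name (sketch)."""
    sanitized = re.sub(r'[^a-z0-9-]', '-', name.lower())   # replace invalid chars
    sanitized = re.sub(r'-+', '-', sanitized).strip('-')   # collapse and strip hyphens
    return f'osmo-{sanitized or "workflow"}'               # fallback if empty

print(compose_project_name('MyWorkflow'))    # osmo-myworkflow
print(compose_project_name('_my.workflow'))  # osmo-my-workflow
print(compose_project_name('___'))           # osmo-workflow
```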
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 8e89b1d9-9eb4-425d-bcbf-3ba7b3572583
📒 Files selected for processing (2)
src/utils/compose_executor.py
src/utils/standalone_executor.py
- Introduced a new method `_compute_workflow_fingerprint` to generate a SHA-256 hash of task specifications, allowing for detection of workflow changes across runs.
- Updated the state management to include the workflow fingerprint, enabling verification of spec consistency between runs and warning users of potential stale outputs.
- Improved GPU device specification formatting in Docker arguments to ensure correct syntax without unnecessary quotes.
🧹 Nitpick comments (3)
src/utils/standalone_executor.py (2)
280-290: Minor: Consider using `collections.deque` for BFS queue. `list.pop(0)` is O(n) per operation. For large workflows, `collections.deque.popleft()` provides O(1) performance. This also applies to `_cancel_downstream` at lines 469-477.

Suggested improvement
+ from collections import deque
+
  def _get_downstream_tasks(self, task_name: str) -> Set[str]:
      """Return all transitive downstream dependents of the given task via BFS."""
      visited: Set[str] = set()
-     queue = [task_name]
+     queue = deque([task_name])
      while queue:
-         current = queue.pop(0)
+         current = queue.popleft()
          for downstream in self._task_nodes[current].downstream:
              if downstream not in visited:
                  visited.add(downstream)
                  queue.append(downstream)
      return visited

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 280 - 290, Replace the BFS queue implementation that uses a list and pop(0) with a collections.deque to get O(1) pops: import deque from collections, initialize queue = deque([task_name]) in _get_downstream_tasks and use queue.popleft() instead of pop(0); make the same change in the _cancel_downstream function (initialize its queue as a deque and call popleft()) to avoid O(n) behavior on large workflows while preserving the existing visited/append logic.
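A runnable, self-contained version of the deque-based BFS over a toy adjacency map (the edge structure is invented for illustration):

```python
from collections import deque

def downstream_tasks(edges, start):
    """Collect all transitive downstream tasks via BFS; deque gives O(1) pops."""
    visited = set()
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for nxt in edges.get(current, []):
            if nxt not in visited:
                visited.add(nxt)
                queue.append(nxt)
    return visited

edges = {'a': ['b', 'c'], 'b': ['d'], 'c': ['d'], 'd': []}
print(sorted(downstream_tasks(edges, 'a')))  # ['b', 'c', 'd']
```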
126-141: Consider including `files` contents in the workflow fingerprint. The fingerprint captures task name, image, command, args, environment, inputs, and resource but omits `files` contents. If a user changes inline file contents without changing other fields, the fingerprint won't detect it and `--resume` may reuse stale outputs.

Suggested improvement
  fingerprint_data.append({
      'name': name,
      'image': spec.image,
      'command': spec.command,
      'args': spec.args,
      'environment': dict(sorted(spec.environment.items())),
      'inputs': [str(i) for i in spec.inputs],
      'resource': spec.resource,
+     'files': [(f.path, f.contents) for f in spec.files],
  })
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 126 - 141, The _compute_workflow_fingerprint function currently omits task file contents; update it to include each task spec's files in the fingerprint by adding a deterministic representation of spec.files (e.g., a sorted list of {path, content} or a map path->content) into fingerprint_data for each task in _task_nodes. Ensure ordering is stable (sort file paths), handle missing/None safely, and convert file contents to strings (or a content hash) so the JSON dump stays deterministic before hashing; keep using json.dumps(sort_keys=True, separators=(',', ':')) and hashlib.sha256 to produce the final hex digest.
src/utils/compose_executor.py (1)
493-506: Consider logging a hint when JSON parsing fails to find the service. The JSON parsing handles both single-object and array formats from `docker compose ps`, which is good. However, if the output format changes in future Docker Compose versions, parsing could silently fail and return exit code 1, making debugging difficult.

Suggested improvement
  for line in result.stdout.strip().splitlines():
      line = line.strip()
      if not line:
          continue
      try:
          container_info = json.loads(line)
      except json.JSONDecodeError:
+         logger.debug('Could not parse compose ps output line: %s', line[:100])
          continue

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/compose_executor.py` around lines 493 - 506, The loop that parses result.stdout with json.loads can silently skip malformed lines and return 1, so update the parsing in compose_executor.py (the block iterating over result.stdout.splitlines and using json.loads) to log a hint when JSONDecodeError occurs and when no matching service is found: catch json.JSONDecodeError and call the module logger (e.g., logger.error or process_logger.error) including the exception and the raw line from result.stdout, and after the loop, if no entry matched service_name, log a warning that the service was not found in parsed container_info along with the full result.stdout to aid debugging.
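A standalone sketch of the parsing logic under discussion; the `Service`/`ExitCode` field names follow Docker Compose v2's JSON `ps` output but should be treated as assumptions here:

```python
import json

def service_exit_code(ps_output: str, service_name: str):
    """Parse compose-ps-style output (one JSON doc per line, or a JSON array)
    and return the named service's exit code, or None if not found."""
    for line in ps_output.strip().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            info = json.loads(line)
        except json.JSONDecodeError:
            continue  # real code would log the unparseable line here
        entries = info if isinstance(info, list) else [info]
        for entry in entries:
            if entry.get('Service') == service_name:
                return entry.get('ExitCode')
    return None

sample = '{"Service": "train", "ExitCode": 0}\n{"Service": "eval", "ExitCode": 1}'
print(service_exit_code(sample, 'eval'))  # 1
```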
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/utils/compose_executor.py`:
- Around line 493-506: The loop that parses result.stdout with json.loads can
silently skip malformed lines and return 1, so update the parsing in
compose_executor.py (the block iterating over result.stdout.splitlines and using
json.loads) to log a hint when JSONDecodeError occurs and when no matching
service is found: catch json.JSONDecodeError and call the module logger (e.g.,
logger.error or process_logger.error) including the exception and the raw line
from result.stdout, and after the loop, if no entry matched service_name, log a
warning that the service was not found in parsed container_info along with the
full result.stdout to aid debugging.
In `@src/utils/standalone_executor.py`:
- Around line 280-290: Replace the BFS queue implementation that uses a list and
pop(0) with a collections.deque to get O(1) pops: import deque from collections,
initialize queue = deque([task_name]) in _get_downstream_tasks and use
queue.popleft() instead of pop(0); make the same change in the
_cancel_downstream function (initialize its queue as a deque and call popleft())
to avoid O(n) behavior on large workflows while preserving the existing
visited/append logic.
- Around line 126-141: The _compute_workflow_fingerprint function currently
omits task file contents; update it to include each task spec's files in the
fingerprint by adding a deterministic representation of spec.files (e.g., a
sorted list of {path, content} or a map path->content) into fingerprint_data for
each task in _task_nodes. Ensure ordering is stable (sort file paths), handle
missing/None safely, and convert file contents to strings (or a content hash) so
the JSON dump stays deterministic before hashing; keep using
json.dumps(sort_keys=True, separators=(',', ':')) and hashlib.sha256 to produce
the final hex digest.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d75af904-9945-450c-8df8-dabef5dbcd16
📒 Files selected for processing (2)
src/utils/compose_executor.py
src/utils/standalone_executor.py
- Added error logging for JSON parsing failures in `ComposeExecutor` to aid in debugging.
- Enhanced warning messages in `ComposeExecutor` to include the full Docker compose output when no container info is found.
- Updated `StandaloneExecutor` to use `deque` for improved performance in task processing queues.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 82-86: The _compose_base_cmd function currently uses
self._compose_cmd.split(), which mishandles quoted paths; replace that call with
shlex.split(self._compose_cmd) so quoted/space-containing arguments are parsed
correctly, and ensure shlex is imported; keep the rest of the return
construction (adding '-p', self._compose_project_name(spec), '-f',
self._compose_file_path) unchanged.
In `@src/utils/standalone_executor.py`:
- Around line 127-147: The fingerprint only records spec.resource by name;
update _compute_workflow_fingerprint to include the resolved/effective resource
configuration so changes in GPU/CPU/memory alter the hash. For each task node in
_compute_workflow_fingerprint, replace the simple 'resource': spec.resource
entry with the full resource dict resolved from spec.resources (e.g.,
spec.resources.get(spec.resource) or spec.effective_resource if available),
normalizing it to a deterministic order (sorted keys) and serializing numeric
values so the SHA-256 reflects actual resource settings.
- Around line 228-246: Make saving and loading the resume state atomic and
tolerant of partial/corrupt files: in _save_state use a temp file (e.g.,
self._state_file_path + ".tmp"), write JSON to it and fsync the file and its
directory, then atomically replace the real file with os.replace so truncation
cannot leave a broken file; in _load_state catch JSONDecodeError (and possibly
IOError) when reading self._state_file_path and treat it as no state (return
None) after optionally renaming the corrupt file to a .corrupt timestamped
backup so the run can start fresh and the bad file is preserved for debugging;
keep references to
_save_state/_load_state/_state_file_path/_workflow_fingerprint/_results when
making the changes.
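The temp-file-plus-`os.replace` pattern from this finding, sketched as standalone helpers (the directory fsync mentioned above is omitted for brevity, and names are illustrative):

```python
import json
import os
import tempfile

def save_state_atomic(state_path: str, state: dict) -> None:
    """Write JSON via a temp file + os.replace so readers never see truncation."""
    tmp_path = state_path + '.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as handle:
        json.dump(state, handle)
        handle.flush()
        os.fsync(handle.fileno())
    os.replace(tmp_path, state_path)  # atomic rename on POSIX and Windows

def load_state(state_path: str):
    """Treat a missing or corrupt state file as 'no state'."""
    try:
        with open(state_path, encoding='utf-8') as handle:
            return json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return None

path = os.path.join(tempfile.mkdtemp(), 'state.json')
save_state_atomic(path, {'tasks': {'train': 'COMPLETED'}})
print(load_state(path))  # {'tasks': {'train': 'COMPLETED'}}
```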
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3eebc53e-3c27-4b6b-abba-4cd68bef171f
📒 Files selected for processing (2)
src/utils/compose_executor.py
src/utils/standalone_executor.py
…ng and state management
- Replaced string splitting with `shlex.split` in `ComposeExecutor` to ensure correct parsing of command arguments.
- Enhanced `_compute_workflow_fingerprint` in `StandaloneExecutor` to accept `WorkflowSpec` directly, improving clarity and functionality.
- Updated state file handling in `StandaloneExecutor` to use a temporary file for safer writes and added error handling for corrupt state files.
🧹 Nitpick comments (2)
src/utils/standalone_executor.py (2)
369-396: Consider using iterable unpacking for cycle path construction. Minor style improvement at line 380.
Suggested change
  if state[name] == IN_PROGRESS:
      cycle_start = path.index(name)
-     return path[cycle_start:] + [name]
+     return [*path[cycle_start:], name]

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 369 - 396, In _check_for_cycles (inner function visit) replace the list concatenation used to build the cycle path (path[cycle_start:] + [name]) with iterable unpacking to improve clarity and performance; locate the visit function inside the _check_for_cycles method and return the cycle as [*path[cycle_start:], name] instead of using + to join the slices.
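For context, a minimal three-color DFS cycle detector using the suggested unpacking style (a sketch, not the executor's actual implementation; every node must appear as a key in the graph):

```python
def find_cycle(graph):
    """Three-color DFS; returns one cycle path or None."""
    WHITE, GRAY, BLACK = 0, 1, 2
    state = {name: WHITE for name in graph}

    def visit(name, path):
        state[name] = GRAY
        path.append(name)
        for nxt in graph[name]:
            if state[nxt] == GRAY:
                # Close the loop with iterable unpacking instead of list +.
                return [*path[path.index(nxt):], nxt]
            if state[nxt] == WHITE:
                cycle = visit(nxt, path)
                if cycle:
                    return cycle
        path.pop()
        state[name] = BLACK
        return None

    for name in graph:
        if state[name] == WHITE:
            cycle = visit(name, [])
            if cycle:
                return cycle
    return None

print(find_cycle({'a': ['b'], 'b': ['c'], 'c': ['a']}))  # ['a', 'b', 'c', 'a']
print(find_cycle({'a': ['b'], 'b': []}))                 # None
```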
694-700: Consider using SHA-256 instead of MD5 for placeholder key generation. While MD5 is only used here for generating unique placeholder names (not security), using `hashlib.sha256` would be consistent with the fingerprint computation elsewhere in this file and silence the static analysis warning.

Suggested change
  placeholder_map: Dict[str, str] = {}
  for match in _OSMO_TOKEN_PATTERN.finditer(file_text):
      field = match.group(1).strip()
-     hash_key = 'hash' + str(int(hashlib.md5(field.encode('utf-8')).hexdigest(), 16))
+     hash_key = 'osmo_' + hashlib.sha256(field.encode('utf-8')).hexdigest()[:16]
      original_token = match.group(0)
      template_data[hash_key] = original_token
      placeholder_map[original_token] = hash_key

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 694 - 700, The placeholder key generation currently uses hashlib.md5 on the matched field inside the loop that builds placeholder_map and template_data (see _OSMO_TOKEN_PATTERN, placeholder_map, template_data, hash_key, original_token); replace hashlib.md5(...) with hashlib.sha256(...) and keep the same .hexdigest() -> int(...) conversion so the produced hash_key remains a deterministic string (e.g., 'hash' + str(int(hashlib.sha256(field.encode('utf-8')).hexdigest(), 16))). This aligns with the fingerprint code elsewhere and removes the MD5 usage.
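The SHA-256 variant of the placeholder-key scheme, kept as a sketch; the `'hash' + digits` shape keeps the key a valid Jinja identifier:

```python
import hashlib

def placeholder_key(field: str) -> str:
    """Deterministic Jinja-safe identifier for a protected token."""
    return 'hash' + str(int(hashlib.sha256(field.encode('utf-8')).hexdigest(), 16))

# Deterministic across calls, distinct per field.
print(placeholder_key('output') == placeholder_key('output'))      # True
print(placeholder_key('output') != placeholder_key('input:prep'))  # True
```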
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/utils/standalone_executor.py`:
- Around line 369-396: In _check_for_cycles (inner function visit) replace the
list concatenation used to build the cycle path (path[cycle_start:] + [name])
with iterable unpacking to improve clarity and performance; locate the visit
function inside the _check_for_cycles method and return the cycle as
[*path[cycle_start:], name] instead of using + to join the slices.
- Around line 694-700: The placeholder key generation currently uses hashlib.md5
on the matched field inside the loop that builds placeholder_map and
template_data (see _OSMO_TOKEN_PATTERN, placeholder_map, template_data,
hash_key, original_token); replace hashlib.md5(...) with hashlib.sha256(...) and
keep the same .hexdigest() -> int(...) conversion so the produced hash_key
remains a deterministic string (e.g., 'hash' +
str(int(hashlib.sha256(field.encode('utf-8')).hexdigest(), 16))). This aligns
with the fingerprint code elsewhere and removes the MD5 usage.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3043a7fb-1201-4911-b8eb-8297aa4403a6
📒 Files selected for processing (2)
src/utils/compose_executor.py
src/utils/standalone_executor.py
…ction logic
- Updated the cycle detection logic to use list unpacking for clarity in `StandaloneExecutor`.
- Changed hash generation from MD5 to SHA-256 for improved security in `_expand_jinja_locally`, ensuring better uniqueness and collision resistance.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/utils/standalone_executor.py (1)
165-176: ⚠️ Potential issue | 🟠 Major: Fresh reruns still reuse stale task outputs.
`_clean_rerun_output_dirs()` only runs in the `resume`/`from_step` path. If the same executor or `work_dir` is used for a normal rerun, `_setup_directories()` leaves `<work_dir>/<task>/output` intact, so a supposedly fresh run can consume or publish stale artifacts.

♻️ Proposed fix
  def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False,
              from_step: str | None = None) -> bool:
      """Run all tasks in topological order, returning True if the entire workflow succeeds."""
      self._results.clear()
      self._build_dag(spec)
      self._workflow_fingerprint = self._compute_workflow_fingerprint(spec)
      self._validate_for_standalone(spec)
-     self._setup_directories()
+     self._setup_directories(clean_outputs=not (resume or from_step))
      if resume or from_step:
          self._restore_completed_tasks(from_step)
          self._clean_rerun_output_dirs()
  @@
- def _setup_directories(self):
+ def _setup_directories(self, clean_outputs: bool = False):
      """Create the work directory and per-task output directories on the host filesystem."""
      os.makedirs(self._work_dir, exist_ok=True)
      for task_name in self._task_nodes:
-         os.makedirs(os.path.join(self._work_dir, task_name, 'output'), exist_ok=True)
+         output_dir = os.path.join(self._work_dir, task_name, 'output')
+         if clean_outputs and os.path.isdir(output_dir):
+             shutil.rmtree(output_dir)
+         os.makedirs(output_dir, exist_ok=True)

Also applies to: 468-472
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 165 - 176, The bug is that _clean_rerun_output_dirs() is only invoked for resume/from_step flows so fresh reruns can reuse stale outputs; fix by invoking _clean_rerun_output_dirs() after _setup_directories() for all runs (or at minimum when not resuming) so execute(...) always clears previous task output dirs before running; update the execute method to call _clean_rerun_output_dirs() unconditionally (or add an else branch when resume/from_step is false) so _setup_directories() + _clean_rerun_output_dirs() always run together.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 533-541: The loop that writes inline files treats all files as
UTF-8 text; change it to honor file_spec.base64 by checking that flag after
resolving tokens with _substitute_tokens: if file_spec.base64 is truthy,
base64-decode the resolved_contents and open host_path in binary mode ('wb') to
write the bytes, otherwise continue writing as UTF-8 text ('w',
encoding='utf-8'). Keep the same path-safety checks (host_path, files_dir,
node.name) and directory creation logic (os.makedirs) but branch the file
open/write based on file_spec.base64 so binary payloads are correctly decoded
and written.
---
Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 165-176: The bug is that _clean_rerun_output_dirs() is only
invoked for resume/from_step flows so fresh reruns can reuse stale outputs; fix
by invoking _clean_rerun_output_dirs() after _setup_directories() for all runs
(or at minimum when not resuming) so execute(...) always clears previous task
output dirs before running; update the execute method to call
_clean_rerun_output_dirs() unconditionally (or add an else branch when
resume/from_step is false) so _setup_directories() + _clean_rerun_output_dirs()
always run together.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3c919781-bfd4-4f35-acf2-c230b97e6189
📒 Files selected for processing (1)
src/utils/standalone_executor.py
…ncoding
- Introduced base64 decoding for file writing in `StandaloneExecutor`, allowing for proper handling of encoded content.
- Ensured cleanup of rerun output directories is executed when resuming tasks, improving state management.
@coderabbitai resume
✅ Actions performed: Reviews resumed.
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/utils/standalone_executor.py (1)
173-176: ⚠️ Potential issue | 🟠 Major: Restore completed state before recreating task outputs.
`_setup_directories()` recreates every `<work_dir>/<task>/output` before `_restore_completed_tasks()` runs, so Line 300's `os.path.isdir(info['output_dir'])` check no longer proves that prior output actually exists. A cleaned or partially deleted work dir can therefore skip tasks and feed empty outputs downstream on `--resume`/`--from-step`.
Also applies to: 297-301
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 173 - 176, The code currently calls self._setup_directories() before restoring completed state, which recreates every <work_dir>/<task>/output and makes the subsequent os.path.isdir(info['output_dir']) check in _restore_completed_tasks unreliable; move the call to self._restore_completed_tasks(from_step) to run before self._setup_directories() when resume or from_step is set (or alternatively modify _setup_directories to skip recreating task output dirs when restoring) so that existing outputs are detected correctly by _restore_completed_tasks; update any references to resume/from_step handling to reflect the new call order and ensure downstream logic uses the restored state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 402-461: The standalone executor currently ignores
WorkflowSpec.timeout; update _validate_for_standalone to detect a non-null
spec.timeout and treat it as unsupported: inside _validate_for_standalone (the
method performing feature checks), add a check for spec.timeout and append an
explanatory message to unsupported_features (e.g. 'WorkflowSpec.timeout is not
supported in standalone mode; use the service executor or remove the timeout'),
so workflows that declare a timeout are rejected rather than silently ignored.
- Around line 129-154: The fingerprint currently omits credential-backed inputs
so changes to credential names, mount paths, or supplied directories won't
change the hash; update _compute_workflow_fingerprint to include
task_spec.credentials and the resolved credential mapping used at runtime (the
mapping that _run_task mounts) by serializing credential names, mount paths, and
the resolved source/path for each credential referenced by task_spec.credentials
into the fingerprint_data (use deterministic ordering and stable serialization
like sorted keys and json.dumps as done for other fields); also apply the same
change to the other fingerprinting code path that computes per-task resume
fingerprints so credential changes invalidate cached outputs.
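A deterministic-fingerprint sketch along those lines, using plain dicts in place of the real spec objects (the field layout is assumed, not taken from the executor):

```python
import hashlib
import json


def workflow_fingerprint(task_specs: dict, credential_mounts: dict) -> str:
    """Hash task specs together with resolved credential mounts, so credential
    changes invalidate cached outputs (illustrative sketch)."""
    fingerprint_data = {
        # Sorted keys plus json.dumps(sort_keys=True) keep the serialization stable
        # across runs, which is what makes the hash usable as a resume fingerprint.
        'tasks': {name: task_specs[name] for name in sorted(task_specs)},
        'credentials': {name: credential_mounts[name] for name in sorted(credential_mounts)},
    }
    payload = json.dumps(fingerprint_data, sort_keys=True).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()
```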
- Around line 344-369: Standalone executor currently ignores group.barrier
semantics; add validation to reject barrier groups in standalone mode. Update
_validate_for_standalone to iterate the workflow spec's groups (same groups used
by _build_dag) and raise a clear error (e.g., ValueError) when any group has
barrier == True, referencing the group name in the message; this prevents silent
acceptance of barrier groups since _build_dag only respects TaskInputOutput
edges and cannot enforce barrier synchronization. Ensure the check uses the same
group iteration helper (the _groups method) and triggers before DAG
construction/_build_dag is called.
---
Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 173-176: The code currently calls self._setup_directories() before
restoring completed state, which recreates every <work_dir>/<task>/output and
makes the subsequent os.path.isdir(info['output_dir']) check in
_restore_completed_tasks unreliable; move the call to
self._restore_completed_tasks(from_step) to run before self._setup_directories()
when resume or from_step is set (or alternatively modify _setup_directories to
skip recreating task output dirs when restoring) so that existing outputs are
detected correctly by _restore_completed_tasks; update any references to
resume/from_step handling to reflect the new call order and ensure downstream
logic uses the restored state.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: dd9fe1cd-5f10-415c-9c90-b4ec9e4d2997
📒 Files selected for processing (1)
src/utils/standalone_executor.py
```python
def _build_dag(self, spec: workflow_module.WorkflowSpec):
    """Construct the internal DAG of TaskNodes from the workflow spec's tasks and input dependencies."""
    self._task_nodes.clear()
    self._group_specs.clear()

    for group in self._groups(spec):
        self._group_specs[group.name] = group
        for task_spec in group.tasks:
            self._task_nodes[task_spec.name] = TaskNode(
                name=task_spec.name,
                spec=task_spec,
                group=group.name,
            )

    for group in self._groups(spec):
        for task_spec in group.tasks:
            for input_source in task_spec.inputs:
                if isinstance(input_source, task_module.TaskInputOutput):
                    upstream_task = input_source.task
                    if upstream_task not in self._task_nodes:
                        raise ValueError(
                            f'Task "{task_spec.name}" depends on unknown task "{upstream_task}"')
                    self._task_nodes[task_spec.name].upstream.add(upstream_task)
                    self._task_nodes[upstream_task].downstream.add(task_spec.name)

    self._check_for_cycles()
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== TaskGroupSpec semantics ==="
rg -n --type=py -C3 '\b(barrier|ignoreNonleadStatus)\b' src/utils/job src/utils src/lib
echo
echo "=== Barrier handling in schedulers/executors ==="
rg -n --type=py -C4 '\bbarrier\b|_build_dag|_find_ready_tasks' src/utils src/lib

Repository: NVIDIA/OSMO
Length of output: 50367
🏁 Script executed:
# Check StandaloneExecutor class docstring and any barrier-related limitations
head -100 src/utils/standalone_executor.py | grep -A 20 "class StandaloneExecutor"

Repository: NVIDIA/OSMO
Length of output: 1029
🏁 Script executed:
# Search for any explicit limitation comments about barrier support
rg -n "barrier|standalone" src/utils/standalone_executor.py | head -30

Repository: NVIDIA/OSMO
Length of output: 1231
🏁 Script executed:
# Check if compose executor has any barrier handling
rg -n "barrier" src/utils/compose_executor.py

Repository: NVIDIA/OSMO
Length of output: 37
🏁 Script executed:
# Search for validation that might reject barrier specs in standalone mode
rg -n "_validate|barrier.*error|not.*support.*barrier" src/utils/standalone_executor.py src/utils/compose_executor.py

Repository: NVIDIA/OSMO
Length of output: 628
🏁 Script executed:
# Read the full _validate_for_standalone implementation
sed -n '402,500p' src/utils/standalone_executor.py

Repository: NVIDIA/OSMO
Length of output: 5493
🏁 Script executed:
# Search for tests that use barrier with standalone executor
rg -n "barrier.*standalone|standalone.*barrier" src/utils/tests/test_standalone_executor.py

Repository: NVIDIA/OSMO
Length of output: 37
🏁 Script executed:
# Check if there are any tests that test barrier specs in standalone mode
rg -B5 -A10 "barrier.*true\|barrier.*false" src/utils/tests/test_standalone_executor.py | head -50

Repository: NVIDIA/OSMO
Length of output: 37
🏁 Script executed:
# Check how _notify_barrier works and when it's called
rg -B10 -A15 "_notify_barrier" src/utils/job/jobs.py | head -60

Repository: NVIDIA/OSMO
Length of output: 2723
🏁 Script executed:
# Search for when barrier is checked in readiness logic
rg -B5 -A5 "has_group_barrier\|barrier" src/utils/job/jobs.py | grep -A5 -B5 "ready\|schedule\|start"

Repository: NVIDIA/OSMO
Length of output: 37
🏁 Script executed:
# Check if multi-task groups have implicit within-group dependencies
rg -n "lead.*task|lead_task" src/utils/standalone_executor.py src/utils/compose_executor.py | head -20

Repository: NVIDIA/OSMO
Length of output: 942
🏁 Script executed:
# Look for how groups are scheduled - is there group-level synchronization?
rg -B3 -A3 "multi.*task.*group|group.*ready|_find_ready" src/utils/standalone_executor.py | head -40

Repository: NVIDIA/OSMO
Length of output: 909
🏁 Script executed:
# Check if there are implicit within-group dependencies or ordering
rg -B5 -A10 "for task_spec in group.tasks|tasks in group" src/utils/standalone_executor.py | head -40

Repository: NVIDIA/OSMO
Length of output: 1993
🏁 Script executed:
# Verify compose executor also lacks barrier handling
rg -n "_build_dag|_find_ready" src/utils/compose_executor.py | head -20

Repository: NVIDIA/OSMO
Length of output: 239
🏁 Script executed:
# Check if there's test coverage for multi-task groups in standalone mode
rg -B10 -A10 "multi.*task.*group|group.*tasks.*\>" src/utils/tests/test_standalone_executor.py | grep -A10 "def test" | head -50

Repository: NVIDIA/OSMO
Length of output: 189
🏁 Script executed:
# Look for any within-group task ordering
rg -B3 -A3 "GROUP_BARRIER_NAME|barrier.*=.*true" src/utils/standalone_executor.py src/utils/compose_executor.py

Repository: NVIDIA/OSMO
Length of output: 37
🏁 Script executed:
# Check if there's any documentation about barrier support in standalone
rg -i "barrier" src/utils/standalone_executor.py

Repository: NVIDIA/OSMO
Length of output: 37
🏁 Script executed:
# Check how compose executor handles multi-task groups
sed -n '414,450p' src/utils/compose_executor.py

Repository: NVIDIA/OSMO
Length of output: 1406
🏁 Script executed:
# Look for integration tests that use multi-task groups with barriers
rg -B5 -A15 "barrier.*true|has_group_barrier" src/utils/tests/test_standalone_executor.py | head -60

Repository: NVIDIA/OSMO
Length of output: 37
Add validation to reject barrier groups in standalone mode.
_build_dag() reads only explicit TaskInputOutput dependencies and never checks the barrier field in task groups. Barriers are used in the Kubernetes job scheduler (via Redis) to synchronize task completion within groups, but there is no equivalent mechanism in standalone execution.
Additionally, _validate_for_standalone() does not reject `barrier: true` specs, so groups with barriers are silently accepted while their barrier semantics are ignored. Tasks in a group with `barrier: true` will run in parallel as soon as their explicit task-input dependencies are satisfied, rather than being held until all group members complete.
Either:
- Add a validation check to explicitly reject groups with `barrier: true`, or
- Document in the class docstring that barrier semantics are not enforced in standalone mode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/standalone_executor.py` around lines 344 - 369, Standalone executor
currently ignores group.barrier semantics; add validation to reject barrier
groups in standalone mode. Update _validate_for_standalone to iterate the
workflow spec's groups (same groups used by _build_dag) and raise a clear error
(e.g., ValueError) when any group has barrier == True, referencing the group
name in the message; this prevents silent acceptance of barrier groups since
_build_dag only respects TaskInputOutput edges and cannot enforce barrier
synchronization. Ensure the check uses the same group iteration helper (the
_groups method) and triggers before DAG construction/_build_dag is called.
✅ Actions performed: Reviews resumed.
- Added support for credential data mapping in `_compute_workflow_fingerprint`, allowing for better management of task-specific credentials.
- Updated `_validate_for_standalone` to raise a ValueError for unsupported timeout features in standalone mode, improving error handling and user guidance.
- Ensured directory setup is executed correctly during workflow execution, enhancing state management.
🧹 Nitpick comments (1)
src/utils/standalone_executor.py (1)
199-221: Note: Sequential task execution. Ready tasks are processed sequentially in the `for task_name in ready` loop rather than in parallel. This is a reasonable choice for local standalone execution (simpler implementation, predictable resource usage), but could be a performance consideration for workflows with many independent tasks. If parallel execution becomes desirable in the future, consider using `concurrent.futures.ThreadPoolExecutor` with a configurable worker count.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 199 - 221, The loop processing ready tasks currently runs them sequentially (for task_name in ready) which limits parallelism; to make execution parallel, submit each ready task to a concurrent.futures.ThreadPoolExecutor (configurable max_workers) where each worker calls _run_task(node, spec), then gather futures and iterate over completed futures to update self._results[task_name], call self._save_state(), and perform the same exit_code checks (_is_nonlead_failure_ignorable, _cancel_downstream) and logging; ensure access to shared state (_results and any other mutated attributes) and calls to _save_state are thread-safe (use a threading.Lock) and preserve the existing failure-handling semantics when processing completed futures.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/utils/standalone_executor.py`:
- Around line 199-221: The loop processing ready tasks currently runs them
sequentially (for task_name in ready) which limits parallelism; to make
execution parallel, submit each ready task to a
concurrent.futures.ThreadPoolExecutor (configurable max_workers) where each
worker calls _run_task(node, spec), then gather futures and iterate over
completed futures to update self._results[task_name], call self._save_state(),
and perform the same exit_code checks (_is_nonlead_failure_ignorable,
_cancel_downstream) and logging; ensure access to shared state (_results and any
other mutated attributes) and calls to _save_state are thread-safe (use a
threading.Lock) and preserve the existing failure-handling semantics when
processing completed futures.
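A minimal sketch of that parallel wave, assuming `run_task` is a callable returning a per-task result (the lock guards the shared-results update; the real executor would also persist state and apply its failure policies under the lock):

```python
import concurrent.futures
import threading


def run_ready_tasks(ready, run_task, max_workers: int = 4) -> dict:
    """Execute ready tasks concurrently, serializing shared-state updates (illustrative)."""
    results = {}
    lock = threading.Lock()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_task, name): name for name in ready}
        for future in concurrent.futures.as_completed(futures):
            task_name = futures[future]
            with lock:
                # In the real executor this is where _save_state() and failure
                # handling (_cancel_downstream, etc.) would also run.
                results[task_name] = future.result()
    return results
```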
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: bfa0157c-cd9f-4228-8498-c07245251390
📒 Files selected for processing (1)
src/utils/standalone_executor.py
Description
A new `osmo local run` CLI subcommand executes OSMO workflow specification YAMLs locally using Docker containers, bypassing the Kubernetes control plane entirely. This eliminates the need for a running OSMO service, a Kubernetes cluster, Redis, PostgreSQL, or any other platform infrastructure.
Issue #800
Checklist
Summary by CodeRabbit
New Features
Tests
Documentation
Chores