
#800 - Local workflow execution #802

Open
maufrancom wants to merge 37 commits into NVIDIA:main from maufrancom:mfrancomarti/local-execution

Conversation

@maufrancom

@maufrancom maufrancom commented Apr 3, 2026

Description

This PR adds a new osmo local run CLI subcommand that executes OSMO workflow specification YAMLs locally using Docker containers, bypassing the Kubernetes control plane entirely. It eliminates the need for a running OSMO service, a Kubernetes cluster, Redis, PostgreSQL, or any other platform infrastructure.

Issue #800

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

Summary by CodeRabbit

  • New Features

    • Added standalone and Docker‑Compose local runners and corresponding CLI commands to run/resume workflows with variable overrides, credential mappings, shm‑size, GPU support, and optional work‑dir retention.
  • Tests

    • Extensive unit and Docker‑gated integration tests for both runners: scheduling, token substitution, GPU handling, filesystem safety, compose YAML generation, and error cases.
  • Documentation

    • Updated docs describing standalone and compose modes, networking, GPU and credential behaviors.
  • Chores

    • Added tutorial spec aggregation, build/test targets, and updated .gitignore.

@maufrancom maufrancom requested a review from a team as a code owner April 3, 2026 20:56
@coderabbitai

coderabbitai bot commented Apr 3, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 67c703a1-808b-4b29-a9b7-f3ea02c492df

📥 Commits

Reviewing files that changed from the base of the PR and between fda5dbb and 55d83cf.

📒 Files selected for processing (1)
  • src/utils/BUILD
✅ Files skipped from review due to trivial changes (1)
  • src/utils/BUILD

📝 Walkthrough

Walkthrough

Adds two local Docker-based executors (standalone and Docker Compose), CLI subcommands to invoke them, Bazel build/test targets and test data, extensive unit/integration tests, documentation entries, and .gitignore additions for local development artifacts.

Changes

Cohort / File(s) Summary
Executors (core logic)
src/utils/standalone_executor.py, src/utils/compose_executor.py
New StandaloneExecutor and ComposeExecutor plus top-level helpers (run_workflow_standalone, run_workflow_compose): spec loading/validation, DAG building, token substitution, scheduling, Docker/Docker‑Compose execution, GPU/shm handling, inline-file handling, credential mounts, and teardown.
CLI integration
src/cli/standalone.py, src/cli/docker_compose.py, src/cli/main_parser.py, src/cli/BUILD
Added standalone and docker-compose subcommands, parser wiring, credential/variable parsing, error handling, and updated cli_lib BUILD to include new sources and deps.
Build files & test wiring
src/utils/BUILD, src/utils/tests/BUILD, cookbook/tutorials/BUILD, src/cli/BUILD
New Bazel library targets standalone_executor and compose_executor (public), py_test targets for executors (local), and tutorial_specs filegroup exposing *.yaml to tests; CLI BUILD updated to depend on new executors.
Tests
src/utils/tests/test_standalone_executor.py, src/utils/tests/test_compose_executor.py
Large new test suites: unit tests for parsing/validation/DAG/tokenization/scheduling and Docker-gated integration tests exercising end-to-end workflows, compose YAML generation, GPU/shm behavior, failure/cancellation semantics, and workdir preservation.
Docs & config
AGENTS.md, .gitignore
Documentation entries added for new executor modules and CLI commands; .gitignore now ignores .venv/, build/, and *.egg-info.

Sequence Diagram(s)

sequenceDiagram
    participant User as User/CLI
    participant CLI as CLI Parser
    participant SE as StandaloneExecutor
    participant DAG as DAG Builder
    participant Docker as Docker Engine

    User->>CLI: osmo standalone run (spec)
    CLI->>SE: run_workflow_standalone(spec_path, ...)
    SE->>SE: Load & validate spec
    SE->>DAG: Build dependency DAG
    DAG-->>SE: Task graph
    loop For each ready task
        SE->>SE: Build token map & write inline files
        SE->>Docker: docker run (volumes, GPU, shm)
        Docker-->>SE: Exit code / logs
        SE->>SE: Persist state, cancel downstream on fatal failures
    end
    SE-->>CLI: Return success/failure
    CLI-->>User: Exit code / message
sequenceDiagram
    participant User as User/CLI
    participant CLI as CLI Parser
    participant CE as ComposeExecutor
    participant DAG as DAG Builder
    participant Compose as Docker Compose

    User->>CLI: osmo docker-compose run (spec)
    CLI->>CE: run_workflow_compose(spec_path, ...)
    CE->>CE: Load & validate spec
    CE->>DAG: Build dependency DAG
    CE->>CE: Generate docker-compose.yml & networks
    loop While ready waves exist
        CE->>CE: Find next ready wave
        CE->>Compose: docker compose up --no-deps (wave)
        Compose-->>CE: Service exit codes (ps)
        CE->>CE: Record TaskResults, apply policies
        CE->>Compose: docker compose rm -f (wave)
    end
    CE->>Compose: docker compose down --remove-orphans
    CE-->>CLI: Return success/failure
    CLI-->>User: Exit code / message

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

🐇 I chewed the specs and stitched two lanes, one host and one composed sea,
Standalone hums on local soil, Compose waves tasks with glee,
Tokens tucked, files safe, GPUs wink a bright green eye,
Tests hop in rows, workdirs kept — the workflows leap and try.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Description Check — ✅ Passed — Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed — The title '#800 - Local workflow execution' accurately describes the main change: adding local workflow execution capability via Docker without Kubernetes.
  • Docstring Coverage — ✅ Passed — Docstring coverage is 91.91%, which meets the required threshold of 80.00%.



Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cli/local.py`:
- Around line 52-66: The parsed args.docker_cmd is not forwarded; update the
call in _run_local to pass docker_cmd=args.docker_cmd into
local_executor.run_workflow_locally, add a docker_cmd parameter to
run_workflow_locally in src/utils/local_executor.py, and thread that parameter
into the LocalExecutor instantiation/use so LocalExecutor receives and uses the
docker command (e.g., add docker_cmd to LocalExecutor.__init__ or the specific
method that launches containers and wire it through all intermediate calls).
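The threading described above can be sketched as follows; run_workflow_locally and LocalExecutor echo the names in the comment, but the bodies here are a minimal illustration, not the PR's actual code:

```python
# Illustrative sketch: thread a docker_cmd option from the CLI layer down to
# the executor, so the container launch command is not hard-coded to 'docker'.
import shlex


class LocalExecutor:
    def __init__(self, work_dir: str, docker_cmd: str = 'docker'):
        self.work_dir = work_dir
        # Split so a value like 'podman --remote' works as a command prefix.
        self.docker_cmd = shlex.split(docker_cmd)

    def build_launch_argv(self, image: str) -> list[str]:
        # The launch argv starts with the configured command, not 'docker'.
        return [*self.docker_cmd, 'run', '--rm', image]


def run_workflow_locally(spec_path: str, docker_cmd: str = 'docker') -> list[str]:
    # spec_path is a stand-in here; the real helper loads and runs the spec.
    executor = LocalExecutor(work_dir='/tmp', docker_cmd=docker_cmd)
    return executor.build_launch_argv('alpine:3')
```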

In `@src/utils/local_executor.py`:
- Around line 273-295: run_workflow_locally currently always removes work_dir on
success which can delete a caller-supplied directory; change it to only remove
the directory when it was created inside the function. Introduce a flag (e.g.,
created_temp_dir) before calling tempfile.mkdtemp and set it true when you
create the temp directory; then only call shutil.rmtree(work_dir, ...) when
success is True, keep_work_dir is False, and created_temp_dir is True. Update
references in run_workflow_locally and any related cleanup logic so
user-provided work_dir is never removed.
- Around line 228-247: The debug log prints the fully expanded docker_args
(including '-e KEY=value' entries), exposing secrets from task_spec.environment;
update the logic that builds or logs docker_args (references: docker_args,
task_spec.environment, _substitute_tokens, logger.debug) to redact environment
values before logging—e.g., replace values for any '-e KEY=...' entries with a
placeholder like 'REDACTED' (or construct a sanitized copy of docker_args that
masks environment values) and log that sanitized list instead of the original;
ensure actual docker_args passed to subprocess remain unchanged so execution is
unaffected.
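The redaction step can be sketched as a small helper, assuming docker_args is a flat argv list as described in the comment:

```python
# Illustrative sketch: mask the value in every '-e KEY=value' pair before
# logging, returning a sanitized copy so the real argv stays unchanged.
def redact_env_args(docker_args: list[str]) -> list[str]:
    redacted = list(docker_args)
    for i, arg in enumerate(redacted[:-1]):
        if arg == '-e':
            key = redacted[i + 1].split('=', 1)[0]
            redacted[i + 1] = f'{key}=REDACTED'
    return redacted
```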
- Around line 90-113: The method currently treats an empty run as success when
no tasks were executed (e.g., DAG cycle). After the ready loop, check if no
tasks were executed (self._results is empty or a counter is zero) while there
are tasks defined (self._task_nodes not empty); if so, log an error indicating
the workflow is stalled/contains a cycle (include spec.name) and return False.
Use the existing symbols _find_ready_tasks, _results, _task_nodes and spec.name
to implement this early failure path so stalled workflows are reported as
failures.
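The stall check reduces to a set difference between defined tasks and recorded results; a minimal illustration (names mirror the attributes in the comment, values are placeholders):

```python
# Illustrative sketch: after the ready-loop drains, any defined task without
# a result means the DAG stalled (cycle or permanently blocked dependency).
def find_unresolved(task_nodes: dict, results: dict) -> list[str]:
    return sorted(set(task_nodes) - set(results))
```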

In `@src/utils/tests/BUILD`:
- Around line 34-42: The py_test target named "test_local_executor" is marked
manual which hides its coverage from CI; remove the tags = ["manual"] attribute
from the "test_local_executor" target so the suite runs in CI, and if you must
keep Docker-only cases manual, split them out of test_local_executor.py into a
separate test file and add a new py_test target (e.g.,
"test_local_executor_docker") that depends on the Docker-only tests and retains
tags = ["manual"] instead of marking the whole "test_local_executor" target
manual.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: ff387238-7afc-4e5c-8e25-06298591e48d

📥 Commits

Reviewing files that changed from the base of the PR and between 762ea59 and cfbba1d.

📒 Files selected for processing (7)
  • src/cli/BUILD
  • src/cli/local.py
  • src/cli/main_parser.py
  • src/utils/BUILD
  • src/utils/local_executor.py
  • src/utils/tests/BUILD
  • src/utils/tests/test_local_executor.py

- Implemented GPU resource handling in LocalExecutor to allow tasks to request GPU resources.
- Added a new method to determine the GPU count for tasks based on their resource specifications.
- Updated Docker run command to include GPU options when applicable.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (3)
src/utils/local_executor.py (3)

91-114: ⚠️ Potential issue | 🔴 Critical

Report stalled DAGs as failures.

If ready becomes empty before every node has a result, this falls through to the success log even though some tasks never ran. A cycle is the obvious case, but any permanently blocked dependency hits the same path.

Suggested fix
         while ready:
             for task_name in ready:
                 node = self._task_nodes[task_name]
                 logger.info('--- Running task: %s (image: %s) ---', task_name, node.spec.image)
                 result = self._run_task(node, spec)
@@
 
             ready = self._find_ready_tasks()
 
+        unresolved_tasks = sorted(set(self._task_nodes) - set(self._results))
+        if unresolved_tasks:
+            logger.error(
+                'Workflow "%s" stalled with unresolved tasks: %s',
+                spec.name,
+                ', '.join(unresolved_tasks),
+            )
+            return False
+
         failed = [name for name, r in self._results.items() if r.exit_code != 0]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 91 - 114, The loop exits as success
when no ready tasks remain even if some nodes never ran; update the run loop
after ready = self._find_ready_tasks() to detect a stalled DAG by comparing
self._results keys against all task names in self._task_nodes (or using a
helper) and if there are unreached tasks (e.g., due to a cycle or blocked
dependency) log an error listing those task names and return False instead of
proceeding to success; reference the methods/attributes _find_ready_tasks,
_results, _task_nodes, and leave existing failure handling via
_cancel_downstream/_run_task intact.

286-290: ⚠️ Potential issue | 🔴 Critical

Only delete work directories created inside this helper.

A successful run currently rmtree()s any work_dir, including a caller-supplied path. --work-dir should never wipe an existing folder on the normal success path.

Suggested fix
 def run_workflow_locally(spec_path: str, work_dir: str | None = None,
                          keep_work_dir: bool = False) -> bool:
-    if work_dir is None:
+    created_temp_work_dir = work_dir is None
+    if created_temp_work_dir:
         work_dir = tempfile.mkdtemp(prefix='osmo-local-')
         logger.info('Using temporary work directory: %s', work_dir)
@@
-    if not keep_work_dir and success:
+    if created_temp_work_dir and not keep_work_dir and success:
         logger.info('Cleaning up work directory: %s', work_dir)
         shutil.rmtree(work_dir, ignore_errors=True)

Also applies to: 306-308

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 286 - 290, The current
run_workflow_locally function always removes work_dir on success which can
delete caller-supplied folders; modify run_workflow_locally to track whether the
helper created the directory (e.g., set created_temp_dir = True when assigning
work_dir = tempfile.mkdtemp(...)) and only call shutil.rmtree(work_dir) at
cleanup when created_temp_dir is True and keep_work_dir is False; ensure all
cleanup branches (including the success path and exception paths around the
existing logic that references work_dir and keep_work_dir) use this flag so
externally provided work_dir is never deleted by the helper.

241-260: ⚠️ Potential issue | 🟠 Major

Redact env values before logging the Docker command.

By the time this debug log runs, docker_args already contains resolved -e KEY=value pairs, so any secret passed through environment: is written to logs verbatim.

Suggested fix
-        logger.debug('Docker command: %s', ' '.join(docker_args))
+        redacted_docker_args = list(docker_args)
+        for index, value in enumerate(redacted_docker_args[:-1]):
+            if value == '-e':
+                key = redacted_docker_args[index + 1].split('=', 1)[0]
+                redacted_docker_args[index + 1] = f'{key}=***'
+        logger.debug('Docker command: %s', ' '.join(redacted_docker_args))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 241 - 260, The docker_args list
currently includes resolved environment pairs from task_spec.environment and is
logged verbatim by logger.debug('Docker command: %s', ' '.join(docker_args));
change this so secrets are redacted before logging by creating a sanitized copy
of docker_args that replaces any '-e KEY=value' entries (or items following
'-e') with '-e KEY=REDACTED' (or only '-e KEY') and log that sanitized list
instead; locate the block that builds docker_args (references:
task_spec.environment, docker_args, resolved_value, logger.debug) and use the
sanitized copy in the debug call rather than the original docker_args.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/local_executor.py`:
- Around line 162-173: Local execution currently ignores TaskSpec.privileged and
TaskSpec.hostNetwork causing workflows to run with different semantics; update
the local executor to reject these flags like other unsupported features by
checking task_spec.privileged and task_spec.hostNetwork and appending
descriptive messages to the unsupported_features list (use f'Task
"{task_spec.name}": privileged containers are not supported by local executor'
and f'Task "{task_spec.name}": hostNetwork is not supported by local executor')
so callers see and block workflows that rely on those settings; locate checks
around unsupported_features and task_spec in the local_executor code and add
these two checks.
- Around line 295-300: The current Jinja guard in local_executor.py uses the
wrong block marker '{%%' so blocks like '{% ... %}' slip past; update the
template_markers tuple used where spec_text is checked (the template_markers
variable and the any(marker in spec_text for marker in template_markers) check)
to look for the correct Jinja start markers (use '{%' and '{#' and keep
'default-values'), then run the existing any(...) check and keep the same
ValueError raise path so templated specs are rejected earlier with the clear
message.
- Line 2: Add a local pylint suppression for the long SPDX copyright header by
inserting a single-line comment "# pylint: disable=line-too-long" immediately
after the SPDX header (the long copyright line at the top of
src/utils/local_executor.py) so the file-level long-line rule is silenced for
that header only; ensure the comment is placed directly below the header and
does not alter any other lines or logic.
- Around line 208-213: The _task_gpu_count function currently looks up resources
via spec.resources[task_spec.resource] but TaskSpec exposes resources directly;
change _task_gpu_count to use task_spec.resources (the ResourceSpec object) to
read the gpu attribute (keep returning 0 if missing). Also ensure the Docker GPU
CLI argument construction used in subprocess.run keeps the literal quotes around
the device string (pass a single argv element containing the quoted string,
e.g., '"device=0,1"', so Docker receives the quoted value for --gpus).

---

Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 91-114: The loop exits as success when no ready tasks remain even
if some nodes never ran; update the run loop after ready =
self._find_ready_tasks() to detect a stalled DAG by comparing self._results keys
against all task names in self._task_nodes (or using a helper) and if there are
unreached tasks (e.g., due to a cycle or blocked dependency) log an error
listing those task names and return False instead of proceeding to success;
reference the methods/attributes _find_ready_tasks, _results, _task_nodes, and
leave existing failure handling via _cancel_downstream/_run_task intact.
- Around line 286-290: The current run_workflow_locally function always removes
work_dir on success which can delete caller-supplied folders; modify
run_workflow_locally to track whether the helper created the directory (e.g.,
set created_temp_dir = True when assigning work_dir = tempfile.mkdtemp(...)) and
only call shutil.rmtree(work_dir) at cleanup when created_temp_dir is True and
keep_work_dir is False; ensure all cleanup branches (including the success path
and exception paths around the existing logic that references work_dir and
keep_work_dir) use this flag so externally provided work_dir is never deleted by
the helper.
- Around line 241-260: The docker_args list currently includes resolved
environment pairs from task_spec.environment and is logged verbatim by
logger.debug('Docker command: %s', ' '.join(docker_args)); change this so
secrets are redacted before logging by creating a sanitized copy of docker_args
that replaces any '-e KEY=value' entries (or items following '-e') with '-e
KEY=REDACTED' (or only '-e KEY') and log that sanitized list instead; locate the
block that builds docker_args (references: task_spec.environment, docker_args,
resolved_value, logger.debug) and use the sanitized copy in the debug call
rather than the original docker_args.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: e830cb69-d490-4557-9c39-1898c8b721c2

📥 Commits

Reviewing files that changed from the base of the PR and between cfbba1d and 7f16d4f.

📒 Files selected for processing (1)
  • src/utils/local_executor.py

- Adjusted the handling of the resolved_command to correctly set the entrypoint and append arguments.
- Ensured that the first element of resolved_command is used as the entrypoint while the rest are appended to the Docker command.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (5)
src/utils/local_executor.py (5)

91-114: ⚠️ Potential issue | 🔴 Critical

Stalled/cyclic workflows incorrectly report success.

If the DAG contains a cycle or no tasks are initially ready, ready is empty on the first pass, the while loop never executes, _results stays empty, and the method falls through to the success log at line 113 even though no tasks ran.

Proposed fix to detect stalled workflows
         while ready:
             for task_name in ready:
                 node = self._task_nodes[task_name]
                 logger.info('--- Running task: %s (image: %s) ---', task_name, node.spec.image)
                 result = self._run_task(node, spec)
                 self._results[task_name] = result

                 if result.exit_code != 0:
                     logger.error('Task "%s" failed with exit code %d', task_name, result.exit_code)
                     self._cancel_downstream(task_name)
                     return False

                 logger.info('Task "%s" completed successfully', task_name)

             ready = self._find_ready_tasks()

+        unresolved_tasks = sorted(set(self._task_nodes) - set(self._results))
+        if unresolved_tasks:
+            logger.error(
+                'Workflow "%s" stalled with unresolved tasks (possible cycle): %s',
+                spec.name,
+                ', '.join(unresolved_tasks),
+            )
+            return False
+
         failed = [name for name, r in self._results.items() if r.exit_code != 0]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 91 - 114, The run loop can silently
succeed when no tasks ever become ready (e.g., cycles) because ready is empty
and _results stays empty; modify the logic in the method using
_find_ready_tasks, _task_nodes and _results to detect stalled workflows: if
ready is empty but there remain tasks not in _results (i.e., pending tasks
exist), log an error indicating a stalled/cyclic workflow (include spec.name and
the list of pending task names) and return False instead of falling through to
success; alternatively detect lack of progress between iterations (no newly
completed tasks) and treat that as failure, calling the same error path (and
keep existing _cancel_downstream usage where appropriate).

297-302: ⚠️ Potential issue | 🟡 Minor

Fix the Jinja block marker check.

'{%%' never matches a normal {% ... %} block, so templated specs skip this guard and fail later with unclear errors.

-    template_markers = ('{%%', '{#', 'default-values')
+    template_markers = ('{%', '{#', 'default-values')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 297 - 302, The guard that detects
Jinja templates is using the wrong marker '{%%' so it never matches normal Jinja
block syntax and lets templated specs through; update the template_markers used
when scanning spec_text (the tuple named template_markers near the ValueError
raise) to include the correct block marker '{%' (and also add '{{' for
expression markers and keep '{#' and 'default-values') so the any(marker in
spec_text for marker in template_markers) check correctly detects Jinja
templates and triggers the existing ValueError path.
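A runnable version of the corrected guard, using the marker tuple from the comment above:

```python
# Illustrative sketch: detect Jinja templating in a spec before local
# execution, using the corrected start markers '{%' and '{#'.
def looks_templated(spec_text: str) -> bool:
    template_markers = ('{%', '{#', 'default-values')
    return any(marker in spec_text for marker in template_markers)
```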

162-173: ⚠️ Potential issue | 🟠 Major

Missing validation for privileged and hostNetwork flags.

TaskSpec exposes privileged and hostNetwork fields, but local execution ignores them. Workflows relying on either setting will be accepted but run with different container semantics than the cluster path.

Proposed fix
                 if task_spec.volumeMounts:
                     unsupported_features.append(
                         f'Task "{task_spec.name}": volumeMounts require cluster-level host paths')
+
+                if task_spec.privileged:
+                    unsupported_features.append(
+                        f'Task "{task_spec.name}": privileged mode is not supported in local execution')
+
+                if task_spec.hostNetwork:
+                    unsupported_features.append(
+                        f'Task "{task_spec.name}": hostNetwork is not supported in local execution')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 162 - 173, local_executor currently
ignores TaskSpec.privileged and TaskSpec.hostNetwork, so workflows with those
flags are accepted but executed with different semantics; update the validation
in local_executor.py where unsupported_features is built (the block handling
task_spec.credentials/checkpoint/volumeMounts) to also detect
task_spec.privileged and task_spec.hostNetwork and append clear messages like
f'Task "{task_spec.name}": privileged requires cluster-level privileges' and
f'Task "{task_spec.name}": hostNetwork requires cluster networking' so these
workflows are rejected for local execution the same way other unsupported
features are.

288-313: ⚠️ Potential issue | 🔴 Critical

Don't delete caller-supplied work directories.

A successful run currently rmtree()s any work_dir, not just the temporary directory created inside this helper. If the user points --work-dir at an existing folder, it will be wiped after a normal success path.

Proposed fix
 def run_workflow_locally(spec_path: str, work_dir: str | None = None,
                          keep_work_dir: bool = False) -> bool:
+    created_temp_work_dir = work_dir is None
     if work_dir is None:
         work_dir = tempfile.mkdtemp(prefix='osmo-local-')
         logger.info('Using temporary work directory: %s', work_dir)
 
     with open(spec_path) as f:
         spec_text = f.read()
 
     template_markers = ('{%', '{#', 'default-values')
     if any(marker in spec_text for marker in template_markers):
         raise ValueError(
             'This spec uses Jinja templates which require server-side expansion.\n'
             'Run "osmo workflow submit --dry-run -f <spec>" first to get the expanded spec,\n'
             'then save that output and run it locally.')
 
     executor = LocalExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir)
     spec = executor.load_spec(spec_text)
     success = executor.execute(spec)
 
-    if not keep_work_dir and success:
+    if created_temp_work_dir and not keep_work_dir and success:
         logger.info('Cleaning up work directory: %s', work_dir)
         shutil.rmtree(work_dir, ignore_errors=True)
     elif not success:
         logger.info('Work directory preserved for debugging: %s', work_dir)
 
     return success
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 288 - 313, The helper
run_workflow_locally currently always deletes work_dir on success which can wipe
a caller-supplied directory; change it to only remove the directory when it was
created inside this function (i.e., when you called tempfile.mkdtemp). Add a
flag (e.g., created_temp = True/False) set to True right after tempfile.mkdtemp
and only call shutil.rmtree(work_dir, ...) when created_temp is True and
keep_work_dir is False and execution succeeded; leave existing logging behavior
for preserved dirs and ensure created_temp is referenced around the
shutil.rmtree and logging so externally provided work_dir is never removed.

262-262: ⚠️ Potential issue | 🟠 Major

Avoid logging environment variable values that may contain secrets.

docker_args contains fully expanded -e KEY=value pairs from the workflow spec's environment field. The debug log prints them verbatim, potentially exposing tokens or passwords.

Proposed fix to redact environment values
-        logger.debug('Docker command: %s', ' '.join(docker_args))
+        redacted_args = list(docker_args)
+        for i, arg in enumerate(redacted_args[:-1]):
+            if arg == '-e':
+                key = redacted_args[i + 1].split('=', 1)[0]
+                redacted_args[i + 1] = f'{key}=***'
+        logger.debug('Docker command: %s', ' '.join(redacted_args))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` at line 262, The debug log prints the full
docker_args (including expanded '-e KEY=value' entries) which may expose
secrets; update the logger.debug call to log a redacted copy instead: create a
redacted_args from docker_args by replacing environment-value parts (e.g. tokens
matching '-e...=value' or strings containing '-e' and '=') with just the key and
a placeholder (e.g. '-e KEY=REDACTED') and then log ' '.join(redacted_args)
instead of the original; use the existing docker_args variable and a small regex
or token-based transform (and add import re if needed) where
logger.debug('Docker command: %s', ' '.join(docker_args)) is currently called.
🧹 Nitpick comments (1)
src/utils/local_executor.py (1)

264-269: Subprocess call with spec-derived arguments is acceptable for local execution.

Static analysis flags S603 (subprocess with untrusted input). However, this is a local execution tool where the user explicitly provides the workflow spec. The trust boundary is appropriate - users running osmo local run are intentionally executing containers defined in their own spec files.

Consider adding a brief comment noting this design decision for future maintainers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 264 - 269, Add a short comment
above the subprocess.run call in the local_executor (around the docker_args /
subprocess.run usage) explaining that docker_args are derived from the
user-provided workflow spec and thus this is an intentional local execution
design decision (trusted boundary), and append a linter suppression token (e.g.,
"# nosec") to the subprocess.run line to document/quiet static analysis S603;
reference the docker_args variable, the subprocess.run call, and the surrounding
try/except that returns TaskResult.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/local_executor.py`:
- Around line 236-239: The GPU device string is being built with literal double
quotes which become part of the argument passed to Docker; in the GPU handling
branch (see docker_args, _task_gpu_count and logger usage) remove the extra
quotes so the second element appended is device=0,1 (no surrounding ") by
changing the formatted string from including quotes to a plain
f'device={",".join(...) }' so subprocess.run receives the unquoted device list.
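The quoting bug above comes down to one principle: `subprocess.run` passes each list element to Docker verbatim, so shell-style quotes must not be embedded in the string. A minimal sketch (the function name and argument shape are illustrative, not the executor's actual API):

```python
def gpu_args(gpu_indices):
    """Build the '--gpus' portion of a docker argv for the given device indices.

    Each argv element is passed to Docker verbatim by subprocess.run, so no
    quoting is needed; embedding literal '"' characters would make Docker see
    a device request named '"device=0,1"' and fail to match any GPU.
    """
    if not gpu_indices:
        return []
    return ['--gpus', f'device={",".join(str(i) for i in gpu_indices)}']
```

With this shape, `gpu_args([0, 1])` yields `['--gpus', 'device=0,1']`, exactly what the fix asks for.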

---

Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 91-114: The run loop can silently succeed when no tasks ever
become ready (e.g., cycles) because ready is empty and _results stays empty;
modify the logic in the method using _find_ready_tasks, _task_nodes and _results
to detect stalled workflows: if ready is empty but there remain tasks not in
_results (i.e., pending tasks exist), log an error indicating a stalled/cyclic
workflow (include spec.name and the list of pending task names) and return False
instead of falling through to success; alternatively detect lack of progress
between iterations (no newly completed tasks) and treat that as failure, calling
the same error path (and keep existing _cancel_downstream usage where
appropriate).
- Around line 297-302: The guard that detects Jinja templates is using the wrong
marker '{%%' so it never matches normal Jinja block syntax and lets templated
specs through; update the template_markers used when scanning spec_text (the
tuple named template_markers near the ValueError raise) to include the correct
block marker '{%' (and also add '{{' for expression markers and keep '{#' and
'default-values') so the any(marker in spec_text for marker in template_markers)
check correctly detects Jinja templates and triggers the existing ValueError
path.
- Around line 162-173: local_executor currently ignores TaskSpec.privileged and
TaskSpec.hostNetwork, so workflows with those flags are accepted but executed
with different semantics; update the validation in local_executor.py where
unsupported_features is built (the block handling
task_spec.credentials/checkpoint/volumeMounts) to also detect
task_spec.privileged and task_spec.hostNetwork and append clear messages like
f'Task "{task_spec.name}": privileged requires cluster-level privileges' and
f'Task "{task_spec.name}": hostNetwork requires cluster networking' so these
workflows are rejected for local execution the same way other unsupported
features are.
- Around line 288-313: The helper run_workflow_locally currently always deletes
work_dir on success which can wipe a caller-supplied directory; change it to
only remove the directory when it was created inside this function (i.e., when
you called tempfile.mkdtemp). Add a flag (e.g., created_temp = True/False) set
to True right after tempfile.mkdtemp and only call shutil.rmtree(work_dir, ...)
when created_temp is True and keep_work_dir is False and execution succeeded;
leave existing logging behavior for preserved dirs and ensure created_temp is
referenced around the shutil.rmtree and logging so externally provided work_dir
is never removed.
- Line 262: The debug log prints the full docker_args (including expanded '-e
KEY=value' entries) which may expose secrets; update the logger.debug call to
log a redacted copy instead: create a redacted_args from docker_args by
replacing environment-value parts (e.g. tokens matching '-e...=value' or strings
containing '-e' and '=') with just the key and a placeholder (e.g. '-e
KEY=REDACTED') and then log ' '.join(redacted_args) instead of the original; use
the existing docker_args variable and a small regex or token-based transform
(and add import re if needed) where logger.debug('Docker command: %s', '
'.join(docker_args)) is currently called.
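The stalled-workflow failure mode flagged above can be sketched independently of the executor. In this toy scheduler (all names are illustrative; the real code uses `_find_ready_tasks`, `_task_nodes`, and `_results`), an empty ready set with pending tasks remaining means a cycle or unsatisfiable dependency, and must return failure rather than fall through to success:

```python
def run_dag(tasks, dependencies, run_task):
    """Execute tasks in dependency order; fail instead of silently succeeding
    when no remaining task can ever become ready (e.g. a dependency cycle).

    tasks: iterable of task names; dependencies: dict name -> set of dep names;
    run_task: callable(name) -> bool success flag.
    """
    results = {}
    while len(results) < len(tasks):
        ready = [t for t in tasks
                 if t not in results and dependencies.get(t, set()) <= set(results)]
        if not ready:
            pending = [t for t in tasks if t not in results]
            # Stalled: tasks remain but none are runnable. Without this branch
            # the loop would be skipped entirely and the run reported as success.
            print(f'Workflow stalled; pending tasks: {pending}')
            return False
        for task in ready:
            results[task] = run_task(task)
    return all(results.values())
```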

---

Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 264-269: Add a short comment above the subprocess.run call in the
local_executor (around the docker_args / subprocess.run usage) explaining that
docker_args are derived from the user-provided workflow spec and thus this is an
intentional local execution design decision (trusted boundary), and append a
linter suppression token (e.g., "# nosec") to the subprocess.run line to
document/quiet static analysis S603; reference the docker_args variable, the
subprocess.run call, and the surrounding try/except that returns TaskResult.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: bf7af37f-080d-422d-91a5-301615578ab8

📥 Commits

Reviewing files that changed from the base of the PR and between 7f16d4f and 1c71133.

📒 Files selected for processing (1)
  • src/utils/local_executor.py

Comment thread src/utils/local_executor.py Outdated
- Introduced `--resume` and `--from-step` options in the CLI to allow resuming previous runs.
- Implemented state management in `LocalExecutor` to save and restore task results.
- Enhanced logging to provide feedback on skipped tasks and remaining tasks during resumption.
- Added GPU detection improvements to handle scenarios where requested GPUs are unavailable.
- Added `docker_cmd` parameter to `run_workflow_locally` for customizable Docker command execution.
- Improved logging to redact sensitive information in Docker command arguments.
- Implemented error handling for unexecuted tasks in `LocalExecutor` to detect potential workflow stalls.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/cli/local.py (1)

73-88: Consider marking service_client as unused.

The _run_local function receives service_client per the CLI dispatcher contract (see src/cli/cli.py:118) but doesn't use it since local execution bypasses the OSMO service. Consider adding an underscore prefix or a brief comment to signal this is intentional:

Suggested change
-def _run_local(service_client, args: argparse.Namespace):
+def _run_local(_service_client, args: argparse.Namespace):  # service_client unused for local runs
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cli/local.py` around lines 73 - 88, The _run_local function accepts a
service_client parameter that is intentionally unused; mark it as unused by
renaming it to _service_client or _ (or add an inline comment like "#
service_client unused for local execution") so linters and readers understand
this is deliberate; update the function signature for _run_local and any
internal references accordingly to avoid linter warnings while preserving the
CLI dispatcher contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cli/local.py`:
- Around line 1-17: The SPDX header docstring at the top of src/cli/local.py
contains lines exceeding 100 characters; add a module-level pylint suppression
for line-too-long to the file so the SPDX lines are allowed unwrapped. Insert a
single "# pylint: disable=line-too-long" comment near the top of the module
(e.g., immediately above or below the existing triple-quoted SPDX docstring) so
the header is exempt from the linter without modifying the license text.

---

Nitpick comments:
In `@src/cli/local.py`:
- Around line 73-88: The _run_local function accepts a service_client parameter
that is intentionally unused; mark it as unused by renaming it to
_service_client or _ (or add an inline comment like "# service_client unused for
local execution") so linters and readers understand this is deliberate; update
the function signature for _run_local and any internal references accordingly to
avoid linter warnings while preserving the CLI dispatcher contract.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: d904529c-e0cd-4ea4-b9f7-ac507838a76d

📥 Commits

Reviewing files that changed from the base of the PR and between 1c71133 and ffcdd72.

📒 Files selected for processing (4)
  • .gitignore
  • src/cli/local.py
  • src/utils/local_executor.py
  • src/utils/tests/BUILD
✅ Files skipped from review due to trivial changes (2)
  • src/utils/tests/BUILD
  • .gitignore

Comment thread src/cli/standalone.py
- Added detailed docstrings to functions and classes in `local.py` and `local_executor.py` to improve code readability and maintainability.
- Updated test cases in `test_local_executor.py` with descriptive comments to clarify the purpose of each test.
- Ensured consistency in documentation style across the codebase.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/utils/tests/test_local_executor.py (2)

159-160: Prefer specific exception types over bare Exception.

The tests at lines 159, 179, and 189 catch bare Exception. While this works, asserting a more specific exception type (e.g., ValueError or pydantic.ValidationError) would make test failures more informative and verify the correct error path.

This is a minor improvement for test quality.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/tests/test_local_executor.py` around lines 159 - 160, Replace the
three tests that use self.assertRaises(Exception) to assert a more specific
error type when calling executor.load_spec; locate the assertions in the test
file where executor.load_spec(spec_text) is expected to fail and change the
raised type to the appropriate specific exception (e.g., ValueError or
pydantic.ValidationError) that the load_spec implementation actually raises so
the tests validate the correct error path.
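The pattern the prompt asks for looks like this in miniature. The `load_spec` below is a toy stand-in (JSON instead of YAML, a single required field) just to show the assertion style; the real executor's parsing and exception types should be checked against its implementation:

```python
import json
import unittest

def load_spec(spec_text):
    """Toy stand-in for an executor's load_spec: parse and require 'name'."""
    data = json.loads(spec_text)
    if 'name' not in data:
        raise ValueError('spec is missing required field: name')
    return data

class TestLoadSpec(unittest.TestCase):
    def test_missing_name_raises_value_error(self):
        # Asserting ValueError (not bare Exception) documents the intended
        # error path and fails loudly if load_spec starts raising something else.
        with self.assertRaises(ValueError) as ctx:
            load_spec('{"tasks": []}')
        self.assertIn('name', str(ctx.exception))
```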

1-17: Consider adding pylint suppression for long copyright header.

Similar to the production file, line 2 exceeds 100 characters. If the same linting rules apply to test files, add the suppression comment.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/tests/test_local_executor.py` around lines 1 - 17, Add the same
pylint suppression for long lines at the top of this test module as in
production: insert a module-level pylint disable for "line-too-long" (e.g., the
same "# pylint: disable=line-too-long" comment used in the production file) so
the long copyright/header string in test_local_executor.py won't trigger the
linter.
src/utils/local_executor.py (1)

182-183: Specify explicit encoding for file operations.

Several open() calls lack an explicit encoding parameter. While Python 3 defaults to the locale encoding, explicitly specifying encoding='utf-8' ensures consistent behavior across systems.

Affected locations: lines 182, 189, 351, 450.

Example fix
-        with open(self._state_file_path, 'w') as f:
+        with open(self._state_file_path, 'w', encoding='utf-8') as f:
             json.dump(state, f, indent=2)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 182 - 183, Update all open() calls
in src/utils/local_executor.py to specify encoding='utf-8' to ensure consistent
file encoding; for example, in the state writer inside the LocalExecutor class
where you call with open(self._state_file_path, 'w') as f: json.dump(...), add
encoding='utf-8', and similarly update the other open() calls referenced (the
ones around the blocks using self._state_file_path, the read/write at the region
near line ~351, and the open call near line ~450) so all reads and writes use
open(..., encoding='utf-8').
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 759-774: The test_jinja_block_detected test currently uses the
nonstandard marker '{%% ... %%}' which mirrors an existing bug; update the spec
string to use real Jinja block syntax '{% ... %}' so the test detects standard
Jinja tags; locate the test_jinja_block_detected function and the temporary spec
created for run_workflow_locally and replace occurrences of '{%%' and '%%}' with
'{%' and '%}' respectively, keeping the rest of the assertion flow unchanged.
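The corrected marker set from the earlier review comment can be sketched as a small predicate. Standard Jinja delimiters are `{%` for blocks, `{{` for expressions, and `{#` for comments; the buggy `{%%` marker matches none of them. The `default-values` marker is carried over from the review as-is:

```python
def looks_templated(spec_text):
    """Detect Jinja templating in a spec before attempting a plain-YAML parse."""
    template_markers = ('{%', '{{', '{#', 'default-values')
    return any(marker in spec_text for marker in template_markers)
```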

---

Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 182-183: Update all open() calls in src/utils/local_executor.py to
specify encoding='utf-8' to ensure consistent file encoding; for example, in the
state writer inside the LocalExecutor class where you call with
open(self._state_file_path, 'w') as f: json.dump(...), add encoding='utf-8', and
similarly update the other open() calls referenced (the ones around the blocks
using self._state_file_path, the read/write at the region near line ~351, and
the open call near line ~450) so all reads and writes use open(...,
encoding='utf-8').

In `@src/utils/tests/test_local_executor.py`:
- Around line 159-160: Replace the three tests that use
self.assertRaises(Exception) to assert a more specific error type when calling
executor.load_spec; locate the assertions in the test file where
executor.load_spec(spec_text) is expected to fail and change the raised type to
the appropriate specific exception (e.g., ValueError or
pydantic.ValidationError) that the load_spec implementation actually raises so
the tests validate the correct error path.
- Around line 1-17: Add the same pylint suppression for long lines at the top of
this test module as in production: insert a module-level pylint disable for
"line-too-long" (e.g., the same "# pylint: disable=line-too-long" comment used
in the production file) so the long copyright/header string in
test_local_executor.py won't trigger the linter.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 6eefeced-a5c1-410f-95df-3ce162b20399

📥 Commits

Reviewing files that changed from the base of the PR and between ffcdd72 and 0bf8bd5.

📒 Files selected for processing (3)
  • src/cli/local.py
  • src/utils/local_executor.py
  • src/utils/tests/test_local_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/cli/local.py

Comment thread src/utils/tests/test_standalone_executor.py
- Updated file operations in `local_executor.py` to explicitly use UTF-8 encoding when reading and writing files, ensuring better compatibility with various text formats.
- Adjusted exception handling in `test_local_executor.py` to raise `ValueError` instead of a generic `Exception` for clearer error reporting.
- Modified test documentation to reflect the correct Jinja block syntax in error messages.
@maufrancom
Author

@coderabbitai resume

@coderabbitai

coderabbitai bot commented Apr 4, 2026

✅ Actions performed

Reviews resumed.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (4)
src/utils/local_executor.py (3)

280-290: ⚠️ Potential issue | 🟠 Major

Reject privileged and hostNetwork in local mode.

Local execution currently accepts these flags but ignores them, so the same workflow can be accepted here with materially different container semantics than on the cluster. They should be added to unsupported_features alongside volumeMounts.

Suggested fix
                 if task_spec.volumeMounts:
                     unsupported_features.append(
                         f'Task "{task_spec.name}": volumeMounts require cluster-level host paths')
+                if task_spec.privileged:
+                    unsupported_features.append(
+                        f'Task "{task_spec.name}": privileged containers are not supported by local executor')
+                if task_spec.hostNetwork:
+                    unsupported_features.append(
+                        f'Task "{task_spec.name}": hostNetwork is not supported by local executor')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 280 - 290, Add checks that reject
task_spec.privileged and task_spec.hostNetwork in local mode by appending
explanatory strings to the existing unsupported_features list (same style as the
existing checks for volumeMounts, checkpoint, and credentials). Locate the block
that inspects task_spec (the code that appends to unsupported_features for
task_spec.volumeMounts / task_spec.checkpoint / task_spec.credentials) and add
two entries: one like f'Task "{task_spec.name}": privileged containers are not
supported in local mode' for task_spec.privileged, and one like f'Task
"{task_spec.name}": hostNetwork is not supported in local mode' for
task_spec.hostNetwork, so these flags are explicitly rejected rather than
ignored.

446-468: ⚠️ Potential issue | 🔴 Critical

Don't delete caller-supplied work_dir on success.

This helper still rmtree()s any successful work_dir, even when it came from --work-dir. That can wipe user data after a normal run; only the temporary directory created inside this function should be auto-cleaned. This also means the current success-cleanup test is asserting the wrong contract.

Suggested fix
-    if work_dir is None:
+    created_temp_work_dir = work_dir is None
+    if created_temp_work_dir:
         work_dir = tempfile.mkdtemp(prefix='osmo-local-')
         logger.info('Using temporary work directory: %s', work_dir)
@@
-    if not keep_work_dir and success:
+    if created_temp_work_dir and not keep_work_dir and success:
         logger.info('Cleaning up work directory: %s', work_dir)
         shutil.rmtree(work_dir, ignore_errors=True)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 446 - 468, The code currently
always deletes work_dir on success even when the caller supplied it; modify the
logic that creates the temp dir to record ownership (e.g., set a flag like
created_work_dir = True only when tempfile.mkdtemp(...) is called), and change
the cleanup condition to only rmtree when created_work_dir is True (and not
keep_work_dir and success). Update references around LocalExecutor(...), spec =
executor.load_spec(...), and the final cleanup block to use that ownership flag
and adjust the success-cleanup test to assert that caller-supplied work_dir is
preserved.

1-17: ⚠️ Potential issue | 🟡 Minor

Add the local pylint suppression for the SPDX header.

Line 2 is still over the repo's 100-character limit, so this new module needs the file-level line-too-long suppression instead of reflowing the copyright text.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 1 - 17, Add a file-level pylint
suppression for the SPDX header in this module by inserting a "# pylint:
disable=line-too-long" comment at the top of local_executor.py (as a
module-level suppression, e.g., immediately before or immediately after the
module docstring) so the long copyright line is exempt from the 100-character
rule.
src/cli/local.py (1)

1-17: ⚠️ Potential issue | 🟡 Minor

Add the local pylint suppression for the SPDX header.

Line 2 is still over the repo's 100-character limit, so this new CLI module needs the file-level line-too-long suppression.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cli/local.py` around lines 1 - 17, The SPDX header exceeds the
100-character pylint limit; add a file-level pylint suppression for
line-too-long in this module by placing a "# pylint: disable=line-too-long"
comment at the top of src/cli/local.py (immediately after the module docstring
or as the first non-docstring line) so the long copyright/SPDX lines are ignored
by pylint; ensure the comment targets "line-too-long" specifically and avoid
modifying the existing SPDX text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cli/local.py`:
- Around line 76-87: The try/except should also catch filesystem-related errors
from local_executor.run_workflow_locally (e.g., FileNotFoundError and
PermissionError) and handle them the same way as ValueError: print the error
message to stderr and sys.exit(1). Update the except block around the call to
local_executor.run_workflow_locally to catch FileNotFoundError and
PermissionError (either as separate except clauses or combined with the existing
ValueError) and ensure the handler prints f'Error: {error}' to sys.stderr and
exits with code 1.

In `@src/utils/local_executor.py`:
- Around line 116-124: Reset the executor state and ensure task output dirs are
cleaned before scheduling: in execute() clear self._results at the start, then
update _setup_directories() (or call a small helper before scheduling) to remove
and re-create per-task output directories when not doing a full resume; when
resume or from_step is set, call _restore_completed_tasks(from_step) first and
then delete output directories for tasks that are not in the restored completed
set so stale files can't be reused. Reference execute(), _setup_directories(),
_restore_completed_tasks(), and the self._results member when making these
changes.

In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: Add a file-level pylint suppression for the SPDX header in
the test module test_local_executor.py by inserting a module-level comment to
disable the "line-too-long" lint (i.e. the pylint disable for line-too-long)
near the top of the file so the long copyright line is allowed; ensure the
directive is present before or immediately after the file docstring so pylint
skips the long SPDX lines.

---

Duplicate comments:
In `@src/cli/local.py`:
- Around line 1-17: The SPDX header exceeds the 100-character pylint limit; add
a file-level pylint suppression for line-too-long in this module by placing a "#
pylint: disable=line-too-long" comment at the top of src/cli/local.py
(immediately after the module docstring or as the first non-docstring line) so
the long copyright/SPDX lines are ignored by pylint; ensure the comment targets
"line-too-long" specifically and avoid modifying the existing SPDX text.

In `@src/utils/local_executor.py`:
- Around line 280-290: Add checks that reject task_spec.privileged and
task_spec.hostNetwork in local mode by appending explanatory strings to the
existing unsupported_features list (same style as the existing checks for
volumeMounts, checkpoint, and credentials). Locate the block that inspects
task_spec (the code that appends to unsupported_features for
task_spec.volumeMounts / task_spec.checkpoint / task_spec.credentials) and add
two entries: one like f'Task "{task_spec.name}": privileged containers are not
supported in local mode' for task_spec.privileged, and one like f'Task
"{task_spec.name}": hostNetwork is not supported in local mode' for
task_spec.hostNetwork, so these flags are explicitly rejected rather than
ignored.
- Around line 446-468: The code currently always deletes work_dir on success
even when the caller supplied it; modify the logic that creates the temp dir to
record ownership (e.g., set a flag like created_work_dir = True only when
tempfile.mkdtemp(...) is called), and change the cleanup condition to only
rmtree when created_work_dir is True (and not keep_work_dir and success). Update
references around LocalExecutor(...), spec = executor.load_spec(...), and the
final cleanup block to use that ownership flag and adjust the success-cleanup
test to assert that caller-supplied work_dir is preserved.
- Around line 1-17: Add a file-level pylint suppression for the SPDX header in
this module by inserting a "# pylint: disable=line-too-long" comment at the top
of local_executor.py (as a module-level suppression, e.g., immediately before or
immediately after the module docstring) so the long copyright line is exempt
from the 100-character rule.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: cde946a5-8785-4b48-9d4f-4f11c1022077

📥 Commits

Reviewing files that changed from the base of the PR and between ffcdd72 and a79bca8.

📒 Files selected for processing (3)
  • src/cli/local.py
  • src/utils/local_executor.py
  • src/utils/tests/test_local_executor.py

Comment thread src/cli/standalone.py
Comment thread src/utils/standalone_executor.py
…dules

- Expanded exception handling in `local.py` to include `FileNotFoundError` and `PermissionError` for improved robustness.
- Updated comments in `local_executor.py` to clarify unsupported features in local mode, specifically regarding privileged containers and host networking.
- Modified test case in `test_local_executor.py` to ensure caller-supplied work directories are preserved on success, enhancing test accuracy.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/utils/local_executor.py (1)

116-124: ⚠️ Potential issue | 🟠 Major

Clear self._results at the start of execute() to prevent stale state.

execute() builds the DAG and clears _task_nodes, but _results is never reset. If an executor instance is reused (e.g., for different specs or repeated runs), stale results from a previous execution could incorrectly mark tasks as completed—especially for tasks with the same name across different workflow specs.

Proposed fix
     def execute(self, spec: workflow_module.WorkflowSpec,
                 resume: bool = False, from_step: str | None = None) -> bool:
         """Run all tasks in topological order, returning True if the entire workflow succeeds."""
+        self._results.clear()
         self._build_dag(spec)
         self._validate_for_local(spec)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 116 - 124, The execute() method can
reuse stale state in the instance because self._results is not reset; modify
execute (the method named execute in this class) to clear self._results (e.g.,
set to an empty dict) at the start of the method before DAG
execution/restoration logic runs so prior run results don't mark tasks as
completed, ensuring this reset happens before calling _build_dag,
_validate_for_local, _setup_directories and before
_restore_completed_tasks/from_step logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: The SPDX header block in the test file has lines over 100
characters and needs the pylint suppression; add a "# pylint:
disable=line-too-long" comment associated with the SPDX header (e.g.,
immediately after the header block or on the SPDX-License-Identifier line) so
pylint ignores the long copyright lines; target the existing SPDX header /
"SPDX-License-Identifier" lines in src/utils/tests/test_local_executor.py when
applying this change.

---

Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 116-124: The execute() method can reuse stale state in the
instance because self._results is not reset; modify execute (the method named
execute in this class) to clear self._results (e.g., set to an empty dict) at
the start of the method before DAG execution/restoration logic runs so prior run
results don't mark tasks as completed, ensuring this reset happens before
calling _build_dag, _validate_for_local, _setup_directories and before
_restore_completed_tasks/from_step logic.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 6ff38215-d7e5-45ad-a464-77e6c436e296

📥 Commits

Reviewing files that changed from the base of the PR and between a79bca8 and 27424cd.

📒 Files selected for processing (3)
  • src/cli/local.py
  • src/utils/local_executor.py
  • src/utils/tests/test_local_executor.py

Comment thread src/utils/tests/test_standalone_executor.py

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)

580-746: Cover the remaining _validate_for_local() rejection branches.

src/utils/local_executor.py:262-303 also rejects credentials, checkpoint, volumeMounts, privileged, and hostNetwork, but this suite only exercises dataset/URL inputs/outputs plus the allowed files/environment paths. A small subTest table for the remaining fields would keep the unsupported-feature matrix from drifting untested.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/tests/test_local_executor.py` around lines 580 - 746, Add unit
tests that exercise the remaining rejection branches in
LocalExecutor._validate_for_local by creating small spec variants that include
each unsupported field: credentials, checkpoint, volumeMounts, privileged, and
hostNetwork; for each field use a subTest row that loads the spec via
LocalExecutor._make_executor()/load_spec(), calls _build_dag(spec) and asserts
_validate_for_local(spec) raises ValueError, and verify the raised message
mentions the offending feature (e.g., assertIn('credentials', str(exc))) so all
branches in _validate_for_local are covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 817-818: Move the negative-path tests out from under the
class-level `@unittest.skipUnless`(DOCKER_AVAILABLE, SKIP_DOCKER_MSG) decorator so
they run even when Docker is unavailable: locate the TestDockerExecution class
and extract the test methods test_docker_not_found_graceful_failure,
test_unsupported_spec_*, test_template_hello_world, and
test_nonexistent_file_raises into a non-Docker-gated test class (or place
method-level skips only on tests that actually call containers), since these
tests exercise early logic in run_workflow_locally() and _validate_for_local()
(invoked during executor.execute()) and do not require Docker; ensure the moved
tests still import/instantiate the same helper utilities and keep any
Docker-specific assertions confined to tests that truly start containers.
- Around line 1321-1326: The helper _run_cookbook_spec currently calls
self.skipTest when the cookbook spec file is missing; change this to a fail-fast
assertion by replacing the skip with self.assertTrue(os.path.exists(spec_path),
f'Cookbook file not found: {spec_path}') so missing data files cause a test
failure rather than silently skipping; update the logic inside
_run_cookbook_spec (referenced symbol) to assert existence before calling
run_workflow_locally.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: a22922da-486c-48bc-9539-e19174804a2d

📥 Commits

Reviewing files that changed from the base of the PR and between 27424cd and 313466b.

📒 Files selected for processing (1)
  • src/utils/tests/test_local_executor.py

Comment thread src/utils/tests/test_standalone_executor.py
Comment thread src/utils/tests/test_local_executor.py Outdated
- Introduced `--shm-size` argument in the CLI for specifying shared memory size for GPU containers, defaulting to 16g.
- Updated `LocalExecutor` to accept and utilize the shared memory size during Docker command construction.
- Added unit tests to verify correct handling of shared memory size for both default and custom values in GPU tasks, ensuring no shared memory argument is included for non-GPU tasks.
- Created a new `tutorial_specs` filegroup in the `cookbook/tutorials/BUILD` to include YAML specifications.
- Updated the `BUILD` file in `src/utils/tests` to include the new `tutorial_specs` as data for local tests.
- Added a new test class in `test_local_executor.py` to validate unsupported features in cookbook specifications, ensuring proper error handling for unsupported fields.
- Implemented additional tests to verify that specific unsupported features are correctly rejected during local execution.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (4)
src/cli/local.py (1)

1-3: ⚠️ Potential issue | 🟡 Minor

Move the pylint directive outside the docstring.

# pylint: disable=line-too-long is currently inside the triple-quoted string, so pylint will not treat it as a directive.

Suggested fix
+# pylint: disable=line-too-long
 """
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.  # pylint: disable=line-too-long
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cli/local.py` around lines 1 - 3, The pylint directive is currently
inside the module docstring (the top triple-quoted string) so it is ignored;
move the `# pylint: disable=line-too-long` comment outside the docstring
(immediately after the closing triple quotes or at the top of the file) so
pylint recognizes it, leaving the copyright text unchanged and keeping the
directive adjacent to the module docstring for clarity.
src/utils/tests/test_local_executor.py (1)

1-3: ⚠️ Potential issue | 🟡 Minor

Place the pylint long-line suppression as a real comment, not inside the docstring.

Current placement won’t be interpreted by pylint.

Suggested fix
+# pylint: disable=line-too-long
 """
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.  # pylint: disable=line-too-long
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/tests/test_local_executor.py` around lines 1 - 3, The copyright
header currently embeds "# pylint: disable=line-too-long" inside the module
docstring, which pylint will ignore; remove that token from the docstring and
place a real inline comment (e.g., "# pylint: disable=line-too-long") on its own
line immediately above or below the docstring so the suppression is recognized;
update the module-level string in src/utils/tests/test_local_executor.py
accordingly.
src/utils/local_executor.py (2)

120-129: ⚠️ Potential issue | 🟠 Major

Reset execution state and clean pending task outputs before scheduling.

execute() currently reuses self._results across runs and does not purge stale output directories for tasks that will run again. This can incorrectly skip tasks or consume stale artifacts.

Suggested fix
 def execute(self, spec: workflow_module.WorkflowSpec,
             resume: bool = False, from_step: str | None = None) -> bool:
     """Run all tasks in topological order, returning True if the entire workflow succeeds."""
+    self._results.clear()
     self._build_dag(spec)
     self._validate_for_local(spec)
-    self._setup_directories()
 
     if resume or from_step:
         self._restore_completed_tasks(from_step)
+
+    self._setup_directories()
+    for task_name in self._task_nodes:
+        if task_name in self._results:
+            continue
+        task_dir = os.path.join(self._work_dir, task_name)
+        shutil.rmtree(task_dir, ignore_errors=True)
+        os.makedirs(os.path.join(task_dir, 'output'), exist_ok=True)

Also applies to: 309-314

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 120 - 129, The execute() method is
reusing self._results and leaving stale task output directories, causing skipped
or stale artifacts; fix by resetting execution state and cleaning pending
outputs before scheduling: in execute (and the analogous flow used around the
resume/from_step logic), clear or reinitialize self._results, clear any per-run
state used to mark tasks as completed, and remove/clean output directories for
tasks that will be re-run (i.e., tasks not restored as completed by
_restore_completed_tasks) before creating the scheduling dag; ensure these
changes integrate with _build_dag, _setup_directories and
_restore_completed_tasks so restored tasks keep their outputs but any
pending/rerun tasks have their outputs removed.

1-3: ⚠️ Potential issue | 🟡 Minor

Make the pylint suppression effective by moving it outside the module docstring.

The current inline text is part of the string literal and won’t suppress linting.

Suggested fix
+# pylint: disable=line-too-long
 """
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.  # pylint: disable=line-too-long
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 1 - 3, The pylint suppression is
currently inside the module docstring and therefore not effective; move the "#
pylint: disable=line-too-long" token out of the triple-quoted string so it is a
real comment. Specifically, edit the top of the module around the module
docstring (the triple-quoted string containing the SPDX copyright text) to place
the pylint disable comment on its own line either immediately above or
immediately below the docstring so that the suppression applies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/cli/local.py`:
- Around line 27-38: Add entries to the Codebase Structure section of AGENTS.md
documenting the new local-execution components: describe the new CLI subcommand
implemented by setup_parser in src/cli/local.py (the 'local' subcommand with its
nested 'run' action) and the utility module src/utils/local_executor.py (explain
its purpose, e.g., orchestrating local Docker workflow execution and any primary
class or function such as LocalExecutor or execute_locally). Keep the
description concise, include where to find usage examples or flags (mention
'run' subcommand), and note any high-level responsibilities so readers know
these files manage local workflow execution.

In `@src/utils/local_executor.py`:
- Around line 359-364: The code joins file_spec.path into files_dir without
sanitizing, allowing path traversal (e.g., ../../) to escape files_dir; update
the materialization logic (the loop over task_spec.files in local_executor.py
that calls self._substitute_tokens and computes host_path) to: 1) normalize and
resolve the target path using os.path.abspath/os.path.realpath for both
files_dir and the final host_path, 2) ensure the resolved host_path is inside
the resolved files_dir using os.path.commonpath (or os.path.commonprefix
carefully) and reject or raise an exception if it would escape, and 3) only
create/write files when the containment check passes; apply the same
sanitization/containment check to the other inline-file materialization site
(the similar block handling file_spec.path later in the file).
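The containment check described above can be sketched as follows; `safe_join` is a hypothetical helper name, not part of the module:

```python
import os
import tempfile

def safe_join(base_dir, relative_path):
    """Resolve relative_path under base_dir, rejecting traversal escapes."""
    base = os.path.realpath(base_dir)
    target = os.path.realpath(os.path.join(base, relative_path))
    # commonpath collapses to base only when target stays inside it.
    if os.path.commonpath([base, target]) != base:
        raise ValueError(f'path escapes work directory: {relative_path}')
    return target

root = tempfile.mkdtemp()
inside = safe_join(root, 'task/output/log.txt')   # accepted
try:
    safe_join(root, '../../etc/passwd')           # traversal attempt
    rejected = False
except ValueError:
    rejected = True
```

Resolving with `realpath` before comparing also defuses symlink-based escapes, which a plain string-prefix check would miss.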

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 15fe56f2-ae20-4bf5-a292-03af00b2f491

📥 Commits

Reviewing files that changed from the base of the PR and between 313466b and e5adf29.

📒 Files selected for processing (5)
  • cookbook/tutorials/BUILD
  • src/cli/local.py
  • src/utils/local_executor.py
  • src/utils/tests/BUILD
  • src/utils/tests/test_local_executor.py
✅ Files skipped from review due to trivial changes (1)
  • cookbook/tutorials/BUILD
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/utils/tests/BUILD

Comment thread src/cli/standalone.py
…traversal

- Enhanced the `LocalExecutor` class to validate file paths, ensuring they do not escape the task directory. This prevents potential security risks associated with directory traversal attacks.
- Added unit tests in `test_local_executor.py` to verify that invalid file paths raise appropriate exceptions, while valid paths are accepted without errors.
- Updated documentation in `AGENTS.md` to include the new local executor functionality for Docker-based workflow execution.

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (2)
src/utils/local_executor.py (2)

120-128: ⚠️ Potential issue | 🟠 Major

Clear _results at the start of execute() to support executor reuse.

If execute() is called multiple times on the same LocalExecutor instance (e.g., running different specs or retrying), stale results from the previous run persist in _results. This can cause tasks to be incorrectly skipped or produce wrong scheduling decisions. The _build_dag() method clears _task_nodes but _results is never reset.

Proposed fix
     def execute(self, spec: workflow_module.WorkflowSpec,
                 resume: bool = False, from_step: str | None = None) -> bool:
         """Run all tasks in topological order, returning True if the entire workflow succeeds."""
+        self._results.clear()
         self._build_dag(spec)
         self._validate_for_local(spec)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 120 - 128, The executor retains
stale task outcomes because _results is not reset when execute() runs again;
modify LocalExecutor.execute to clear the instance result store (reset
self._results) at the start of the method before calling
_build_dag/_validate_for_local so previous-run results won't affect scheduling
or skipping; ensure this reset happens prior to the resume/from_step logic so
_restore_completed_tasks can repopulate _results correctly when resuming.

385-388: ⚠️ Potential issue | 🟠 Major

Remove literal quotes from GPU device argument.

When using subprocess.run with a list (no shell=True), arguments are passed directly without shell interpretation. The literal double-quotes in the f-string become part of the argument value passed to Docker, e.g., "device=0,1" instead of device=0,1. Docker's --gpus flag expects the unquoted value when receiving arguments via direct invocation.

Proposed fix
             elif gpu_count > available:
                 logger.warning(
                     'Task "%s" requests %d GPU(s) but only %d available — running with %d GPU(s)',
                     node.name, gpu_count, available, available)
-                docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(available))}"']
+                docker_args += ['--gpus', f'device={",".join(str(i) for i in range(available))}']
             else:
-                docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(gpu_count))}"']
+                docker_args += ['--gpus', f'device={",".join(str(i) for i in range(gpu_count))}']
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 385 - 388, The GPU device string is
being built with literal double-quotes so docker_args gets entries like
'"device=0,1"' which pass the quotes into Docker; update the two places that
append to docker_args (the branches that use available and gpu_count) to remove
the surrounding double-quotes in the f-strings so they produce device=0,1 (e.g.,
change f'"device={...}"' to f'device={...}'), keeping the rest of the list-based
subprocess invocation and the surrounding logger.warning(node.name, gpu_count,
available) call unchanged.
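The behavior this comment relies on is easy to demonstrate: with a list argv and no shell, embedded quotes reach the child process verbatim. A minimal sketch, using Python itself as the child:

```python
import subprocess
import sys

# Echo back argv[1] from a child process. No shell is involved, so the
# embedded double-quotes are NOT stripped by any quoting layer.
out = subprocess.run(
    [sys.executable, '-c', 'import sys; print(sys.argv[1])', '"device=0,1"'],
    capture_output=True, text=True, check=True,
).stdout.strip()
# out is the literal string '"device=0,1"', quotes included
```

Docker would likewise receive the quotes as part of the `--gpus` value, which is why they must be dropped from the f-string.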
🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)

751-817: Consider making the class attribute immutable or documenting it's intentionally shared.

The _UNSUPPORTED_SPECS dictionary is a mutable class attribute (RUF012). While this is safe since the tests only read from it, making it a module-level constant or using types.MappingProxyType would better express the intent.

Optional: Move to module level as a constant
+# Test data for unsupported feature validation
+_UNSUPPORTED_SPECS = {
+    'credentials': {
+        'yaml': textwrap.dedent('''\
+            ...
+        '''),
+        'expected_substring': 'credentials',
+    },
+    # ... rest of specs
+}
+
+
 class TestValidateForLocalRemainingBranches(unittest.TestCase):
     """Verify that _validate_for_local rejects credentials, checkpoint, volumeMounts, privileged, and hostNetwork."""
 
-    _UNSUPPORTED_SPECS = {
-        ...
-    }
-
     def test_unsupported_fields_rejected(self):
         """Each unsupported task-level field is detected and rejected with a descriptive error."""
-        for feature, case in self._UNSUPPORTED_SPECS.items():
+        for feature, case in _UNSUPPORTED_SPECS.items():
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/tests/test_local_executor.py` around lines 751 - 817, The
_UNSUPPORTED_SPECS class attribute is a mutable dict; make its immutability
explicit by either moving it to module-level as a constant (e.g.,
UNSUPPORTED_SPECS) or wrapping it with types.MappingProxyType before assigning
back to _UNSUPPORTED_SPECS so consumers cannot mutate it; update any references
to the attribute accordingly and/or add a short comment documenting that this is
intentionally shared/read-only.
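The read-only wrapper suggested above behaves like this (a minimal sketch with abbreviated test data):

```python
import types

UNSUPPORTED_SPECS = types.MappingProxyType({
    'credentials': {'expected_substring': 'credentials'},
    'checkpoint': {'expected_substring': 'checkpoint'},
})

# Reads work exactly as with a normal dict...
assert UNSUPPORTED_SPECS['credentials']['expected_substring'] == 'credentials'

# ...but any mutation attempt raises TypeError.
try:
    UNSUPPORTED_SPECS['hostNetwork'] = {}
    mutation_blocked = False
except TypeError:
    mutation_blocked = True
```

Note the proxy is shallow: the nested dicts remain mutable, so this mainly documents intent rather than enforcing deep immutability.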

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 08e56e8c-6b0c-47d6-a623-97e46ae520b2

📥 Commits

Reviewing files that changed from the base of the PR and between e5adf29 and 429aa84.

📒 Files selected for processing (3)
  • AGENTS.md
  • src/utils/local_executor.py
  • src/utils/tests/test_local_executor.py

- Updated the `LocalExecutor` class to remove unnecessary quotes around GPU device specifications in Docker command arguments, ensuring correct formatting.
- Cleared previous results at the start of the `execute` method to prevent data carryover between executions.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
src/utils/local_executor.py (2)

120-129: ⚠️ Potential issue | 🟠 Major

Restore state before creating task output directories.

_setup_directories() recreates every output directory before _restore_completed_tasks() checks os.path.isdir(info['output_dir']), so on resume a brand-new empty directory can make a task look completed and be skipped. It also leaves stale outputs in place for rerun / from_step tasks.

Also applies to: 204-209, 310-314

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 120 - 129, The code currently calls
_setup_directories() before _restore_completed_tasks(), causing freshly
recreated output dirs to prevent detecting completed tasks; change the order in
execute (and in the other places noted) so that
_restore_completed_tasks(from_step) is invoked before _setup_directories();
search for all call sites where _setup_directories() precedes
_restore_completed_tasks() and reorder them (e.g., inside the execute method and
the other similar run/restore entrypoints) to ensure restoration of
completed-task state happens prior to directory creation.

1-2: ⚠️ Potential issue | 🟡 Minor

Move the pylint pragma out of the module string.

# pylint: disable=line-too-long is currently part of the docstring text, so the linter never sees it.

Suggested fix
-"""
-SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.  # pylint: disable=line-too-long
+# pylint: disable=line-too-long
+"""
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/local_executor.py` around lines 1 - 2, The module-level docstring
currently contains the pylint pragma "# pylint: disable=line-too-long" as plain
text; remove that pragma from inside the triple-quoted string and add it as a
standalone comment immediately after the closing docstring (or at top of the
file) so pylint will recognize it; locate the module docstring in
local_executor.py and move the pragma out of the string into a top-level comment
(keeping the copyright text unchanged).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/local_executor.py`:
- Around line 466-489: The temp work_dir is created before validating spec_text,
causing leaked dirs on early failures; either move the work_dir creation (the
created_work_dir/work_dir logic) to after the Jinja/template check (i.e., after
reading spec_text and verifying template_markers) or wrap the creation +
subsequent processing in a try/finally that will call shutil.rmtree(work_dir,
ignore_errors=True) when created_work_dir is true and keep_work_dir is false,
ensuring cleanup on exceptions; update the code paths around created_work_dir,
work_dir, LocalExecutor(...) and executor.load_spec/executor.execute to use the
new placement or the finally cleanup.
- Around line 180-185: The current state serialization in local_executor.py only
writes task names, exit codes and output dirs into the state dict (constructed
from self._results), which allows stale outputs to be reused across different
workflow revisions; update the state written to .osmo-state.json to include a
workflow-level hash (or per-task signature) derived from the current workflow
spec (e.g., spec hash or task signatures) when creating the state dict, and add
logic in the resume/load path that reads that stored hash and compares it to the
current workflow hash, refusing to accept/resume from the saved state if the
hashes differ and returning a clear error; ensure the change touches the state
construction (where state = {...}) and the corresponding state load/validation
code paths referenced around lines 204-221 so mismatched workflows are rejected.
- Around line 373-392: The current logic only appends '--shm-size' when GPUs are
requested, so a user-specified self._shm_size is ignored for CPU-only tasks;
move the shm size handling out of the GPU branch in LocalExecutor (referencing
docker_args, _task_gpu_count, _detect_available_gpus, self._shm_size and
DEFAULT_SHM_SIZE) so that after GPU detection/argument construction you always
compute shm_size = self._shm_size or self.DEFAULT_SHM_SIZE and append
['--shm-size', shm_size] to docker_args; ensure the value is string-compatible
with the docker args.
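The revision guard described in the state-file comment above could look like this sketch. `save_state`/`load_state` are hypothetical names; the real module persists `.osmo-state.json`:

```python
import hashlib
import json

def spec_hash(spec_text):
    """Stable fingerprint of the workflow spec, stored alongside results."""
    return hashlib.sha256(spec_text.encode('utf-8')).hexdigest()

def save_state(results, spec_text):
    return json.dumps({'spec_hash': spec_hash(spec_text), 'results': results})

def load_state(state_json, spec_text):
    state = json.loads(state_json)
    if state.get('spec_hash') != spec_hash(spec_text):
        raise ValueError('saved state belongs to a different workflow revision')
    return state['results']

saved = save_state({'task-a': {'exit_code': 0}}, 'workflow: v1')
restored = load_state(saved, 'workflow: v1')   # same revision: accepted
try:
    load_state(saved, 'workflow: v2')          # edited spec: rejected
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
```

Hashing the raw spec text invalidates state on any edit; hashing per-task signatures instead would allow resuming tasks whose definitions are unchanged.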

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: b1e87e8f-5942-4678-9825-714d120cd138

📥 Commits

Reviewing files that changed from the base of the PR and between 429aa84 and d08bf9b.

📒 Files selected for processing (1)
  • src/utils/local_executor.py

Comment thread src/utils/standalone_executor.py Outdated
Comment thread src/utils/local_executor.py Outdated
Comment thread src/utils/local_executor.py Outdated

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cli/BUILD (1)

30-50: ⚠️ Potential issue | 🟠 Major

Wire the new Docker Compose CLI into cli_lib.

This BUILD target includes standalone.py, but the new src/cli/docker_compose.py module from this PR is still missing from srcs, and //src/utils:compose_executor is missing from deps. If main_parser.py imports that subcommand, Bazel builds/package assembly will fail or ship a CLI without the new command wired in.

Proposed fix
     srcs = [
         "access_token.py",
         "app.py",
         "bucket.py",
         "config.py",
         "credential.py",
         "data.py",
         "dataset.py",
+        "docker_compose.py",
         "editor.py",
         "formatters.py",
         "standalone.py",
         "login.py",
         "main_parser.py",
@@
         "//src/lib/utils:validation",
         "//src/lib/utils:version",
         "//src/lib/utils:workflow",
+        "//src/utils:compose_executor",
         "//src/utils:standalone_executor",
     ],
 )

Also applies to: 51-78

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/cli/BUILD` around lines 30 - 50, The BUILD srcs list is missing the new
CLI module and its runtime dependency; add "docker_compose.py" alongside
"standalone.py" in the srcs array and add the dependency
"//src/utils:compose_executor" to the deps for this target so the new Docker
Compose subcommand imported by main_parser.py is compiled and packaged
correctly; ensure any other identical srcs/deps blocks (lines 51-78 equivalent)
are updated the same way to wire the new docker_compose module into the cli_lib
target.
♻️ Duplicate comments (2)
src/utils/standalone_executor.py (2)

118-122: ⚠️ Potential issue | 🟠 Major

Reject non-mapping YAML before unpacking it.

yaml.safe_load() returns None for an empty file and can also return scalars/lists. In those cases, VersionedWorkflowSpec(**raw) throws TypeError, which skips the normal ValueError/stderr path in the CLI and surfaces an internal traceback instead of a clean validation error.

Proposed fix
     def load_spec(self, spec_text: str) -> workflow_module.WorkflowSpec:
         """Parse raw YAML text into a validated WorkflowSpec via the versioned spec model."""
         raw = yaml.safe_load(spec_text)
+        if not isinstance(raw, dict):
+            raise ValueError('Workflow spec must be a YAML mapping with a top-level "workflow" key')
         versioned = workflow_module.VersionedWorkflowSpec(**raw)
         return versioned.workflow
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 118 - 122, The load_spec
function should validate the YAML result before unpacking into
VersionedWorkflowSpec: check the local variable raw returned by
yaml.safe_load(spec_text) and if it is None or not an instance of dict/mapping,
raise a ValueError with a clear message about expecting a mapping workflow spec;
then only call workflow_module.VersionedWorkflowSpec(**raw) and return
versioned.workflow. This ensures load_spec rejects empty/scalar/list YAML inputs
instead of letting VersionedWorkflowSpec raise a TypeError.
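The guard is easy to exercise without PyYAML by feeding it the shapes yaml.safe_load can produce. A minimal sketch of the suggested check:

```python
def load_spec(raw):
    """Reject anything that is not a mapping before attribute unpacking."""
    if not isinstance(raw, dict):
        raise ValueError('Workflow spec must be a YAML mapping')
    return raw

# yaml.safe_load returns None for an empty document, and may return
# scalars or lists for other non-mapping documents.
results = {}
for label, parsed in [('empty', None), ('scalar', 'hello'),
                      ('list', [1, 2]), ('mapping', {'workflow': {}})]:
    try:
        load_spec(parsed)
        results[label] = 'accepted'
    except ValueError:
        results[label] = 'rejected'
```

Only the mapping case survives to the `VersionedWorkflowSpec(**raw)` call, so the CLI sees a clean ValueError instead of an internal TypeError traceback.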

567-583: ⚠️ Potential issue | 🟡 Minor

Clean up auto-created work dirs when setup or execution raises.

Once mkdtemp() succeeds, any exception from StandaloneExecutor(...), load_spec(...), or execute(...) leaks osmo-standalone-* when keep_work_dir is false, because cleanup only runs on the normal success/failure path.

Proposed fix
     created_work_dir = work_dir is None
     if work_dir is None:
         work_dir = tempfile.mkdtemp(prefix='osmo-standalone-')
         logger.info('Using temporary work directory: %s', work_dir)
 
-    executor = StandaloneExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir,
-                                   docker_cmd=docker_cmd, shm_size=shm_size)
-    spec = executor.load_spec(spec_text)
-    success = executor.execute(spec, resume=resume or from_step is not None,
-                               from_step=from_step)
+    try:
+        executor = StandaloneExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir,
+                                      docker_cmd=docker_cmd, shm_size=shm_size)
+        spec = executor.load_spec(spec_text)
+        success = executor.execute(spec, resume=resume or from_step is not None,
+                                   from_step=from_step)
+    except Exception:
+        if created_work_dir and not keep_work_dir:
+            shutil.rmtree(work_dir, ignore_errors=True)
+        raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 567 - 583, The code currently
creates a temp dir (mkdtemp) and then calls StandaloneExecutor(...),
executor.load_spec(...), and executor.execute(...), but only cleans the dir on
the normal success/failure path; wrap the executor creation, load_spec, and
execute calls in a try/finally so that any exception triggers cleanup: after
creating work_dir (when created_work_dir is True) ensure in the finally block
that if not keep_work_dir the directory is removed with shutil.rmtree(work_dir,
ignore_errors=True); keep the existing logging (info about using temp dir and
preserved dir) but move the final cleanup logic into the finally to guarantee no
leak even when StandaloneExecutor, load_spec, or execute raise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 511-526: After creating a temporary work dir (created_work_dir)
but before constructing ComposeExecutor and calling ComposeExecutor.load_spec /
ComposeExecutor.execute, wrap the executor creation, spec loading and execution
in a try/finally so that if any exception occurs you still remove the
auto-created work dir when keep_work_dir is false; in the finally block check
created_work_dir and not keep_work_dir and then call shutil.rmtree(work_dir,
ignore_errors=True). Ensure variables referenced (created_work_dir, work_dir,
keep_work_dir, ComposeExecutor, load_spec, execute) are in scope and that
logging for preserved/cleaned up dirs is preserved or moved appropriately.
- Around line 86-95: The ComposeExecutor.execute override currently ignores the
resume and from_step flags from StandaloneExecutor.execute causing silent
behavior changes; update ComposeExecutor.execute(spec, resume=False,
from_step=None) to either implement resume semantics or explicitly reject
unsupported flags — e.g., at the top of the method check if resume or from_step
is set and raise a clear exception (ValueError or NotImplementedError)
indicating compose mode does not support resume/from_step yet, or implement the
resume logic by wiring resume/from_step into the DAG traversal in
_build_dag/_validate_for_compose/_generate_compose_file so execution begins at
the requested step; reference the execute method in ComposeExecutor and the
_build_dag/_generate_compose_file helpers to locate where to apply the change.
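The explicit-rejection option described above can be sketched as a guard clause. The class name and flag names follow the comment; the body is a hypothetical stand-in for the real compose run, not the executor's actual implementation:

```python
class ComposeExecutor:
    """Hypothetical stub; the real class builds and runs a compose file."""

    def execute(self, spec, resume=False, from_step=None):
        # Fail fast on flags compose mode does not support yet, instead of
        # silently ignoring them and changing behavior.
        if resume or from_step is not None:
            raise NotImplementedError(
                'compose mode does not support --resume/--from-step yet')
        return self._run(spec)

    def _run(self, spec):
        # Placeholder for the DAG build + docker compose invocation.
        return True
```

Failing fast here keeps the CLI contract honest: a user who passes `--resume` gets a clear error rather than a full re-run that looks like a resume.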

---

Outside diff comments:
In `@src/cli/BUILD`:
- Around line 30-50: The BUILD srcs list is missing the new CLI module and its
runtime dependency; add "docker_compose.py" alongside "standalone.py" in the
srcs array and add the dependency "//src/utils:compose_executor" to the deps for
this target so the new Docker Compose subcommand imported by main_parser.py is
compiled and packaged correctly; ensure any other identical srcs/deps blocks
(lines 51-78 equivalent) are updated the same way to wire the new docker_compose
module into the cli_lib target.

---

Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 118-122: The load_spec function should validate the YAML result
before unpacking into VersionedWorkflowSpec: check the local variable raw
returned by yaml.safe_load(spec_text) and if it is None or not an instance of
dict/mapping, raise a ValueError with a clear message about expecting a mapping
workflow spec; then only call workflow_module.VersionedWorkflowSpec(**raw) and
return versioned.workflow. This ensures load_spec rejects empty/scalar/list YAML
inputs instead of letting VersionedWorkflowSpec raise a TypeError.
- Around line 567-583: The code currently creates a temp dir (mkdtemp) and then
calls StandaloneExecutor(...), executor.load_spec(...), and
executor.execute(...), but only cleans the dir on the normal success/failure
path; wrap the executor creation, load_spec, and execute calls in a try/finally
so that any exception triggers cleanup: after creating work_dir (when
created_work_dir is True) ensure in the finally block that if not keep_work_dir
the directory is removed with shutil.rmtree(work_dir, ignore_errors=True); keep
the existing logging (info about using temp dir and preserved dir) but move the
final cleanup logic into the finally to guarantee no leak even when
StandaloneExecutor, load_spec, or execute raise.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 96a9a10d-7d76-4af0-8773-526ae76dc345

📥 Commits

Reviewing files that changed from the base of the PR and between 9051d1f and 53a18c7.

📒 Files selected for processing (11)
  • AGENTS.md
  • src/cli/BUILD
  • src/cli/docker_compose.py
  • src/cli/main_parser.py
  • src/cli/standalone.py
  • src/utils/BUILD
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py
  • src/utils/tests/BUILD
  • src/utils/tests/test_compose_executor.py
  • src/utils/tests/test_standalone_executor.py
✅ Files skipped from review due to trivial changes (3)
  • src/cli/main_parser.py
  • src/utils/tests/BUILD
  • AGENTS.md

…nd cleanup

- Added error handling in `ComposeExecutor` to raise `NotImplementedError` for unsupported resume functionality.
- Refactored `run_workflow_compose` and `run_workflow_standalone` to ensure proper cleanup of temporary work directories using a `try-finally` block.
- Introduced validation in `StandaloneExecutor` to check for correct YAML mapping in workflow specifications, raising a `ValueError` for invalid formats.
- Updated build configurations to include the new `docker_compose.py` file and the `compose_executor` library.

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (3)
src/utils/standalone_executor.py (3)

462-468: ⚠️ Potential issue | 🟠 Major

Quote multi-GPU selectors before passing --gpus to Docker.

Official NVIDIA/Docker examples use a quoted selector for specific devices, and the unquoted device=0,1 form is documented as failing with cannot set both Count and DeviceIDs. Here the argv element is emitted as plain device=0,1, so multi-GPU tasks can fail to start. (docs.nvidia.com)

Suggested fix
-                docker_args += ['--gpus', f'device={",".join(str(i) for i in range(available))}']
+                docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(available))}"']
             else:
-                docker_args += ['--gpus', f'device={",".join(str(i) for i in range(gpu_count))}']
+                docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(gpu_count))}"']
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 462 - 468, The multi-GPU
selector passed to Docker is unquoted and can trigger "cannot set both Count and
DeviceIDs" errors; update both places where docker_args is appended (the branch
handling gpu_count > available and the else branch) so the selector value is
wrapped in quotes (i.e., pass a quoted "device=0,1" string instead of
device=0,1) when adding ['--gpus', ...], ensuring node.name/gpu_count/available
logic remains the same.

417-423: ⚠️ Potential issue | 🟠 Major

Honor inline resources.gpu in _task_gpu_count().

This helper only consults spec.resources[task_spec.resource], so tasks that declare GPUs inline under resources: are treated as CPU-only. ComposeExecutor._build_compose_service() inherits the same bug through this helper.

Suggested fix
     def _task_gpu_count(self, task_spec: task_module.TaskSpec,
                         spec: workflow_module.WorkflowSpec) -> int:
         """Return the number of GPUs requested by a task's resource spec, defaulting to 0."""
-        resource_spec = spec.resources.get(task_spec.resource)
+        if task_spec.resources and task_spec.resources.gpu:
+            return task_spec.resources.gpu
+        resource_spec = spec.resources.get(task_spec.resource)
         if resource_spec and resource_spec.gpu:
             return resource_spec.gpu
         return 0
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 417 - 423, _task_gpu_count
currently only looks up GPUs via spec.resources.get(task_spec.resource) and
ignores inline declarations on the TaskSpec; update _task_gpu_count to first
inspect task_spec.resources (e.g., task_spec.resources.gpu or
task_spec.resources.get("gpu") depending on structure) and return that value if
present, then fall back to resolving spec.resources.get(task_spec.resource) as
before; also ensure ComposeExecutor._build_compose_service (which uses
_task_gpu_count) will then honor inline task-level GPU requests without further
changes.

131-137: ⚠️ Potential issue | 🟠 Major

Clean output directories for tasks that will be re-executed.

_setup_directories() only creates missing folders. On --resume / --from-step, any task not restored into self._results keeps its previous <work_dir>/<task>/output contents, so a rerun can read stale artifacts from {{output}} before it writes new ones.

Suggested fix
         self._validate_for_standalone(spec)
         self._setup_directories()
 
         if resume or from_step:
             self._restore_completed_tasks(from_step)
+            for task_name in self._task_nodes:
+                if task_name in self._results:
+                    continue
+                task_output_dir = os.path.join(self._work_dir, task_name, 'output')
+                shutil.rmtree(task_output_dir, ignore_errors=True)
+                os.makedirs(task_output_dir, exist_ok=True)

Also applies to: 372-376

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 131 - 137, When
resuming/from-step, output directories for tasks that will be re-executed are
not cleaned because _setup_directories only creates missing folders; after
calling _restore_completed_tasks(...) you must identify tasks that remain to run
(e.g., all task ids in the DAG minus keys in self._results) and remove or empty
their <work_dir>/<task>/output directories so no stale artifacts are read;
implement this cleanup right after _restore_completed_tasks(...) in the block
that calls _build_dag, _validate_for_standalone, _setup_directories (and
replicate the same cleanup in the analogous block around lines 372-376) to
ensure tasks selected for re-execution start with a clean output directory.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 462-468: The multi-GPU selector passed to Docker is unquoted and
can trigger "cannot set both Count and DeviceIDs" errors; update both places
where docker_args is appended (the branch handling gpu_count > available and the
else branch) so the selector value is wrapped in quotes (i.e., pass a quoted
"device=0,1" string instead of device=0,1) when adding ['--gpus', ...], ensuring
node.name/gpu_count/available logic remains the same.
- Around line 417-423: _task_gpu_count currently only looks up GPUs via
spec.resources.get(task_spec.resource) and ignores inline declarations on the
TaskSpec; update _task_gpu_count to first inspect task_spec.resources (e.g.,
task_spec.resources.gpu or task_spec.resources.get("gpu") depending on
structure) and return that value if present, then fall back to resolving
spec.resources.get(task_spec.resource) as before; also ensure
ComposeExecutor._build_compose_service (which uses _task_gpu_count) will then
honor inline task-level GPU requests without further changes.
- Around line 131-137: When resuming/from-step, output directories for tasks
that will be re-executed are not cleaned because _setup_directories only creates
missing folders; after calling _restore_completed_tasks(...) you must identify
tasks that remain to run (e.g., all task ids in the DAG minus keys in
self._results) and remove or empty their <work_dir>/<task>/output directories so
no stale artifacts are read; implement this cleanup right after
_restore_completed_tasks(...) in the block that calls _build_dag,
_validate_for_standalone, _setup_directories (and replicate the same cleanup in
the analogous block around lines 372-376) to ensure tasks selected for
re-execution start with a clean output directory.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: af05a982-9fd9-494e-8e5f-7bf2cc94808d

📥 Commits

Reviewing files that changed from the base of the PR and between 53a18c7 and a52facc.

📒 Files selected for processing (3)
  • src/cli/BUILD
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/cli/BUILD

- Introduced a new method `_clean_rerun_output_dirs` to remove output directories for tasks that will be re-executed, ensuring no stale artifacts remain.
- Updated the `run_workflow_standalone` method to call the cleanup function when resuming or starting from a specific step.
- Adjusted GPU device specification formatting in Docker arguments to ensure correct syntax with quotes around device lists.
- Added support for `--credential`, `--set`, and `--set-string` arguments in both `docker_compose.py` and `standalone.py` to allow users to map credential names to local directories and override default values in workflow specifications.
- Implemented `_parse_credentials` function to validate and parse credential inputs.
- Updated `ComposeExecutor` and `StandaloneExecutor` to handle new credential and variable parameters, ensuring proper binding of directories and values during execution.
- Enhanced error handling for unsupported credential configurations in workflow specifications.
@maufrancom
Author

@coderabbitai resume

@coderabbitai

coderabbitai bot commented Apr 10, 2026

✅ Actions performed

Reviews resumed.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 193-197: Validation currently accepts any declared credential name
by checking task_spec.credentials against self._credentials, but
_build_compose_service() only mounts credentials whose value is a string, so
dict-style credential mappings silently get dropped; update the validator in
compose mode (the loop referencing task_spec.credentials and self._credentials)
to also check that self._credentials[cred_name] is a str and reject/raise an
error if it's not (e.g., is a dict), and make the same change in the other
validator block referenced around lines 368-371 so both validation sites enforce
string-only credential values.
- Around line 542-546: The finally block currently removes auto-created work
dirs even on failure; update the cleanup logic in compose_executor.py so that
shutil.rmtree(work_dir, ...) only runs when created_work_dir is true,
keep_work_dir is false, AND success is true (i.e., delete only on successful
runs), and ensure when not success you log/preserve the work_dir via
logger.info('Work directory preserved for debugging: %s', work_dir); adjust the
conditionals around created_work_dir, keep_work_dir, success, and the logger
call accordingly to preserve generated docker-compose.yml on failure.

In `@src/utils/standalone_executor.py`:
- Around line 78-81: The docstring listing unsupported features is out of date:
it still claims "Credentials" are unsupported even though StandaloneExecutor
accepts --credential mounts and binds them in the _run_task() method; update the
text in the unsupported-features block to remove "Credentials" (or explicitly
mark them as supported via --credential) and, if helpful, add a short note
referencing the --credential flag and that binding happens in
StandaloneExecutor._run_task() so the comment matches the implementation.
- Around line 357-361: The loop that validates TaskSpec.credentials currently
treats each credential as a string and silently ignores dict-style credentials;
update the validation in the credentials-checking loop (iterating over
task_spec.credentials) to detect dict items (isinstance(item, dict)) and add a
clear rejection message to unsupported_features (e.g., 'Task "{task_spec.name}":
credential "{key}" uses dict-style mapping which the standalone executor does
not support; provide credentials as NAME=/path or flatten the mapping') instead
of treating them as missing, and apply the same explicit rejection check where
credentials are mounted (the block around the mount code that references
self._credentials) so dict credential entries are rejected consistently rather
than skipped.
- Around line 673-677: The finally block currently deletes auto-created work
dirs whenever created_work_dir is True and keep_work_dir is False, even on
failure; change the logic so auto-created dirs are removed only when the run
succeeded and keep_work_dir is False. Specifically, update the condition around
shutil.rmtree in the finally block (referencing created_work_dir, keep_work_dir,
success, work_dir) to run only when success is True (e.g., if created_work_dir
and not keep_work_dir and success) and ensure the branch that logs preservation
(logger.info('Work directory preserved for debugging: %s', work_dir)) runs when
the run failed (not success) so that auto-created dirs are always kept on
failure.
- Around line 580-636: _spec_has_templates currently only looks for '{%', '{#',
and 'default-values' so specs that only use '{{ ... }}' never trigger local
expansion; additionally _OSMO_TOKEN_PATTERN only matches unspaced tokens like
'{{output}}' so spaced tokens like '{{ output }}' are not protected and Jinja
raises StrictUndefined. Update _OSMO_TOKEN_PATTERN to allow optional whitespace
inside the braces and to match the full token (e.g.
r'\{\{\s*(uuid|workflow_id|output|input:[^}]+|host:[^}]+)\s*\}\}'), and in
_expand_jinja_locally use the match.group(0) (the full token including spaces)
as original_token when building placeholder_map so replacements preserve
spacing; also update _spec_has_templates to return True if '{{' appears (in
addition to '{%' and '{#' and 'default-values') so plain '{{ var }}' triggers
local Jinja expansion.
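A runnable sketch of the widened token pattern proposed above. The token names come from the comment; the placeholder scheme is illustrative, not the executor's actual code:

```python
import re

# Optional whitespace inside the braces; group 0 is the full token,
# including spacing, so substitutions round-trip exactly.
OSMO_TOKEN_PATTERN = re.compile(
    r'\{\{\s*(uuid|workflow_id|output|input:[^}]+|host:[^}]+)\s*\}\}')

def protect_tokens(text):
    """Swap each OSMO token for a placeholder so Jinja never sees it."""
    placeholder_map = {}

    def _sub(match):
        key = f'__OSMO_TOKEN_{len(placeholder_map)}__'
        placeholder_map[key] = match.group(0)  # full token, spaces and all
        return key

    return OSMO_TOKEN_PATTERN.sub(_sub, text), placeholder_map

def restore_tokens(text, placeholder_map):
    """Put the original tokens back after Jinja rendering."""
    for key, token in placeholder_map.items():
        text = text.replace(key, token)
    return text
```

Because replacement keys are unique per occurrence, `{{output}}` and `{{ output }}` each restore with their original spacing intact.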

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 0c881831-6032-4c86-b602-7a0c4af46766

📥 Commits

Reviewing files that changed from the base of the PR and between 53a18c7 and a875365.

📒 Files selected for processing (8)
  • .gitignore
  • AGENTS.md
  • src/cli/BUILD
  • src/cli/docker_compose.py
  • src/cli/standalone.py
  • src/utils/BUILD
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py
✅ Files skipped from review due to trivial changes (1)
  • .gitignore
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/cli/BUILD
  • src/utils/BUILD
  • AGENTS.md

- Updated credential processing to support dict-style mappings, raising clear errors for unsupported configurations.
- Improved error messages for missing credentials, guiding users on the correct format for credential specification.
- Adjusted cleanup logic in `run_workflow_compose` and `run_workflow_standalone` to ensure work directories are only removed on successful execution.
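The cleanup condition described in that change can be sketched as a small helper. The `created_work_dir` / `keep_work_dir` / `success` flags mirror the review discussion; the helper name is hypothetical:

```python
import shutil

def finish_run(work_dir, created_work_dir, keep_work_dir, success, log):
    """Remove an auto-created work dir only after a successful run;
    otherwise preserve it (and say so) for debugging."""
    if created_work_dir and not keep_work_dir and success:
        shutil.rmtree(work_dir, ignore_errors=True)
        return 'removed'
    if not success:
        log(f'Work directory preserved for debugging: {work_dir}')
    return 'preserved'
```

Keeping the directory on failure preserves the generated artifacts (such as the compose file) for post-mortem inspection.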

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/utils/compose_executor.py (1)

77-78: Project name may be invalid if workflow name starts with non-alphanumeric.

If the workflow name starts with a special character (e.g., _myworkflow), the resulting project name osmo--myworkflow has a double hyphen but is valid. However, if a workflow name is all special characters (unlikely but possible), the result could be problematic.

This is a minor edge case, but consider validating the workflow name earlier or stripping leading/trailing hyphens:

     def _compose_project_name(self, spec: workflow_module.WorkflowSpec) -> str:
-        return f'osmo-{re.sub(r"[^a-z0-9-]", "-", spec.name.lower())}'
+        sanitized = re.sub(r"[^a-z0-9-]", "-", spec.name.lower()).strip('-')
+        return f'osmo-{sanitized or "workflow"}'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/compose_executor.py` around lines 77 - 78, The
_compose_project_name function can produce leading/trailing or repeated hyphens
(or an empty name) when spec.name contains only non-alphanumeric chars; update
the logic in _compose_project_name to first replace non [a-z0-9] with hyphens,
collapse consecutive hyphens into a single hyphen, strip leading/trailing
hyphens, and if the result is empty fallback to a safe default like
"osmo-default" (all while keeping the "osmo-" prefix and using spec.name.lower()
as the input); locate and update the _compose_project_name method in
compose_executor.py to implement these transformations and the fallback.
src/utils/standalone_executor.py (2)

508-509: Consider adding strict=True to zip() for defensive coding.

While task_spec.environment.keys() and resolved_env_values are guaranteed to have the same length (built from the same dict), adding strict=True makes this invariant explicit and would catch any future refactoring bugs:

-        for env_key, resolved_value in zip(task_spec.environment.keys(), resolved_env_values):
+        for env_key, resolved_value in zip(task_spec.environment.keys(), resolved_env_values, strict=True):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 508 - 509, The loop that
builds docker_args uses zip(task_spec.environment.keys(), resolved_env_values) —
change it to zip(task_spec.environment.keys(), resolved_env_values, strict=True)
to enforce the invariant that both iterables have the same length; update the
for-loop over (env_key, resolved_value) accordingly (still appending to
docker_args), and run tests/CI to ensure Python runtime supports zip(...,
strict=True).

203-214: State file does not include workflow version/hash.

The state file only persists task results without any workflow fingerprint. If the spec changes between runs (different images, args, etc.) but task names stay the same, --resume will skip incompatible tasks and reuse stale outputs.

The PR discussion indicates this was a deliberate design decision, but consider at minimum logging a warning when resuming that the user is responsible for ensuring spec consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 203 - 214, The state file
currently saved by _save_state lacks any workflow fingerprint, so add a
deterministic workflow/version hash to the saved JSON (e.g., compute from the
workflow spec, image names, args -- implement a helper like
_compute_workflow_fingerprint and include its result under a top-level key such
as "workflow_fingerprint" alongside "tasks"), update _save_state to write that
fingerprint to self._state_file_path, and then update the resume/load path (the
method that reads the state, e.g., _load_state or the resume logic that calls
it) to compare the saved fingerprint with the current computed fingerprint and
emit a clear processLogger.warning (or similar logger used in this class) when
fingerprints are missing or do not match so users are warned before reusing
stale outputs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 499-501: The docker_args construction in standalone_executor.py is
adding literal double-quote characters into the GPU spec (in the f-strings used
to build the '--gpus' argument), which breaks Docker when invoking
subprocess.run() with list mode; update both branches that set docker_args (the
branch using available and the branch using gpu_count) to remove the surrounding
quotes so the value is e.g. device=0,1 instead of "device=0,1". Locate the code
that appends to docker_args (the two f'"device=..."' occurrences) and change
them to unquoted f'device=...'; keep using subprocess.run() list invocation
unchanged.

---

Nitpick comments:
In `@src/utils/compose_executor.py`:
- Around line 77-78: The _compose_project_name function can produce
leading/trailing or repeated hyphens (or an empty name) when spec.name contains
only non-alphanumeric chars; update the logic in _compose_project_name to first
replace non [a-z0-9] with hyphens, collapse consecutive hyphens into a single
hyphen, strip leading/trailing hyphens, and if the result is empty fallback to a
safe default like "osmo-default" (all while keeping the "osmo-" prefix and using
spec.name.lower() as the input); locate and update the _compose_project_name
method in compose_executor.py to implement these transformations and the
fallback.

In `@src/utils/standalone_executor.py`:
- Around line 508-509: The loop that builds docker_args uses
zip(task_spec.environment.keys(), resolved_env_values) — change it to
zip(task_spec.environment.keys(), resolved_env_values, strict=True) to enforce
the invariant that both iterables have the same length; update the for-loop over
(env_key, resolved_value) accordingly (still appending to docker_args), and run
tests/CI to ensure Python runtime supports zip(..., strict=True).
- Around line 203-214: The state file currently saved by _save_state lacks any
workflow fingerprint, so add a deterministic workflow/version hash to the saved
JSON (e.g., compute from the workflow spec, image names, args -- implement a
helper like _compute_workflow_fingerprint and include its result under a
top-level key such as "workflow_fingerprint" alongside "tasks"), update
_save_state to write that fingerprint to self._state_file_path, and then update
the resume/load path (the method that reads the state, e.g., _load_state or the
resume logic that calls it) to compare the saved fingerprint with the current
computed fingerprint and emit a clear processLogger.warning (or similar logger
used in this class) when fingerprints are missing or do not match so users are
warned before reusing stale outputs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 8e89b1d9-9eb4-425d-bcbf-3ba7b3572583

📥 Commits

Reviewing files that changed from the base of the PR and between a875365 and de30c90.

📒 Files selected for processing (2)
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py

- Introduced a new method `_compute_workflow_fingerprint` to generate a SHA-256 hash of task specifications, allowing for detection of workflow changes across runs.
- Updated the state management to include the workflow fingerprint, enabling verification of spec consistency between runs and warning users of potential stale outputs.
- Improved GPU device specification formatting in Docker arguments to ensure correct syntax without unnecessary quotes.

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
src/utils/standalone_executor.py (2)

280-290: Minor: Consider using collections.deque for BFS queue.

list.pop(0) is O(n) per operation. For large workflows, collections.deque.popleft() provides O(1) performance. This also applies to _cancel_downstream at lines 469-477.

Suggested improvement
+from collections import deque
+
     def _get_downstream_tasks(self, task_name: str) -> Set[str]:
         """Return all transitive downstream dependents of the given task via BFS."""
         visited: Set[str] = set()
-        queue = [task_name]
+        queue = deque([task_name])
         while queue:
-            current = queue.pop(0)
+            current = queue.popleft()
             for downstream in self._task_nodes[current].downstream:
                 if downstream not in visited:
                     visited.add(downstream)
                     queue.append(downstream)
         return visited
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 280 - 290, Replace the BFS
queue implementation that uses a list and pop(0) with a collections.deque to get
O(1) pops: import deque from collections, initialize queue = deque([task_name])
in _get_downstream_tasks and use queue.popleft() instead of pop(0); make the
same change in the _cancel_downstream function (initialize its queue as a deque
and call popleft()) to avoid O(n) behavior on large workflows while preserving
the existing visited/append logic.

126-141: Consider including files contents in the workflow fingerprint.

The fingerprint captures task name, image, command, args, environment, inputs, and resource but omits files contents. If a user changes inline file contents without changing other fields, the fingerprint won't detect it and --resume may reuse stale outputs.

Suggested improvement
             fingerprint_data.append({
                 'name': name,
                 'image': spec.image,
                 'command': spec.command,
                 'args': spec.args,
                 'environment': dict(sorted(spec.environment.items())),
                 'inputs': [str(i) for i in spec.inputs],
                 'resource': spec.resource,
+                'files': [(f.path, f.contents) for f in spec.files],
             })
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 126 - 141, The
_compute_workflow_fingerprint function currently omits task file contents;
update it to include each task spec's files in the fingerprint by adding a
deterministic representation of spec.files (e.g., a sorted list of {path,
content} or a map path->content) into fingerprint_data for each task in
_task_nodes. Ensure ordering is stable (sort file paths), handle missing/None
safely, and convert file contents to strings (or a content hash) so the JSON
dump stays deterministic before hashing; keep using json.dumps(sort_keys=True,
separators=(',', ':')) and hashlib.sha256 to produce the final hex digest.
src/utils/compose_executor.py (1)

493-506: Consider logging a hint when JSON parsing fails or the service is missing from the output.

The JSON parsing handles both single-object and array formats from docker compose ps, which is good. However, if the output format changes in future Docker Compose versions, parsing could silently fail and return exit code 1, making debugging difficult.

Suggested improvement
             for line in result.stdout.strip().splitlines():
                 line = line.strip()
                 if not line:
                     continue
                 try:
                     container_info = json.loads(line)
                 except json.JSONDecodeError:
+                    logger.debug('Could not parse compose ps output line: %s', line[:100])
                     continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/compose_executor.py` around lines 493 - 506, The loop that parses
result.stdout with json.loads can silently skip malformed lines and return 1, so
update the parsing in compose_executor.py (the block iterating over
result.stdout.splitlines and using json.loads) to log a hint when
JSONDecodeError occurs and when no matching service is found: catch
json.JSONDecodeError and call the module logger (e.g., logger.error or
process_logger.error) including the exception and the raw line from
result.stdout, and after the loop, if no entry matched service_name, log a
warning that the service was not found in parsed container_info along with the
full result.stdout to aid debugging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/utils/compose_executor.py`:
- Around line 493-506: The loop that parses result.stdout with json.loads can
silently skip malformed lines and return 1, so update the parsing in
compose_executor.py (the block iterating over result.stdout.splitlines and using
json.loads) to log a hint when JSONDecodeError occurs and when no matching
service is found: catch json.JSONDecodeError and call the module logger (e.g.,
logger.error or process_logger.error) including the exception and the raw line
from result.stdout, and after the loop, if no entry matched service_name, log a
warning that the service was not found in parsed container_info along with the
full result.stdout to aid debugging.

In `@src/utils/standalone_executor.py`:
- Around line 280-290: Replace the BFS queue implementation that uses a list and
pop(0) with a collections.deque to get O(1) pops: import deque from collections,
initialize queue = deque([task_name]) in _get_downstream_tasks and use
queue.popleft() instead of pop(0); make the same change in the
_cancel_downstream function (initialize its queue as a deque and call popleft())
to avoid O(n) behavior on large workflows while preserving the existing
visited/append logic.
- Around line 126-141: The _compute_workflow_fingerprint function currently
omits task file contents; update it to include each task spec's files in the
fingerprint by adding a deterministic representation of spec.files (e.g., a
sorted list of {path, content} or a map path->content) into fingerprint_data for
each task in _task_nodes. Ensure ordering is stable (sort file paths), handle
missing/None safely, and convert file contents to strings (or a content hash) so
the JSON dump stays deterministic before hashing; keep using
json.dumps(sort_keys=True, separators=(',', ':')) and hashlib.sha256 to produce
the final hex digest.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: d75af904-9945-450c-8df8-dabef5dbcd16

📥 Commits

Reviewing files that changed from the base of the PR and between de30c90 and 4367955.

📒 Files selected for processing (2)
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py

- Added error logging for JSON parsing failures in `ComposeExecutor` to aid in debugging.
- Enhanced warning messages in `ComposeExecutor` to include the full Docker compose output when no container info is found.
- Updated `StandaloneExecutor` to use `deque` for improved performance in task processing queues.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 82-86: The _compose_base_cmd function currently uses
self._compose_cmd.split(), which mishandles quoted paths; replace that call with
shlex.split(self._compose_cmd) so quoted/space-containing arguments are parsed
correctly, and ensure shlex is imported; keep the rest of the return
construction (adding '-p', self._compose_project_name(spec), '-f',
self._compose_file_path) unchanged.
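The difference matters when the compose command lives at a quoted or space-containing path; a quick sketch (the command string is purely illustrative):

```python
import shlex

cmd = '"/opt/my tools/docker" compose'

# Naive whitespace split breaks the quoted path apart.
print(cmd.split())       # ['"/opt/my', 'tools/docker"', 'compose']

# shlex.split honors shell quoting rules and keeps the path intact.
print(shlex.split(cmd))  # ['/opt/my tools/docker', 'compose']
```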

In `@src/utils/standalone_executor.py`:
- Around line 127-147: The fingerprint only records spec.resource by name;
update _compute_workflow_fingerprint to include the resolved/effective resource
configuration so changes in GPU/CPU/memory alter the hash. For each task node in
_compute_workflow_fingerprint, replace the simple 'resource': spec.resource
entry with the full resource dict resolved from spec.resources (e.g.,
spec.resources.get(spec.resource) or spec.effective_resource if available),
normalizing it to a deterministic order (sorted keys) and serializing numeric
values so the SHA-256 reflects actual resource settings.
- Around line 228-246: Make saving and loading the resume state atomic and
tolerant of partial/corrupt files: in _save_state use a temp file (e.g.,
self._state_file_path + ".tmp"), write JSON to it and fsync the file and its
directory, then atomically replace the real file with os.replace so truncation
cannot leave a broken file; in _load_state catch JSONDecodeError (and possibly
IOError) when reading self._state_file_path and treat it as no state (return
None) after optionally renaming the corrupt file to a .corrupt timestamped
backup so the run can start fresh and the bad file is preserved for debugging;
keep references to
_save_state/_load_state/_state_file_path/_workflow_fingerprint/_results when
making the changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3eebc53e-3c27-4b6b-abba-4cd68bef171f

📥 Commits

Reviewing files that changed from the base of the PR and between 4367955 and 2f13f82.

📒 Files selected for processing (2)
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py

…ng and state management

- Replaced string splitting with `shlex.split` in `ComposeExecutor` to ensure correct parsing of command arguments.
- Enhanced `_compute_workflow_fingerprint` in `StandaloneExecutor` to accept `WorkflowSpec` directly, improving clarity and functionality.
- Updated state file handling in `StandaloneExecutor` to use a temporary file for safer writes and added error handling for corrupt state files.

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
src/utils/standalone_executor.py (2)

369-396: Consider using iterable unpacking for cycle path construction.

Minor style improvement at line 380.

Suggested change
             if state[name] == IN_PROGRESS:
                 cycle_start = path.index(name)
-                return path[cycle_start:] + [name]
+                return [*path[cycle_start:], name]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 369 - 396, In
_check_for_cycles (inner function visit) replace the list concatenation used to
build the cycle path (path[cycle_start:] + [name]) with iterable unpacking to
improve clarity and performance; locate the visit function inside the
_check_for_cycles method and return the cycle as [*path[cycle_start:], name]
instead of using + to join the slices.

694-700: Consider using SHA-256 instead of MD5 for placeholder key generation.

While MD5 is only used here for generating unique placeholder names (not security), using hashlib.sha256 would be consistent with the fingerprint computation elsewhere in this file and silence the static analysis warning.

Suggested change
     placeholder_map: Dict[str, str] = {}
     for match in _OSMO_TOKEN_PATTERN.finditer(file_text):
         field = match.group(1).strip()
-        hash_key = 'hash' + str(int(hashlib.md5(field.encode('utf-8')).hexdigest(), 16))
+        hash_key = 'osmo_' + hashlib.sha256(field.encode('utf-8')).hexdigest()[:16]
         original_token = match.group(0)
         template_data[hash_key] = original_token
         placeholder_map[original_token] = hash_key
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 694 - 700, The placeholder key
generation currently uses hashlib.md5 on the matched field inside the loop that
builds placeholder_map and template_data (see _OSMO_TOKEN_PATTERN,
placeholder_map, template_data, hash_key, original_token); replace
hashlib.md5(...) with hashlib.sha256(...) and keep the same .hexdigest() ->
int(...) conversion so the produced hash_key remains a deterministic string
(e.g., 'hash' + str(int(hashlib.sha256(field.encode('utf-8')).hexdigest(),
16))). This aligns with the fingerprint code elsewhere and removes the MD5
usage.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/utils/standalone_executor.py`:
- Around line 369-396: In _check_for_cycles (inner function visit) replace the
list concatenation used to build the cycle path (path[cycle_start:] + [name])
with iterable unpacking to improve clarity and performance; locate the visit
function inside the _check_for_cycles method and return the cycle as
[*path[cycle_start:], name] instead of using + to join the slices.
- Around line 694-700: The placeholder key generation currently uses hashlib.md5
on the matched field inside the loop that builds placeholder_map and
template_data (see _OSMO_TOKEN_PATTERN, placeholder_map, template_data,
hash_key, original_token); replace hashlib.md5(...) with hashlib.sha256(...) and
keep the same .hexdigest() -> int(...) conversion so the produced hash_key
remains a deterministic string (e.g., 'hash' +
str(int(hashlib.sha256(field.encode('utf-8')).hexdigest(), 16))). This aligns
with the fingerprint code elsewhere and removes the MD5 usage.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3043a7fb-1201-4911-b8eb-8297aa4403a6

📥 Commits

Reviewing files that changed from the base of the PR and between 2f13f82 and 093f5b9.

📒 Files selected for processing (2)
  • src/utils/compose_executor.py
  • src/utils/standalone_executor.py

…ction logic

- Updated the cycle detection logic to use list unpacking for clarity in `StandaloneExecutor`.
- Changed hash generation from MD5 to SHA-256 for improved security in `_expand_jinja_locally`, ensuring better uniqueness and collision resistance.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/utils/standalone_executor.py (1)

165-176: ⚠️ Potential issue | 🟠 Major

Fresh reruns still reuse stale task outputs.

_clean_rerun_output_dirs() only runs in the resume / from_step path. If the same executor or work_dir is used for a normal rerun, _setup_directories() leaves <work_dir>/<task>/output intact, so a supposedly fresh run can consume or publish stale artifacts.

♻️ Proposed fix
     def execute(self, spec: workflow_module.WorkflowSpec,
                 resume: bool = False, from_step: str | None = None) -> bool:
         """Run all tasks in topological order, returning True if the entire workflow succeeds."""
         self._results.clear()
         self._build_dag(spec)
         self._workflow_fingerprint = self._compute_workflow_fingerprint(spec)
         self._validate_for_standalone(spec)
-        self._setup_directories()
+        self._setup_directories(clean_outputs=not (resume or from_step))
 
         if resume or from_step:
             self._restore_completed_tasks(from_step)
             self._clean_rerun_output_dirs()
@@
-    def _setup_directories(self):
+    def _setup_directories(self, clean_outputs: bool = False):
         """Create the work directory and per-task output directories on the host filesystem."""
         os.makedirs(self._work_dir, exist_ok=True)
         for task_name in self._task_nodes:
-            os.makedirs(os.path.join(self._work_dir, task_name, 'output'), exist_ok=True)
+            output_dir = os.path.join(self._work_dir, task_name, 'output')
+            if clean_outputs and os.path.isdir(output_dir):
+                shutil.rmtree(output_dir)
+            os.makedirs(output_dir, exist_ok=True)

Also applies to: 468-472

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 165 - 176, The bug is that
_clean_rerun_output_dirs() is only invoked for resume/from_step flows so fresh
reruns can reuse stale outputs; fix by invoking _clean_rerun_output_dirs() after
_setup_directories() for all runs (or at minimum when not resuming) so
execute(...) always clears previous task output dirs before running; update the
execute method to call _clean_rerun_output_dirs() unconditionally (or add an
else branch when resume/from_step is false) so _setup_directories() +
_clean_rerun_output_dirs() always run together.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 533-541: The loop that writes inline files treats all files as
UTF-8 text; change it to honor file_spec.base64 by checking that flag after
resolving tokens with _substitute_tokens: if file_spec.base64 is truthy,
base64-decode the resolved_contents and open host_path in binary mode ('wb') to
write the bytes, otherwise continue writing as UTF-8 text ('w',
encoding='utf-8'). Keep the same path-safety checks (host_path, files_dir,
node.name) and directory creation logic (os.makedirs) but branch the file
open/write based on file_spec.base64 so binary payloads are correctly decoded
and written.

---

Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 165-176: The bug is that _clean_rerun_output_dirs() is only
invoked for resume/from_step flows so fresh reruns can reuse stale outputs; fix
by invoking _clean_rerun_output_dirs() after _setup_directories() for all runs
(or at minimum when not resuming) so execute(...) always clears previous task
output dirs before running; update the execute method to call
_clean_rerun_output_dirs() unconditionally (or add an else branch when
resume/from_step is false) so _setup_directories() + _clean_rerun_output_dirs()
always run together.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3c919781-bfd4-4f35-acf2-c230b97e6189

📥 Commits

Reviewing files that changed from the base of the PR and between 093f5b9 and 6e76645.

📒 Files selected for processing (1)
  • src/utils/standalone_executor.py

…ncoding

- Introduced base64 decoding for file writing in `StandaloneExecutor`, allowing for proper handling of encoded content.
- Ensured cleanup of rerun output directories is executed when resuming tasks, improving state management.
@maufrancom
Author

@coderabbitai resume

@coderabbitai

coderabbitai bot commented Apr 11, 2026

✅ Actions performed

Reviews resumed.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
src/utils/standalone_executor.py (1)

173-176: ⚠️ Potential issue | 🟠 Major

Restore completed state before recreating task outputs.

_setup_directories() recreates every <work_dir>/<task>/output before _restore_completed_tasks() runs, so Line 300’s os.path.isdir(info['output_dir']) check no longer proves that prior output actually exists. A cleaned or partially deleted work dir can therefore skip tasks and feed empty outputs downstream on --resume / --from-step.

Also applies to: 297-301

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 173 - 176, The code currently
calls self._setup_directories() before restoring completed state, which
recreates every <work_dir>/<task>/output and makes the subsequent
os.path.isdir(info['output_dir']) check in _restore_completed_tasks unreliable;
move the call to self._restore_completed_tasks(from_step) to run before
self._setup_directories() when resume or from_step is set (or alternatively
modify _setup_directories to skip recreating task output dirs when restoring) so
that existing outputs are detected correctly by _restore_completed_tasks; update
any references to resume/from_step handling to reflect the new call order and
ensure downstream logic uses the restored state.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 402-461: The standalone executor currently ignores
WorkflowSpec.timeout; update _validate_for_standalone to detect a non-null
spec.timeout and treat it as unsupported: inside _validate_for_standalone (the
method performing feature checks), add a check for spec.timeout and append an
explanatory message to unsupported_features (e.g. 'WorkflowSpec.timeout is not
supported in standalone mode; use the service executor or remove the timeout'),
so workflows that declare a timeout are rejected rather than silently ignored.
- Around line 129-154: The fingerprint currently omits credential-backed inputs
so changes to credential names, mount paths, or supplied directories won't
change the hash; update _compute_workflow_fingerprint to include
task_spec.credentials and the resolved credential mapping used at runtime (the
mapping that _run_task mounts) by serializing credential names, mount paths, and
the resolved source/path for each credential referenced by task_spec.credentials
into the fingerprint_data (use deterministic ordering and stable serialization
like sorted keys and json.dumps as done for other fields); also apply the same
change to the other fingerprinting code path that computes per-task resume
fingerprints so credential changes invalidate cached outputs.
- Around line 344-369: Standalone executor currently ignores group.barrier
semantics; add validation to reject barrier groups in standalone mode. Update
_validate_for_standalone to iterate the workflow spec's groups (same groups used
by _build_dag) and raise a clear error (e.g., ValueError) when any group has
barrier == True, referencing the group name in the message; this prevents silent
acceptance of barrier groups since _build_dag only respects TaskInputOutput
edges and cannot enforce barrier synchronization. Ensure the check uses the same
group iteration helper (the _groups method) and triggers before DAG
construction/_build_dag is called.

---

Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 173-176: The code currently calls self._setup_directories() before
restoring completed state, which recreates every <work_dir>/<task>/output and
makes the subsequent os.path.isdir(info['output_dir']) check in
_restore_completed_tasks unreliable; move the call to
self._restore_completed_tasks(from_step) to run before self._setup_directories()
when resume or from_step is set (or alternatively modify _setup_directories to
skip recreating task output dirs when restoring) so that existing outputs are
detected correctly by _restore_completed_tasks; update any references to
resume/from_step handling to reflect the new call order and ensure downstream
logic uses the restored state.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: dd9fe1cd-5f10-415c-9c90-b4ec9e4d2997

📥 Commits

Reviewing files that changed from the base of the PR and between 093f5b9 and 1223504.

📒 Files selected for processing (1)
  • src/utils/standalone_executor.py

Comment on lines +344 to +369
def _build_dag(self, spec: workflow_module.WorkflowSpec):
    """Construct the internal DAG of TaskNodes from the workflow spec's tasks and input dependencies."""
    self._task_nodes.clear()
    self._group_specs.clear()

    for group in self._groups(spec):
        self._group_specs[group.name] = group
        for task_spec in group.tasks:
            self._task_nodes[task_spec.name] = TaskNode(
                name=task_spec.name,
                spec=task_spec,
                group=group.name,
            )

    for group in self._groups(spec):
        for task_spec in group.tasks:
            for input_source in task_spec.inputs:
                if isinstance(input_source, task_module.TaskInputOutput):
                    upstream_task = input_source.task
                    if upstream_task not in self._task_nodes:
                        raise ValueError(
                            f'Task "{task_spec.name}" depends on unknown task "{upstream_task}"')
                    self._task_nodes[task_spec.name].upstream.add(upstream_task)
                    self._task_nodes[upstream_task].downstream.add(task_spec.name)

    self._check_for_cycles()


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== TaskGroupSpec semantics ==="
rg -n --type=py -C3 '\b(barrier|ignoreNonleadStatus)\b' src/utils/job src/utils src/lib

echo
echo "=== Barrier handling in schedulers/executors ==="
rg -n --type=py -C4 '\bbarrier\b|_build_dag|_find_ready_tasks' src/utils src/lib

Repository: NVIDIA/OSMO

Length of output: 50367


🏁 Script executed:

# Check StandaloneExecutor class docstring and any barrier-related limitations
head -100 src/utils/standalone_executor.py | grep -A 20 "class StandaloneExecutor"

Repository: NVIDIA/OSMO

Length of output: 1029


🏁 Script executed:

# Search for any explicit limitation comments about barrier support
rg -n "barrier|standalone" src/utils/standalone_executor.py | head -30

Repository: NVIDIA/OSMO

Length of output: 1231


🏁 Script executed:

# Check if compose executor has any barrier handling
rg -n "barrier" src/utils/compose_executor.py

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Search for validation that might reject barrier specs in standalone mode
rg -n "_validate|barrier.*error|not.*support.*barrier" src/utils/standalone_executor.py src/utils/compose_executor.py

Repository: NVIDIA/OSMO

Length of output: 628


🏁 Script executed:

# Read the full _validate_for_standalone implementation
sed -n '402,500p' src/utils/standalone_executor.py

Repository: NVIDIA/OSMO

Length of output: 5493


🏁 Script executed:

# Search for tests that use barrier with standalone executor
rg -n "barrier.*standalone|standalone.*barrier" src/utils/tests/test_standalone_executor.py

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Check if there are any tests that test barrier specs in standalone mode
rg -B5 -A10 "barrier.*true\|barrier.*false" src/utils/tests/test_standalone_executor.py | head -50

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Check how _notify_barrier works and when it's called
rg -B10 -A15 "_notify_barrier" src/utils/job/jobs.py | head -60

Repository: NVIDIA/OSMO

Length of output: 2723


🏁 Script executed:

# Search for when barrier is checked in readiness logic
rg -B5 -A5 "has_group_barrier\|barrier" src/utils/job/jobs.py | grep -A5 -B5 "ready\|schedule\|start"

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Check if multi-task groups have implicit within-group dependencies
rg -n "lead.*task|lead_task" src/utils/standalone_executor.py src/utils/compose_executor.py | head -20

Repository: NVIDIA/OSMO

Length of output: 942


🏁 Script executed:

# Look for how groups are scheduled - is there group-level synchronization?
rg -B3 -A3 "multi.*task.*group|group.*ready|_find_ready" src/utils/standalone_executor.py | head -40

Repository: NVIDIA/OSMO

Length of output: 909


🏁 Script executed:

# Check if there are implicit within-group dependencies or ordering
rg -B5 -A10 "for task_spec in group.tasks|tasks in group" src/utils/standalone_executor.py | head -40

Repository: NVIDIA/OSMO

Length of output: 1993


🏁 Script executed:

# Verify compose executor also lacks barrier handling
rg -n "_build_dag|_find_ready" src/utils/compose_executor.py | head -20

Repository: NVIDIA/OSMO

Length of output: 239


🏁 Script executed:

# Check if there's test coverage for multi-task groups in standalone mode
rg -B10 -A10 "multi.*task.*group|group.*tasks.*\>" src/utils/tests/test_standalone_executor.py | grep -A10 "def test" | head -50

Repository: NVIDIA/OSMO

Length of output: 189


🏁 Script executed:

# Look for any within-group task ordering
rg -B3 -A3 "GROUP_BARRIER_NAME|barrier.*=.*true" src/utils/standalone_executor.py src/utils/compose_executor.py

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Check if there's any documentation about barrier support in standalone
rg -i "barrier" src/utils/standalone_executor.py

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Check how compose executor handles multi-task groups
sed -n '414,450p' src/utils/compose_executor.py

Repository: NVIDIA/OSMO

Length of output: 1406


🏁 Script executed:

# Look for integration tests that use multi-task groups with barriers
rg -B5 -A15 "barrier.*true|has_group_barrier" src/utils/tests/test_standalone_executor.py | head -60

Repository: NVIDIA/OSMO

Length of output: 37


Add validation to reject barrier groups in standalone mode.

_build_dag() reads only explicit TaskInputOutput dependencies and never checks the barrier field in task groups. Barriers are used in the Kubernetes job scheduler (via Redis) to synchronize task completion within groups, but there is no equivalent mechanism in standalone execution.

Additionally, _validate_for_standalone() does not reject barrier: true specs, so groups with barriers are silently accepted but their barrier semantics are ignored. Tasks in a group with barrier: true will run in parallel as soon as their explicit task-input dependencies are satisfied, rather than being held until all group members complete.

Either:

  1. Add a validation check to explicitly reject groups with barrier: true, or
  2. Document in the class docstring that barrier semantics are not enforced in standalone mode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 344 - 369, Standalone executor
currently ignores group.barrier semantics; add validation to reject barrier
groups in standalone mode. Update _validate_for_standalone to iterate the
workflow spec's groups (same groups used by _build_dag) and raise a clear error
(e.g., ValueError) when any group has barrier == True, referencing the group
name in the message; this prevents silent acceptance of barrier groups since
_build_dag only respects TaskInputOutput edges and cannot enforce barrier
synchronization. Ensure the check uses the same group iteration helper (the
_groups method) and triggers before DAG construction/_build_dag is called.

@coderabbitai

coderabbitai bot commented Apr 11, 2026

✅ Actions performed

Reviews resumed.

- Added support for credential data mapping in `_compute_workflow_fingerprint`, allowing for better management of task-specific credentials.
- Updated `_validate_for_standalone` to raise a ValueError for unsupported timeout features in standalone mode, improving error handling and user guidance.
- Ensured directory setup is executed correctly during workflow execution, enhancing state management.

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
src/utils/standalone_executor.py (1)

199-221: Note: Sequential task execution.

Ready tasks are processed sequentially in the for task_name in ready loop rather than in parallel. This is a reasonable choice for local standalone execution (simpler implementation, predictable resource usage), but could be a performance consideration for workflows with many independent tasks.

If parallel execution becomes desirable in the future, consider using concurrent.futures.ThreadPoolExecutor with a configurable worker count.
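A rough sketch of what that parallel variant might look like (the `run_ready_tasks` helper and its `run_task` callable are simplified stand-ins for the executor's real `_run_task` flow, without the state-saving and cancellation handling):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_ready_tasks(ready, run_task, max_workers=4):
    """Run independent ready tasks in parallel; return a name -> exit code map."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_task, name): name for name in ready}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

In the real executor, updates to shared state such as _results and calls to _save_state would need a lock, since the completion callbacks run on worker threads.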

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/standalone_executor.py` around lines 199 - 221, The loop processing
ready tasks currently runs them sequentially (for task_name in ready) which
limits parallelism; to make execution parallel, submit each ready task to a
concurrent.futures.ThreadPoolExecutor (configurable max_workers) where each
worker calls _run_task(node, spec), then gather futures and iterate over
completed futures to update self._results[task_name], call self._save_state(),
and perform the same exit_code checks (_is_nonlead_failure_ignorable,
_cancel_downstream) and logging; ensure access to shared state (_results and any
other mutated attributes) and calls to _save_state are thread-safe (use a
threading.Lock) and preserve the existing failure-handling semantics when
processing completed futures.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/utils/standalone_executor.py`:
- Around line 199-221: The loop processing ready tasks currently runs them
sequentially (for task_name in ready) which limits parallelism; to make
execution parallel, submit each ready task to a
concurrent.futures.ThreadPoolExecutor (configurable max_workers) where each
worker calls _run_task(node, spec), then gather futures and iterate over
completed futures to update self._results[task_name], call self._save_state(),
and perform the same exit_code checks (_is_nonlead_failure_ignorable,
_cancel_downstream) and logging; ensure access to shared state (_results and any
other mutated attributes) and calls to _save_state are thread-safe (use a
threading.Lock) and preserve the existing failure-handling semantics when
processing completed futures.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: bfa0157c-cd9f-4228-8498-c07245251390

📥 Commits

Reviewing files that changed from the base of the PR and between 1223504 and fda5dbb.

📒 Files selected for processing (1)
  • src/utils/standalone_executor.py

@maufrancom maufrancom requested a review from vvnpn-nv April 11, 2026 05:51
