diff --git a/doc/changes/DM-53494.feature.rst b/doc/changes/DM-53494.feature.rst new file mode 100644 index 00000000..605b107f --- /dev/null +++ b/doc/changes/DM-53494.feature.rst @@ -0,0 +1 @@ +Added ability to run submit processes as batch jobs on same cluster with shared filesystems. diff --git a/doc/lsst.ctrl.bps/quickstart.rst b/doc/lsst.ctrl.bps/quickstart.rst index 4370ba89..4d449a94 100644 --- a/doc/lsst.ctrl.bps/quickstart.rst +++ b/doc/lsst.ctrl.bps/quickstart.rst @@ -1613,6 +1613,49 @@ Parsl). responsibility to remove them once no longer needed. The removal should be done regularly to avoid too many in single directory. +.. _bps_submit_as_batch: + +Submit Stages as Batch Jobs +--------------------------- + +.. warning:: + + This feature is part of the ongoing work for submission to remote sites. + So details may change. Also currently only the HTCondor plugin supports + this. + +In cases where one cannot run ``bps submit`` interactively (e.g., needs +too much memory), BPS can run the submit processes as batch jobs after +which the payload workflow will start running. + +The interactive submission process, ``bps batch-submit ``, +is much shorter. It will create a workflow with two jobs: + +- ``buildQuantumGraph`` which creates the quantum graph. +- ``preparePayloadWorkflow`` which does the rest of the submission stages seen + when running ``bps submit``. These include clustering, creation of the payload + workflow, and preparing the WMS-specific workflow. + +One can set runtime values specific to those jobs (e.g., ``requestMemory``) in +sections with corresponding names similar to ``finalJob``. Currently the +logging-related command-line arguments aren't passed from ``bps batch-submit`` +to these jobs. Instead, one can set ``bpsPreCommandOpts``, which has the +same default as the payload job. + +Even with the new jobs, there is only one output run collection, one submit +directory and one top level WMS ID to be used with BPS commands. + +``bps report`` will show these 2 new jobs same as the payload jobs. They +will be the only lines to appear in the report until the ``preparePayloadWorkflow`` +has finished at which time the expected payload lines should appear (from +``pipetaskInit`` through ``finalJob``). + +``bps cancel`` can be used to abort the run during these new jobs or later +during the running of the payload jobs. + +See the corresponding section in the WMS-plugin documentation for additional +information. + .. _bps-troubleshooting: Troubleshooting diff --git a/python/lsst/ctrl/bps/batch_submit.py b/python/lsst/ctrl/bps/batch_submit.py new file mode 100644 index 00000000..a9701630 --- /dev/null +++ b/python/lsst/ctrl/bps/batch_submit.py @@ -0,0 +1,234 @@ +# This file is part of ctrl_bps. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Driver to run submit stages as batch jobs.""" + +__all__ = ["batch_payload_prepare", "create_batch_stages"] + +import logging +import os + +from lsst.resources import ResourcePath, ResourcePathExpression +from lsst.utils.logging import VERBOSE +from lsst.utils.timer import time_this, timeMethod + +from . import ( + DEFAULT_MEM_FMT, + DEFAULT_MEM_UNIT, + BpsConfig, + GenericWorkflow, + GenericWorkflowJob, + GenericWorkflowLazyGroup, +) +from .pre_transform import cluster_quanta, read_quantum_graph +from .prepare import prepare +from .transform import _get_job_values, transform + +_LOG = logging.getLogger(__name__) + + +@timeMethod(logger=_LOG, logLevel=VERBOSE) +def create_batch_stages( + config: BpsConfig, prefix: ResourcePathExpression +) -> tuple[GenericWorkflow, BpsConfig]: + """Create a GenericWorkflow that performs the submit stages as a workflow. + + Parameters + ---------- + config : `lsst.ctrl.bps.BpsConfig` + BPS configuration. + prefix : `lsst.resources.ResourcePathExpression` + Root path for any output files. + + Returns + ------- + generic_workflow : `lsst.ctrl.bps.GenericWorkflow` + The generic workflow transformed from the clustered quantum graph. + generic_workflow_config : `lsst.ctrl.bps.BpsConfig` + Configuration to accompany GenericWorkflow. + """ + prefix = ResourcePath(prefix) + generic_workflow: GenericWorkflow = GenericWorkflow(name=f"{config['uniqProcName']}_ctrl") + cmd_line_key = "jobCommand" + + # build QuantumGraph job + search_opt = {} + if "buildQuantumGraph" in config: + search_opt["searchobj"] = config.get("buildQuantumGraph") + build_job = GenericWorkflowJob( + name="buildQuantumGraph", + label="buildQuantumGraph", + ) + job_values = _get_job_values(config, search_opt, cmd_line_key) + if not job_values["executable"]: + raise RuntimeError( + f"Missing executable for buildQuantumGraph. Double check submit yaml for {cmd_line_key}" + ) + for key, value in job_values.items(): + if key not in {"name", "label"}: + setattr(build_job, key, value) + + generic_workflow.add_job(build_job) + generic_workflow.run_attrs.update( + { + "bps_isjob": "True", + "bps_project": config["project"], + "bps_campaign": config["campaign"], + "bps_run": config["uniqProcName"], + "bps_operator": config["operator"], + "bps_payload": config["payloadName"], + "bps_runsite": config["computeSite"], + } + ) + + # cluster/transform/prepare job + search_opt = {} + if "preparePayloadWorkflow" in config: + search_opt["searchobj"] = config.get("preparePayloadWorkflow") + prepare_job = GenericWorkflowLazyGroup( + name="preparePayloadWorkflow", + label="preparePayloadWorkflow", + ) + job_values = _get_job_values(config, search_opt, cmd_line_key) + if not job_values["executable"]: + raise RuntimeError( + f"Missing executable for preparePayloadWorkflow. Double check submit yaml for {cmd_line_key}" + ) + for key, value in job_values.items(): + if key not in {"name", "label"}: + setattr(prepare_job, key, value) + + generic_workflow.add_job(prepare_job, parent_names=["buildQuantumGraph"]) + + _, save_workflow = config.search("saveGenericWorkflow", opt={"default": False}) + if save_workflow: + with prefix.join("bps_stages_generic_workflow.pickle").open("wb") as outfh: + generic_workflow.save(outfh, "pickle") + + return generic_workflow, config + + +@timeMethod(logger=_LOG, logLevel=VERBOSE) +def batch_payload_prepare(config: BpsConfig, prefix: ResourcePathExpression) -> None: + """Create a GenericWorkflow that performs the submit stages as a workflow. + + Parameters + ---------- + config : `lsst.ctrl.bps.BpsConfig` + BPS configuration. + + prefix : `lsst.resources.ResourcePathExpression` + Root path for any output files. + + Returns + ------- + generic_workflow : `lsst.ctrl.bps.GenericWorkflow` + The generic workflow transformed from the clustered quantum graph. + generic_workflow_config : `lsst.ctrl.bps.BpsConfig` + Configuration to accompany GenericWorkflow. + """ + prefix = ResourcePath(prefix) + # Read existing QuantumGraph + qgraph_filename = prefix.join(config["qgraphFileTemplate"]) + qgraph = read_quantum_graph(qgraph_filename) + config[".bps_defined.runQgraphFile"] = str(qgraph_filename) + + # Cluster + _LOG.info("Starting cluster stage (grouping quanta into jobs)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Cluster stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + clustered_qgraph = cluster_quanta(config, qgraph, config["uniqProcName"]) + + _LOG.info("ClusteredQuantumGraph contains %d cluster(s)", len(clustered_qgraph)) + + submit_path = config[".bps_defined.submitPath"] + _, save_clustered_qgraph = config.search("saveClusteredQgraph", opt={"default": False}) + if save_clustered_qgraph: + clustered_qgraph.save(os.path.join(submit_path, "bps_clustered_qgraph.pickle")) + _, save_dot = config.search("saveDot", opt={"default": False}) + if save_dot: + clustered_qgraph.draw(os.path.join(submit_path, "bps_clustered_qgraph.dot")) + + # Transform + _LOG.info("Starting transform stage (creating generic workflow)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Transform stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + generic_workflow, generic_workflow_config = transform(config, clustered_qgraph, submit_path) + _LOG.info("Generic workflow name '%s'", generic_workflow.name) + + num_jobs = sum(generic_workflow.job_counts.values()) + _LOG.info("GenericWorkflow contains %d job(s) (including final)", num_jobs) + + _, save_workflow = config.search("saveGenericWorkflow", opt={"default": False}) + if save_workflow: + with open(os.path.join(submit_path, "bps_generic_workflow.pickle"), "wb") as outfh: + generic_workflow.save(outfh, "pickle") + _, save_dot = config.search("saveDot", opt={"default": False}) + if save_dot: + with open(os.path.join(submit_path, "bps_generic_workflow.dot"), "w") as outfh: + generic_workflow.draw(outfh, "dot") + + # Prepare + _LOG.info("Starting prepare stage (creating specific implementation of workflow)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Prepare stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + wms_workflow = prepare(generic_workflow_config, generic_workflow, submit_path) + + # Add payload workflow to currently running workflow + _LOG.info("Starting update workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Workflow update completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + # Assuming submit_path for ctrl workflow is visible by this job. + wms_workflow.add_to_parent_workflow(generic_workflow_config) diff --git a/python/lsst/ctrl/bps/cli/cmd/__init__.py b/python/lsst/ctrl/bps/cli/cmd/__init__.py index 49c97190..aedb5323 100644 --- a/python/lsst/ctrl/bps/cli/cmd/__init__.py +++ b/python/lsst/ctrl/bps/cli/cmd/__init__.py @@ -27,6 +27,9 @@ __all__ = [ "acquire", + "batch_acquire", + "batch_prepare", + "batch_submit", "cluster", "transform", "prepare", @@ -41,6 +44,9 @@ from .commands import ( acquire, + batch_acquire, + batch_prepare, + batch_submit, cancel, cluster, ping, diff --git a/python/lsst/ctrl/bps/cli/cmd/commands.py b/python/lsst/ctrl/bps/cli/cmd/commands.py index 46ffb32a..e412edf8 100644 --- a/python/lsst/ctrl/bps/cli/cmd/commands.py +++ b/python/lsst/ctrl/bps/cli/cmd/commands.py @@ -36,6 +36,9 @@ from ... import BpsSubprocessError from ...drivers import ( acquire_qgraph_driver, + batch_acquire_driver, + batch_prepare_driver, + batch_submit_driver, cancel_driver, cluster_qgraph_driver, ping_driver, @@ -219,3 +222,30 @@ def ping(*args, **kwargs): def submitcmd(*args, **kwargs): """Submit a command for execution.""" submitcmd_driver(*args, **kwargs) + + +@click.command(cls=BpsCommand) +@opt.config_file_argument(required=True) +@opt.submission_options() +def batch_acquire(*args, **kwargs): + """Run inside a batch job to create a new quantum graph.""" + with catch_errors(): + batch_acquire_driver(*args, **kwargs) + + +@click.command(cls=BpsCommand) +@opt.config_file_argument(required=True) +@opt.submission_options() +def batch_prepare(*args, **kwargs): + """Run payload workflow preparation inside a batch job.""" + with catch_errors(): + batch_prepare_driver(*args, **kwargs) + + +@click.command(cls=BpsCommand) +@opt.config_file_argument(required=True) +@opt.submission_options() +def batch_submit(*args, **kwargs): + """Submit a workflow with preparation inside a batch jobs too.""" + with catch_errors(): + batch_submit_driver(*args, **kwargs) diff --git a/python/lsst/ctrl/bps/drivers.py b/python/lsst/ctrl/bps/drivers.py index 8debefdc..11e2828f 100644 --- a/python/lsst/ctrl/bps/drivers.py +++ b/python/lsst/ctrl/bps/drivers.py @@ -33,6 +33,9 @@ __all__ = [ "acquire_qgraph_driver", + "batch_acquire_driver", + "batch_prepare_driver", + "batch_submit_driver", "cancel_driver", "cluster_qgraph_driver", "ping_driver", @@ -49,12 +52,22 @@ import logging import os from pathlib import Path +from typing import Any from lsst.pipe.base.quantum_graph import PredictedQuantumGraph from lsst.utils.timer import time_this from lsst.utils.usage import get_peak_mem_usage -from . import BPS_DEFAULTS, BPS_SEARCH_ORDER, DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT, BpsConfig +from . import ( + BPS_DEFAULTS, + BPS_SEARCH_ORDER, + DEFAULT_MEM_FMT, + DEFAULT_MEM_UNIT, + BpsConfig, + ClusteredQuantumGraph, + GenericWorkflow, +) +from .batch_submit import batch_payload_prepare, create_batch_stages from .bps_reports import compile_code_summary, compile_job_summary from .bps_utils import _dump_env_info, _dump_pkg_info, _make_id_link from .cancel import cancel @@ -67,7 +80,7 @@ submit_path_validator, ) from .ping import ping -from .pre_transform import acquire_quantum_graph, cluster_quanta +from .pre_transform import acquire_quantum_graph, cluster_quanta, read_quantum_graph from .prepare import prepare from .report import display_report, retrieve_report from .restart import restart @@ -142,14 +155,16 @@ def acquire_qgraph_driver(config_file: str, **kwargs) -> tuple[BpsConfig, Predic mem_unit=DEFAULT_MEM_UNIT, mem_fmt=DEFAULT_MEM_FMT, ): - qgraph_file, qgraph = acquire_quantum_graph(config, out_prefix=submit_path) + qgraph_file = acquire_quantum_graph(config, out_prefix=submit_path) + qgraph = read_quantum_graph(qgraph_file) + _log_mem_usage() config[".bps_defined.runQgraphFile"] = qgraph_file return config, qgraph -def cluster_qgraph_driver(config_file, **kwargs): +def cluster_qgraph_driver(config_file: str, **kwargs: Any) -> tuple[BpsConfig, ClusteredQuantumGraph]: """Group quanta into clusters. Parameters @@ -193,7 +208,7 @@ def cluster_qgraph_driver(config_file, **kwargs): return config, clustered_qgraph -def transform_driver(config_file, **kwargs): +def transform_driver(config_file: str, **kwargs: Any) -> tuple[BpsConfig, GenericWorkflow]: """Create a workflow for a specific workflow management system. Parameters @@ -207,7 +222,7 @@ def transform_driver(config_file, **kwargs): ------- generic_workflow_config : `lsst.ctrl.bps.BpsConfig` Configuration to use when creating the workflow. - generic_workflow : `lsst.ctrl.bps.BaseWmsWorkflow` + generic_workflow : `lsst.ctrl.bps.GenericWorkflow` Representation of the abstract/scientific workflow specific to a given workflow management system. """ @@ -687,3 +702,135 @@ def _log_mem_usage() -> None: "Peak memory usage for bps process %s (main), %s (largest child process)", *tuple(f"{val.to(DEFAULT_MEM_UNIT):{DEFAULT_MEM_FMT}}" for val in get_peak_mem_usage()), ) + + +def batch_acquire_driver(config_file: str, **kwargs: Any) -> None: + """Create a quantum graph from pipeline definition in a batch job. + + Parameters + ---------- + config_file : `str` + Name of the configuration file. + **kwargs : `~typing.Any` + Additional modifiers to the configuration. + """ + config = BpsConfig(config_file) + submit_path = config[".bps_defined.submitPath"] + + _LOG.info("Starting acquire stage (generating and/or reading quantum graph)") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Acquire stage completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + _ = acquire_quantum_graph(config, out_prefix=submit_path) + _log_mem_usage() + + +def batch_prepare_driver(config_file: str, **kwargs: Any) -> None: + """Run workflow preparation in a batch job for an existing QuantumGraph. + + Parameters + ---------- + config_file : `str` + Name of the configuration file. + **kwargs : `~typing.Any` + Additional modifiers to the configuration. + """ + config = BpsConfig(config_file) + submit_path = config[".bps_defined.submitPath"] + + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Batch preparation completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + batch_payload_prepare(config, prefix=submit_path) + _log_mem_usage() + + +def batch_submit_driver(config_file: str, **kwargs: Any) -> None: + """Submit a workflow for execution with preparation done in batch jobs. + + Parameters + ---------- + config_file : `str` + Name of the configuration file. + **kwargs : `~typing.Any` + Additional modifiers to the configuration. + """ + kwargs.setdefault("runWmsSubmissionChecks", True) + + _LOG.info( + "DISCLAIMER: All values regarding memory consumption reported below are approximate and may " + "not accurately reflect actual memory usage by the bps process." + ) + + config = _init_submission_driver(config_file, **kwargs) + submit_path = config[".bps_defined.submitPath"] + + _LOG.info("Starting batch submission") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Batch submission completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + _LOG.info("Starting to create control workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Creation completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + generic_workflow, config = create_batch_stages(config, submit_path) + + _LOG.info("Starting to prepare control workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Preparation completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + wms_workflow = prepare(config, generic_workflow, submit_path) + + if kwargs.get("dry_run", False): + return + + _LOG.info("Starting to submit control workflow") + with time_this( + log=_LOG, + level=logging.INFO, + prefix=None, + msg="Submission completed", + mem_usage=True, + mem_unit=DEFAULT_MEM_UNIT, + mem_fmt=DEFAULT_MEM_FMT, + ): + submit(config, wms_workflow, **kwargs) + _LOG.info("Run '%s' submitted for execution with id '%s'", wms_workflow.name, wms_workflow.run_id) + _log_mem_usage() + + _make_id_link(config, wms_workflow.run_id) + + print(f"Run Id: {wms_workflow.run_id}") + # The workflow names are for the controlling workflow with _ctrl + # in the name. We want to still print out the normal run name. + print(f"Run Name: {config['uniqProcName']}") diff --git a/python/lsst/ctrl/bps/etc/bps_defaults.yaml b/python/lsst/ctrl/bps/etc/bps_defaults.yaml index 7b1e39a0..a38fe32d 100644 --- a/python/lsst/ctrl/bps/etc/bps_defaults.yaml +++ b/python/lsst/ctrl/bps/etc/bps_defaults.yaml @@ -150,3 +150,11 @@ memoryLimit: 491520 # Default values for making id soft link to submit directory makeIdLink: False idLinkPath: "${PWD}/bps_links" + + +# Running submit stages as batch jobs +bpsPreCommandOpts: "{defaultPreCmdOpts}" +buildQuantumGraph: + jobCommand: "${CTRL_BPS_DIR}/bin/bps {bpsPreCommandOpts} batch-acquire {configFile}" +preparePayloadWorkflow: + jobCommand: "${CTRL_BPS_DIR}/bin/bps {bpsPreCommandOpts} batch-prepare {configFile}" diff --git a/python/lsst/ctrl/bps/generic_workflow.py b/python/lsst/ctrl/bps/generic_workflow.py index 65889d95..3539cf0d 100644 --- a/python/lsst/ctrl/bps/generic_workflow.py +++ b/python/lsst/ctrl/bps/generic_workflow.py @@ -33,6 +33,7 @@ "GenericWorkflowFile", "GenericWorkflowGroup", "GenericWorkflowJob", + "GenericWorkflowLazyGroup", "GenericWorkflowNode", "GenericWorkflowNodeType", "GenericWorkflowNoopJob", @@ -127,6 +128,9 @@ class GenericWorkflowNodeType(IntEnum): GROUP = auto() """A special group (subdag) of jobs.""" + LAZY_GROUP = auto() + """When run will generate sub-workflow of jobs.""" + @dataclasses.dataclass(slots=True) class GenericWorkflowNode: @@ -474,7 +478,7 @@ def add_job( super().add_node(job.name, job=job) self.add_job_relationships(parent_names, job.name) self.add_job_relationships(job.name, child_names) - if job.node_type == GenericWorkflowNodeType.PAYLOAD: + if job.node_type in [GenericWorkflowNodeType.PAYLOAD, GenericWorkflowNodeType.LAZY_GROUP]: job = cast(GenericWorkflowJob, job) self.add_executable(job.executable) self._job_labels.add_job( @@ -1335,6 +1339,18 @@ def __init__(self, name: str, label: str, blocking: bool = False) -> None: self.blocking = blocking +@dataclasses.dataclass(slots=True) +class GenericWorkflowLazyGroup(GenericWorkflowJob): + """Node representing a group of jobs to be generated when run.""" + + # Docstring inherited. + + @property + def node_type(self) -> GenericWorkflowNodeType: + """Indicate this is a lazy group job.""" + return GenericWorkflowNodeType.LAZY_GROUP + + class GenericWorkflowLabels: """Label-oriented representation of the GenericWorkflowJobs.""" diff --git a/python/lsst/ctrl/bps/initialize.py b/python/lsst/ctrl/bps/initialize.py index 166c9ac3..f4d466c9 100644 --- a/python/lsst/ctrl/bps/initialize.py +++ b/python/lsst/ctrl/bps/initialize.py @@ -140,7 +140,10 @@ def init_submission( # save copy of configs (orig and expanded config) shutil.copy2(config_file, submit_path) - with open(f"{submit_path}/{config['uniqProcName']}_config.yaml", "w") as fh: + expanded_config_file = f"{submit_path}/{config['uniqProcName']}_config.yaml" + config[".bps_defined.configFile"] = expanded_config_file + + with open(expanded_config_file, "w") as fh: config.dump(fh) # Dump information about runtime environment and software versions in use. diff --git a/python/lsst/ctrl/bps/pre_transform.py b/python/lsst/ctrl/bps/pre_transform.py index e4cef200..c712c9ba 100644 --- a/python/lsst/ctrl/bps/pre_transform.py +++ b/python/lsst/ctrl/bps/pre_transform.py @@ -41,7 +41,7 @@ from lsst.pipe.base import QuantumGraph from lsst.pipe.base.pipeline_graph import TaskImportMode from lsst.pipe.base.quantum_graph import PredictedQuantumGraph -from lsst.resources import ResourcePath +from lsst.resources import ResourcePath, ResourcePathExpression from lsst.utils import doImport from lsst.utils.logging import VERBOSE from lsst.utils.timer import time_this, timeMethod @@ -50,7 +50,7 @@ @timeMethod(logger=_LOG, logLevel=VERBOSE) -def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, PredictedQuantumGraph]: +def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> str: """Read a quantum graph from a file or create one from scratch. Parameters @@ -64,10 +64,7 @@ def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, Returns ------- qgraph_filename : `str` - Name of file containing QuantumGraph that was read into qgraph. - qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` - A quantum graph read in from pre-generated file or one that is the - result of running code that generates it. + Name of file containing the quantum graph. """ # Check to see if user provided pre-generated QuantumGraph. found, input_qgraph_filename = config.search("qgraphFile") @@ -91,6 +88,24 @@ def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed creating quantum graph"): qgraph_filename = create_quantum_graph(config, out_prefix) + return qgraph_filename + + +@timeMethod(logger=_LOG, logLevel=VERBOSE) +def read_quantum_graph(qgraph_filename: ResourcePathExpression) -> PredictedQuantumGraph: + """Read a quantum graph from a file. + + Parameters + ---------- + qgraph_filename : `lsst.resources.ResourcePathExpression` + Name of file containing PredictedQuantumGraph to be read. + + Returns + ------- + qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` + A quantum graph read in from pre-generated file or one that is the + result of running code that generates it. + """ _LOG.info("Reading quantum graph from '%s'", qgraph_filename) with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed reading quantum graph"): qgraph_path = ResourcePath(qgraph_filename) @@ -102,7 +117,7 @@ def acquire_quantum_graph(config: BpsConfig, out_prefix: str = "") -> tuple[str, qgraph = PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(qgraph_path)) else: raise ValueError(f"Unrecognized extension for quantum graph file: {qgraph_filename}.") - return qgraph_filename, qgraph + return qgraph def execute(command: str, filename: str, write_buffering: int = 1) -> int: diff --git a/python/lsst/ctrl/bps/tests/gw_test_utils.py b/python/lsst/ctrl/bps/tests/gw_test_utils.py index e3cdb278..e3c160c5 100644 --- a/python/lsst/ctrl/bps/tests/gw_test_utils.py +++ b/python/lsst/ctrl/bps/tests/gw_test_utils.py @@ -33,6 +33,7 @@ "make_5_label_workflow", "make_5_label_workflow_2_groups", "make_5_label_workflow_middle_groups", + "make_lazy_workflow", ] import logging @@ -44,6 +45,7 @@ GenericWorkflowExec, GenericWorkflowGroup, GenericWorkflowJob, + GenericWorkflowLazyGroup, GenericWorkflowNodeType, GenericWorkflowNoopJob, ) @@ -656,3 +658,43 @@ def compare_generic_workflows(gwf1: GenericWorkflow, gwf2: GenericWorkflow) -> b equal = False return equal + + +def make_lazy_workflow(workflow_name: str, final: bool) -> GenericWorkflow: + """Create a simple test workflow containing a lazy workflow node. + + Parameters + ---------- + workflow_name : `str` + Name of the test workflow. + final : `bool` + Whether to add a final job. + + Returns + ------- + gwf : `lsst.ctrl.bps.GenericWorkflow` + The test workflow. + """ + gwf = GenericWorkflow(workflow_name) + + # job 1 + gwexec1 = GenericWorkflowExec("exec1", "my_exec1.sh", False) + job1 = GenericWorkflowJob("job1", "label1", executable=gwexec1) + gwf.add_job(job1, None) + + # lazy workflow job + gwexec2 = GenericWorkflowExec("exec2", "${CTRL_BPS_DIR}/python/lsst/ctrl/bps/_make_workflow.sh", False) + job2 = GenericWorkflowLazyGroup("lazy2", "label2", executable=gwexec2) + gwf.add_job(job2, [job1.name]) + + # Job after + gwexec3 = GenericWorkflowExec("exec3", "my_exec3.sh", False) + job3 = GenericWorkflowJob("job3", "label3", executable=gwexec3) + gwf.add_job(job3, [job2.name]) + + if final: + gwexec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True) + job = GenericWorkflowJob("finalJob", label="finalJob", executable=gwexec) + gwf.add_final(job) + + return gwf diff --git a/python/lsst/ctrl/bps/wms_service.py b/python/lsst/ctrl/bps/wms_service.py index 7665526d..76082c02 100644 --- a/python/lsst/ctrl/bps/wms_service.py +++ b/python/lsst/ctrl/bps/wms_service.py @@ -43,6 +43,8 @@ from enum import Enum from typing import Any +from . import BpsConfig + _LOG = logging.getLogger(__name__) @@ -576,3 +578,14 @@ def write(self, out_prefix): as well as internal WMS files. """ raise NotImplementedError + + @abstractmethod + def add_to_parent_workflow(self, config: BpsConfig) -> None: + """Add self to parent workflow. + + Parameters + ---------- + config : `lsst.ctrl.bps.BpsConfig` + Configuration. + """ + raise NotImplementedError diff --git a/tests/qg_test_utils.py b/tests/qg_test_utils.py index 1e61514d..b57daf11 100644 --- a/tests/qg_test_utils.py +++ b/tests/qg_test_utils.py @@ -304,7 +304,7 @@ def make_test_helper() -> InMemoryRepo: return helper -def make_test_quantum_graph(run: str = "run", uneven=False): +def make_test_quantum_graph(run: str = "run", uneven=False, save_filename: str = None): """Create a quantum graph for unit tests. Parameters @@ -314,6 +314,8 @@ def make_test_quantum_graph(run: str = "run", uneven=False): uneven : `bool`, optional Whether some of the quanta for initial tasks are not included as if finished in previous run. + save_filename : `str`, optional + Save test quantum graph to file using given filename. Returns ------- @@ -345,4 +347,7 @@ def make_test_quantum_graph(run: str = "run", uneven=False): } qgc.set_thin_graph() qgc.set_header_counts() + + if save_filename: + qgc.write(save_filename) return qgc.assemble() diff --git a/tests/test_batch_submit.py b/tests/test_batch_submit.py new file mode 100644 index 00000000..1af3bf5d --- /dev/null +++ b/tests/test_batch_submit.py @@ -0,0 +1,131 @@ +# This file is part of ctrl_bps. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""Unit tests for batch_submit.py.""" + +import tempfile +import unittest +from pathlib import Path + +from lsst.ctrl.bps import BpsConfig, batch_submit + + +class TestCreateBatchStages(unittest.TestCase): + """Tests for create_batch_stages function.""" + + def testMissingBuildCmd(self): + """Missing buildQuantumGraph jobCommand""" + config = BpsConfig({"uniqProcName": "uniq_proc_name"}) + with self.assertRaisesRegex( + RuntimeError, "Missing executable for buildQuantumGraph. Double check submit yaml for jobCommand" + ): + _ = batch_submit.create_batch_stages(config, "not_used_prefix") + + def testMissingPrepareCmd(self): + """Missing preparePayloadWorkflow jobCommand""" + config = BpsConfig( + { + "configFile": "not_used_configFile", + "uniqProcName": "uniq_proc_name", + "operator": "testuser", + "payload": {"payloadName": "testPayload"}, + "bpsPreCommandOpts": "--long-log --log-level=VERBOSE", + "buildQuantumGraph": {"jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-acquire {configFile}"}, + } + ) + with self.assertRaisesRegex( + RuntimeError, + "Missing executable for preparePayloadWorkflow. Double check submit yaml for jobCommand", + ): + _ = batch_submit.create_batch_stages(config, "not_used_prefix") + + def testSuccess(self): + # No saving of files + config = BpsConfig( + { + "configFile": "not_used_configFile", + "uniqProcName": "uniq_proc_name", + "operator": "testuser", + "payload": {"payloadName": "testPayload"}, + "bpsPreCommandOpts": "--long-log --log-level=VERBOSE", + "buildQuantumGraph": { + "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-acquire {configFile}", + "requestMemory": 16384, + }, + "preparePayloadWorkflow": { + "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-prepare {configFile}", + "requestMemory": 24576, + }, + } + ) + + with tempfile.TemporaryDirectory() as tmpdir: + gw, config = batch_submit.create_batch_stages(config, tmpdir) + self.assertIn("buildQuantumGraph", gw) + job = gw.get_job("buildQuantumGraph") + self.assertIn("batch-acquire", job.arguments) + self.assertEqual(job.request_memory, 16384) + self.assertIn("preparePayloadWorkflow", gw) + job = gw.get_job("preparePayloadWorkflow") + self.assertIn("batch-prepare", job.arguments) + self.assertEqual(job.request_memory, 24576) + + # Check we didn't make any files + self.assertEqual(list(Path(tmpdir).iterdir()), []) + + def testSaving(self): + config = BpsConfig( + { + "configFile": "not_used_configFile", + "uniqProcName": "uniq_proc_name", + "operator": "testuser", + "payload": {"payloadName": "testPayload"}, + "bpsPreCommandOpts": "--long-log --log-level=VERBOSE", + "buildQuantumGraph": { + "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-acquire {configFile}", + "requestMemory": 16384, + }, + "preparePayloadWorkflow": { + "jobCommand": "${CTRL_BPS_DIR}/bin/bps batch-prepare {configFile}", + "requestMemory": 24576, + }, + "saveGenericWorkflow": True, + } + ) + with tempfile.TemporaryDirectory() as tmpdir: + gw, config = batch_submit.create_batch_stages(config, tmpdir) + self.assertTrue((Path(tmpdir) / "bps_stages_generic_workflow.pickle").exists()) + + +class TestBatchPayloadPrepare(unittest.TestCase): + """Tests for batch_payload_prepare function.""" + + def testSuccess(self): + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 3bef420d..116a1769 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -185,5 +185,47 @@ def testStatusNonZeroStatus(self): ) +class TestCommandBatchSubmit(unittest.TestCase): + """Test executing the batch-submit subcommand.""" + + def setUp(self): + self.runner = LogCliRunner() + + def testBatchSubmit(self): + with unittest.mock.patch("lsst.ctrl.bps.cli.cmd.commands.batch_submit_driver") as mock_driver: + mock_driver.return_value = 0 + result = self.runner.invoke(bps.cli, ["batch-submit", "test.yaml"]) + self.assertEqual(result.exit_code, 0) + mock_driver.assert_called_once() + + +class TestCommandBatchAcquire(unittest.TestCase): + """Test executing the batch-acquire subcommand.""" + + def setUp(self): + self.runner = LogCliRunner() + + def testBatchAcquire(self): + with unittest.mock.patch("lsst.ctrl.bps.cli.cmd.commands.batch_acquire_driver") as mock_driver: + mock_driver.return_value = 0 + result = self.runner.invoke(bps.cli, ["batch-acquire", "test.yaml"]) + self.assertEqual(result.exit_code, 0) + mock_driver.assert_called_once() + + +class TestCommandBatchPrepare(unittest.TestCase): + """Test executing the batch-prepare subcommand.""" + + def setUp(self): + self.runner = LogCliRunner() + + def testBatchPrepare(self): + with unittest.mock.patch("lsst.ctrl.bps.cli.cmd.commands.batch_prepare_driver") as mock_driver: + mock_driver.return_value = 0 + result = self.runner.invoke(bps.cli, ["batch-prepare", "test.yaml"]) + self.assertEqual(result.exit_code, 0) + mock_driver.assert_called_once() + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_drivers.py b/tests/test_drivers.py index df851e2d..da8f0d9b 100644 --- a/tests/test_drivers.py +++ b/tests/test_drivers.py @@ -31,12 +31,12 @@ import shutil import tempfile import unittest +from pathlib import Path import yaml -from lsst.ctrl.bps import WmsRunReport, WmsStates +from lsst.ctrl.bps import BaseWmsWorkflow, BpsConfig, WmsRunReport, WmsStates, drivers from lsst.ctrl.bps.bps_reports import compile_code_summary, compile_job_summary -from lsst.ctrl.bps.drivers import _init_submission_driver, ping_driver, report_driver, status_driver TESTDIR = os.path.abspath(os.path.dirname(__file__)) @@ -63,7 +63,7 @@ def testDeprecatedOutCollection(self): with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml") as file: yaml.dump(config, stream=file) with self.assertRaisesRegex(KeyError, "outCollection"): - _init_submission_driver(file.name) + drivers._init_submission_driver(file.name) @unittest.mock.patch("lsst.ctrl.bps.initialize.BPS_DEFAULTS", {}) def testMissingOutputRun(self): @@ -71,7 +71,7 @@ def testMissingOutputRun(self): with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml") as file: yaml.dump(config, stream=file) with self.assertRaisesRegex(KeyError, "outputRun"): - _init_submission_driver(file.name) + drivers._init_submission_driver(file.name) @unittest.mock.patch("lsst.ctrl.bps.initialize.BPS_DEFAULTS", {}) def testMissingSubmitPath(self): @@ -79,19 +79,19 @@ def testMissingSubmitPath(self): with tempfile.NamedTemporaryFile(mode="w+", suffix=".yaml") as file: yaml.dump(config, stream=file) with self.assertRaisesRegex(KeyError, "submitPath"): - _init_submission_driver(file.name) + drivers._init_submission_driver(file.name) class TestPingDriver(unittest.TestCase): """Test ping.""" def testWmsServiceSuccess(self): - retval = ping_driver("wms_test_utils.WmsServiceSuccess") + retval = drivers.ping_driver("wms_test_utils.WmsServiceSuccess") self.assertEqual(retval, 0) def testWmsServiceFailure(self): with self.assertLogs(level=logging.ERROR) as cm: - retval = ping_driver("wms_test_utils.WmsServiceFailure") + retval = drivers.ping_driver("wms_test_utils.WmsServiceFailure") self.assertNotEqual(retval, 0) self.assertEqual(cm.records[0].getMessage(), "Couldn't contact service X") @@ -99,7 +99,7 @@ def testWmsServiceEnvVar(self): with unittest.mock.patch.dict( os.environ, {"BPS_WMS_SERVICE_CLASS": "wms_test_utils.WmsServiceSuccess"} ): - retval = ping_driver() + retval = drivers.ping_driver() self.assertEqual(retval, 0) @unittest.mock.patch( @@ -108,13 +108,13 @@ def testWmsServiceEnvVar(self): def testWmsServiceNone(self): with unittest.mock.patch.dict(os.environ, {}): with self.assertLogs(level=logging.INFO) as cm: - retval = ping_driver() + retval = drivers.ping_driver() self.assertEqual(retval, 0) self.assertEqual(cm.records[0].getMessage(), "DEFAULT None") def testWmsServicePassThru(self): with self.assertLogs(level=logging.INFO) as cm: - retval = ping_driver("wms_test_utils.WmsServicePassThru", "EXTRA_VALUES") + retval = drivers.ping_driver("wms_test_utils.WmsServicePassThru", "EXTRA_VALUES") self.assertEqual(retval, 0) self.assertRegex(cm.output[0], "INFO.+EXTRA_VALUES") @@ -124,13 +124,17 @@ class TestStatusDriver(unittest.TestCase): def testWmsServiceSuccess(self): with self.assertLogs(level=logging.INFO) as cm: - retval = status_driver("wms_test_utils.WmsServiceSuccess", run_id="/dummy/path", hist_days=3) + retval = drivers.status_driver( + "wms_test_utils.WmsServiceSuccess", run_id="/dummy/path", hist_days=3 + ) self.assertEqual(retval, WmsStates.SUCCEEDED.value) self.assertEqual(cm.records[0].getMessage(), "status: SUCCEEDED") def testWmsServiceFailure(self): with self.assertLogs(level=logging.WARNING) as cm: - retval = status_driver("wms_test_utils.WmsServiceFailure", run_id="/dummy/path", hist_days=3) + retval = drivers.status_driver( + "wms_test_utils.WmsServiceFailure", run_id="/dummy/path", hist_days=3 + ) self.assertEqual(retval, WmsStates.FAILED.value) self.assertEqual(cm.records[0].getMessage(), "Dummy error message.") @@ -139,7 +143,7 @@ def testWmsServiceFailure(self): ) def testWmsServiceNone(self): with unittest.mock.patch.dict(os.environ, {}): - retval = status_driver(None, run_id="/dummy/path", hist_days=3) + retval = drivers.status_driver(None, run_id="/dummy/path", hist_days=3) self.assertEqual(retval, WmsStates.RUNNING.value) @@ -152,7 +156,7 @@ class TestReportDriver(unittest.TestCase): def testWmsServiceFromDefaults(self): # Should not raise an exception and use default from BPS_DEFAULTS. with unittest.mock.patch.dict(os.environ, {}, clear=True): - report_driver( + drivers.report_driver( wms_service=None, run_id=None, user=None, @@ -165,7 +169,7 @@ def testWmsServiceFromEnvVar(self): with unittest.mock.patch.dict( os.environ, {"BPS_WMS_SERVICE_CLASS": "wms_test_utils.WmsServiceSuccess"} ): - report_driver( + drivers.report_driver( wms_service=None, run_id=None, user=None, @@ -178,7 +182,7 @@ def testWmsServiceFromEnvVar(self): def testHistDefault(self, mock_display, mock_retrieve): mock_retrieve.return_value = ([], []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id="123", user=None, @@ -195,7 +199,7 @@ def testHistDefault(self, mock_display, mock_retrieve): def testHistCustom(self, mock_display, mock_retrieve): mock_retrieve.return_value = ([], []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id="123", user=None, @@ -212,7 +216,7 @@ def testHistCustom(self, mock_display, mock_retrieve): def testPostprocessorsWithoutExitCodes(self, mock_display, mock_retrieve): mock_retrieve.return_value = ([], []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id="123", user=None, @@ -231,7 +235,7 @@ def testPostprocessorsWithoutExitCodes(self, mock_display, mock_retrieve): def testPostprocessorsWithExitCodes(self, mock_display, mock_retrieve): mock_retrieve.return_value = ([], []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id="123", user=None, @@ -251,7 +255,7 @@ def testPostprocessorsWithExitCodes(self, mock_display, mock_retrieve): def testPostprocessorsNoRunId(self, mock_display, mock_retrieve): mock_retrieve.return_value = ([], []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id=None, user=None, @@ -269,7 +273,7 @@ def testDisplayCalledIfRuns(self, mock_display, mock_retrieve): mock_runs = [WmsRunReport(wms_id="1", state=WmsStates.SUCCEEDED)] mock_retrieve.return_value = (mock_runs, []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id=None, user=None, @@ -288,7 +292,7 @@ def testDisplayCalledIfMessages(self, mock_display, mock_retrieve): mock_messages = ["Warning message 1", "Warning message 2"] mock_retrieve.return_value = ([], mock_messages) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id=None, user=None, @@ -307,7 +311,7 @@ def testDisplayCalledIfMessages(self, mock_display, mock_retrieve): def testNoRecordsFoundMessage(self, mock_print, mock_display, mock_retrieve): mock_retrieve.return_value = ([], []) - report_driver( + drivers.report_driver( wms_service="wms_test_utils.WmsServiceSuccess", run_id="123", user=None, @@ -325,5 +329,129 @@ def testNoRecordsFoundMessage(self, mock_print, mock_display, mock_retrieve): self.assertIn("123", call_args) +class TestAcquireQgraphDriver(unittest.TestCase): + """Test acquire_qgraph_driver function.""" + + def setUp(self): + self.tmpdir = Path(tempfile.mkdtemp()) + self.config_file = str(self.tmpdir / "config.yaml") + config = BpsConfig({"bps_defined": {"submitPath": str(self.tmpdir)}}) + + with open(self.config_file, "w") as fh: + config.dump(fh) + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + @unittest.mock.patch("lsst.ctrl.bps.drivers.read_quantum_graph") + @unittest.mock.patch("lsst.ctrl.bps.drivers.acquire_quantum_graph") + @unittest.mock.patch("lsst.ctrl.bps.drivers._init_submission_driver") + def testSuccess(self, mock_init, mock_acquire, mock_read): + drivers.acquire_qgraph_driver(self.config_file) + mock_init.assert_called_once() + mock_acquire.assert_called_once() + mock_read.assert_called_once() + + +class TestBatchAcquireDriver(unittest.TestCase): + """Test batch_acquire_driver function.""" + + def setUp(self): + self.tmpdir = Path(tempfile.mkdtemp()) + self.config_file = str(self.tmpdir / "config.yaml") + config = BpsConfig({"bps_defined": {"submitPath": str(self.tmpdir)}}) + + with open(self.config_file, "w") as fh: + config.dump(fh) + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + @unittest.mock.patch("lsst.ctrl.bps.drivers.acquire_quantum_graph") + def testSuccess(self, mock_acquire): + drivers.batch_acquire_driver(self.config_file) + mock_acquire.assert_called_once() + + +class TestBatchPrepareDriver(unittest.TestCase): + """Test batch_prepare_driver function.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.config_file = f"{self.tmpdir}/config.yaml" + config = BpsConfig({"bps_defined": {"submitPath": str(self.tmpdir)}}) + + with open(self.config_file, "w") as fh: + config.dump(fh) + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + @unittest.mock.patch("lsst.ctrl.bps.drivers.batch_payload_prepare") + def testSuccess(self, mock_prepare): + drivers.batch_prepare_driver(self.config_file) + mock_prepare.assert_called_once() + + +class _TestWorkflow(BaseWmsWorkflow): + def __init__(self, name, config=None, run_id=None): + super().__init__(name, config) + self.run_id = run_id + + def write(self, out_prefix): + pass # pragma: no cover + + def add_to_parent_workflow(self, config): + pass # pragma: no cover + + +class TestBatchSubmitDriver(unittest.TestCase): + """Test batch_submit_driver function.""" + + def setUp(self): + self.tmpdir = Path(tempfile.mkdtemp()) + self.config_file = str(self.tmpdir / "config.yaml") + self.config = BpsConfig( + {"bps_defined": {"submitPath": str(self.tmpdir), "outputRun": "output_run_dir"}} + ) + + with open(self.config_file, "w") as fh: + self.config.dump(fh) + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + @unittest.mock.patch("lsst.ctrl.bps.drivers.submit") + @unittest.mock.patch("lsst.ctrl.bps.drivers.prepare") + @unittest.mock.patch("lsst.ctrl.bps.drivers.create_batch_stages") + @unittest.mock.patch("lsst.ctrl.bps.drivers._init_submission_driver") + def testDryRun(self, mock_init, mock_create, mock_prepare, mock_submit): + mock_init.return_value = self.config + mock_create.return_value = ["mock_generic_workflow", self.config] + mock_prepare.return_value = _TestWorkflow("mock1", self.config) + drivers.batch_submit_driver(self.config_file, dry_run=True) + mock_init.assert_called_once() + mock_create.assert_called_once() + mock_prepare.assert_called_once() + mock_submit.assert_not_called() + + @unittest.mock.patch("lsst.ctrl.bps.drivers._make_id_link") + @unittest.mock.patch("lsst.ctrl.bps.drivers.submit") + @unittest.mock.patch("lsst.ctrl.bps.drivers.prepare") + @unittest.mock.patch("lsst.ctrl.bps.drivers.create_batch_stages") + @unittest.mock.patch("lsst.ctrl.bps.drivers._init_submission_driver") + def testSubmit(self, mock_init, mock_create, mock_prepare, mock_submit, mock_link): + mock_init.return_value = self.config + mock_create.return_value = ["mock_generic_workflow", self.config] + mock_prepare.return_value = _TestWorkflow("mock1", self.config) + mock_submit.return_value = _TestWorkflow("mock1", self.config, 12345) + drivers.batch_submit_driver(self.config_file) + mock_init.assert_called_once() + mock_create.assert_called_once() + mock_prepare.assert_called_once() + mock_submit.assert_called_once() + mock_link.assert_called_once() + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_pre_transform.py b/tests/test_pre_transform.py index 72515034..7125dd44 100644 --- a/tests/test_pre_transform.py +++ b/tests/test_pre_transform.py @@ -33,8 +33,11 @@ import unittest from pathlib import Path -from lsst.ctrl.bps import BpsConfig, BpsSubprocessError, ClusteredQuantumGraph -from lsst.ctrl.bps.pre_transform import cluster_quanta, create_quantum_graph, execute, update_quantum_graph +from qg_test_utils import make_test_quantum_graph + +from lsst.ctrl.bps import BpsConfig, BpsSubprocessError, ClusteredQuantumGraph, pre_transform +from lsst.pipe.base import QuantumGraph +from lsst.pipe.base.quantum_graph import PredictedQuantumGraph from lsst.pipe.base.tests.mocks import InMemoryRepo TESTDIR = os.path.abspath(os.path.dirname(__file__)) @@ -56,7 +59,7 @@ def testSuccessfulExecution(self): content = "Successful execution" command = f"{sys.executable} -c 'print(\"{content}\")'" with self.assertLogs(logger=self.logger, level="INFO") as cm: - status = execute(command, self.file.name) + status = pre_transform.execute(command, self.file.name) self.assertIn(content, cm.output[0]) self.file.seek(0) file_contents = self.file.read() @@ -66,7 +69,7 @@ def testSuccessfulExecution(self): def testFailingExecution(self): """Test exit status if command failed.""" - status = execute("false", self.file.name) + status = pre_transform.execute("false", self.file.name) self.assertIn("false", self.file.read()) self.assertNotEqual(status, 0) @@ -92,7 +95,7 @@ def testSuccess(self): """Test if a new quantum graph was created successfully.""" config = BpsConfig(self.settings, search_order=[]) with self.assertLogs(logger=self.logger, level="INFO") as cm: - qgraph_filename = create_quantum_graph(config, self.tmpdir) + qgraph_filename = pre_transform.create_quantum_graph(config, self.tmpdir) _, command = config.search("createQuantumGraph", opt={"curvals": {"qgraphFile": qgraph_filename}}) self.assertIn(command, cm.output[0]) self.assertTrue(os.path.exists(qgraph_filename)) @@ -102,14 +105,14 @@ def testCommandMissing(self): del self.settings["createQuantumGraph"] config = BpsConfig(self.settings, search_order=[]) with self.assertRaisesRegex(KeyError, "command.*not found"): - create_quantum_graph(config, self.tmpdir) + pre_transform.create_quantum_graph(config, self.tmpdir) def testFailure(self): """Test if error is caught when the quantum graph creation fails.""" self.settings["createQuantumGraph"] = "bash -c 'exit 2'" config = BpsConfig(self.settings, search_order=[]) with self.assertRaises(BpsSubprocessError) as cm: - create_quantum_graph(config, self.tmpdir) + pre_transform.create_quantum_graph(config, self.tmpdir) self.assertEqual(cm.exception.errno, errno.ENOENT) self.assertIn("non-zero exit code", str(cm.exception)) @@ -143,7 +146,7 @@ def testSuccess(self): """Test if the quantum graph was updated.""" config = BpsConfig(self.settings, search_order=[]) with self.assertLogs(logger=self.logger, level="INFO") as cm: - update_quantum_graph(config, str(self.src), self.tmpdir) + pre_transform.update_quantum_graph(config, str(self.src), self.tmpdir) _, command = config.search("updateQuantumGraph", opt={"curvals": {"qgraphFile": str(self.src)}}) self.assertIn("backing up", cm.output[0].lower()) self.assertIn("completed", cm.output[1].lower()) @@ -156,7 +159,7 @@ def testSuccessInPlace(self): """Test if a quantum graph was updated inplace.""" config = BpsConfig(self.settings, search_order=[]) with self.assertLogs(logger=self.logger, level="INFO") as cm: - update_quantum_graph(config, str(self.src), self.tmpdir, inplace=True) + pre_transform.update_quantum_graph(config, str(self.src), self.tmpdir, inplace=True) _, command = config.search("updateQuantumGraph", opt={"curvals": {"qgraphFile": str(self.src)}}) self.assertIn(command, cm.output[0]) self.assertTrue(self.src.read_text(), "bar\n") @@ -167,18 +170,107 @@ def testCommandMissing(self): del self.settings["updateQuantumGraph"] config = BpsConfig(self.settings, search_order=[]) with self.assertRaisesRegex(KeyError, "command.*not found"): - update_quantum_graph(config, str(self.src), self.tmpdir) + pre_transform.update_quantum_graph(config, str(self.src), self.tmpdir) def testFailure(self): """Test if error is caught when the command fails.""" self.settings["updateQuantumGraph"] = "bash -c 'exit 2'" config = BpsConfig(self.settings, search_order=[]) with self.assertRaises(BpsSubprocessError) as cm: - update_quantum_graph(config, str(self.src), self.tmpdir) + pre_transform.update_quantum_graph(config, str(self.src), self.tmpdir) self.assertEqual(cm.exception.errno, errno.ENOENT) self.assertRegex(str(cm.exception), "non-zero exit code") +class TestReadQuantumGraph(unittest.TestCase): + """Test read_quantum_graph method.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp(dir=TESTDIR) + self.logger = logging.getLogger("lsst.ctrl.bps") + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def testBadExtension(self): + with self.assertRaisesRegex(ValueError, "Unrecognized extension for quantum graph file"): + _ = pre_transform.read_quantum_graph("mygraph.badext") + + def testReadQG(self): + filename = f"{self.tmpdir}/testQG.qg" + self.qg = make_test_quantum_graph("test_read", save_filename=filename) + self.assertTrue(os.path.exists(filename)) + + qg = pre_transform.read_quantum_graph(filename) + self.assertEqual(len(qg), 18) + + @unittest.mock.patch.object(QuantumGraph, "loadUri") + @unittest.mock.patch.object(PredictedQuantumGraph, "from_old_quantum_graph") + def testReadOldQG(self, mock_from, mock_load): + # Instead of maintaining old format in ctrl_bps, just make sure bps + # function calls methods to read old format and convert old format. + # This test will fail if from_old_quantum_graph or QuantumGraph are + # removed. Those calls should also be removed from read_quantum_graph. + mock_from.return_value = "quantum_graph" + mock_load.return_value = "quantum_graph" + filename = f"{self.tmpdir}/testQG.qgraph" + _ = pre_transform.read_quantum_graph(filename) + mock_load.assert_called_once() + mock_from.assert_called_once() + + +class TestAcquireQuantumGraph(unittest.TestCase): + """Test acquire_quantum_graph method.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp(dir=TESTDIR) + self.logger = logging.getLogger("lsst.ctrl.bps") + + def tearDown(self): + shutil.rmtree(self.tmpdir, ignore_errors=True) + + @unittest.mock.patch("lsst.ctrl.bps.pre_transform.create_quantum_graph") + def testCreateGraph(self, mock_create): + qgraph_filename = f"{self.tmpdir}/created.qg" + mock_create.return_value = qgraph_filename + results = pre_transform.acquire_quantum_graph(BpsConfig({}), out_prefix=self.tmpdir) + self.assertEqual(results, qgraph_filename) + mock_create.assert_called_once() + + @unittest.mock.patch("lsst.ctrl.bps.pre_transform.update_quantum_graph") + def testExistingGraphNoCopy(self, mock_update): + filename = "original.qg" + config = BpsConfig({"qgraphFile": filename, "finalJob": {"dummy_var": "dummy_val"}}) + + results = pre_transform.acquire_quantum_graph(config, out_prefix=None) + self.assertEqual(results, filename) + mock_update.assert_called_once() + + @unittest.mock.patch("lsst.ctrl.bps.pre_transform.update_quantum_graph") + def testExistingGraphNoCopyNoUpdate(self, mock_update): + filename = "original.qg" + config = BpsConfig({"qgraphFile": filename}) + + results = pre_transform.acquire_quantum_graph(config, out_prefix=None) + self.assertEqual(results, filename) + mock_update.assert_not_called() + + @unittest.mock.patch("lsst.ctrl.bps.pre_transform.update_quantum_graph") + def testExistingGraphCopy(self, mock_update): + filename = Path(self.tmpdir) / "original.qg" + with open(filename, "w") as fh: + fh.write("test file") + path = Path(self.tmpdir) / "run_dir" + path.mkdir(parents=True, exist_ok=True) + + config = BpsConfig({"qgraphFile": str(filename), "finalJob": {"dummy_var": "dummy_val"}}) + + results = pre_transform.acquire_quantum_graph(config, out_prefix=path) + self.assertEqual(results, str(path / filename.name)) + self.assertTrue(Path(results).exists()) + mock_update.assert_called_once() + + class TestClusterQuanta(unittest.TestCase): """Test cluster_quanta method. Other tests cover functions cluster_quanta calls so mocking them here. @@ -197,7 +289,7 @@ def testValidate(self, mock_validate): with InMemoryRepo() as repo: qgraph = repo.make_quantum_graph() with self.assertRaisesRegex(RuntimeError, "Fake error"): - _ = cluster_quanta(config, qgraph, "a_name") + _ = pre_transform.cluster_quanta(config, qgraph, "a_name") @unittest.mock.patch.object(ClusteredQuantumGraph, "validate") def testNoValidate(self, mock_validate): @@ -211,7 +303,7 @@ def testNoValidate(self, mock_validate): config = BpsConfig(settings, search_order=[]) with InMemoryRepo() as repo: qgraph = repo.make_quantum_graph() - _ = cluster_quanta(config, qgraph, "a_name") + _ = pre_transform.cluster_quanta(config, qgraph, "a_name") if __name__ == "__main__":