Skip to content

Commit 1e795c8

Browse files
committed
Add ability to run submit processes as batch jobs.
Note: Limited to same cluster with shared filesystems.
1 parent 9816d69 commit 1e795c8

17 files changed

Lines changed: 1008 additions & 52 deletions

doc/changes/DM-53494.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added ability to run submit processes as batch jobs on same cluster with shared filesystems.

doc/lsst.ctrl.bps/quickstart.rst

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,6 +1613,49 @@ Parsl).
16131613
responsibility to remove them once no longer needed. The removal
16141614
should be done regularly to avoid too many in single directory.
16151615

1616+
.. _bps_submit_as_batch:
1617+
1618+
Submit Stages as Batch Jobs
1619+
---------------------------
1620+
1621+
.. warning::
1622+
1623+
This feature is part of the ongoing work for submission to remote sites.
1624+
So details may change. Also currently only the HTCondor plugin supports
1625+
this.
1626+
1627+
In cases where one cannot run ``bps submit`` interactively (e.g., needs
1628+
too much memory), BPS can run the submit processes as batch jobs after
1629+
which the payload workflow will start running.
1630+
1631+
The interactive submission process, ``bps batch-submit <submit yaml>``,
1632+
is much shorter. It will create a workflow with two jobs:
1633+
1634+
- ``buildQuantumGraph`` which creates the quantum graph.
1635+
- ``preparePayloadWorkflow`` which does the rest of the submission stages seen
1636+
when running ``bps submit``. These include clustering, creation of the payload
1637+
workflow, and preparing the WMS-specific workflow.
1638+
1639+
One can set runtime values specific to those jobs (e.g., ``requestMemory``) in
1640+
sections with corresponding names similar to ``finalJob``. Currently the
1641+
logging-related command-line arguments aren't passed from ``bps batch-submit``
1642+
to these jobs. Instead, one can set ``bpsPreCommandOpts``, which has the
1643+
same default as the payload job.
1644+
1645+
Even with the new jobs, there is only one output run collection, one submit
1646+
directory and one top level WMS ID to be used with BPS commands.
1647+
1648+
``bps report`` will show these 2 new jobs same as the payload jobs. They
1649+
will be the only lines to appear in the report until the ``preparePayloadWorkflow``
1650+
has finished at which time the expected payload lines should appear (from
1651+
``pipetaskInit`` through ``finalJob``).
1652+
1653+
``bps cancel`` can be used to abort the run during these new jobs or later
1654+
during the running of the payload jobs.
1655+
1656+
See the corresponding section in the WMS-plugin documentation for additional
1657+
information.
1658+
16161659
.. _bps-troubleshooting:
16171660

16181661
Troubleshooting
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
# This file is part of ctrl_bps.
2+
#
3+
# Developed for the LSST Data Management System.
4+
# This product includes software developed by the LSST Project
5+
# (https://www.lsst.org).
6+
# See the COPYRIGHT file at the top-level directory of this distribution
7+
# for details of code ownership.
8+
#
9+
# This software is dual licensed under the GNU General Public License and also
10+
# under a 3-clause BSD license. Recipients may choose which of these licenses
11+
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12+
# respectively. If you choose the GPL option then the following text applies
13+
# (but note that there is still no warranty even if you opt for BSD instead):
14+
#
15+
# This program is free software: you can redistribute it and/or modify
16+
# it under the terms of the GNU General Public License as published by
17+
# the Free Software Foundation, either version 3 of the License, or
18+
# (at your option) any later version.
19+
#
20+
# This program is distributed in the hope that it will be useful,
21+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
22+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23+
# GNU General Public License for more details.
24+
#
25+
# You should have received a copy of the GNU General Public License
26+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
27+
28+
"""Driver to run submit stages as batch jobs."""
29+
30+
__all__ = ["batch_payload_prepare", "create_batch_stages"]
31+
32+
import logging
33+
import os
34+
35+
from lsst.resources import ResourcePath, ResourcePathExpression
36+
from lsst.utils.logging import VERBOSE
37+
from lsst.utils.timer import time_this, timeMethod
38+
39+
from . import (
40+
DEFAULT_MEM_FMT,
41+
DEFAULT_MEM_UNIT,
42+
BpsConfig,
43+
GenericWorkflow,
44+
GenericWorkflowJob,
45+
GenericWorkflowLazyGroup,
46+
)
47+
from .pre_transform import cluster_quanta, read_quantum_graph
48+
from .prepare import prepare
49+
from .transform import _get_job_values, transform
50+
51+
_LOG = logging.getLogger(__name__)
52+
53+
54+
@timeMethod(logger=_LOG, logLevel=VERBOSE)
55+
def create_batch_stages(
56+
config: BpsConfig, prefix: ResourcePathExpression
57+
) -> tuple[GenericWorkflow, BpsConfig]:
58+
"""Create a GenericWorkflow that performs the submit stages as a workflow.
59+
60+
Parameters
61+
----------
62+
config : `lsst.ctrl.bps.BpsConfig`
63+
BPS configuration.
64+
prefix : `lsst.resources.ResourcePathExpression`
65+
Root path for any output files.
66+
67+
Returns
68+
-------
69+
generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
70+
The generic workflow transformed from the clustered quantum graph.
71+
generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
72+
Configuration to accompany GenericWorkflow.
73+
"""
74+
prefix = ResourcePath(prefix)
75+
generic_workflow: GenericWorkflow = GenericWorkflow(name=f"{config['uniqProcName']}_ctrl")
76+
cmd_line_key = "jobCommand"
77+
78+
# build QuantumGraph job
79+
search_opt = {}
80+
if "buildQuantumGraph" in config:
81+
search_opt["searchobj"] = config.get("buildQuantumGraph")
82+
build_job = GenericWorkflowJob(
83+
name="buildQuantumGraph",
84+
label="buildQuantumGraph",
85+
)
86+
job_values = _get_job_values(config, search_opt, cmd_line_key)
87+
if not job_values["executable"]:
88+
raise RuntimeError(
89+
f"Missing executable for buildQuantumGraph. Double check submit yaml for {cmd_line_key}"
90+
)
91+
for key, value in job_values.items():
92+
if key not in {"name", "label"}:
93+
setattr(build_job, key, value)
94+
95+
generic_workflow.add_job(build_job)
96+
generic_workflow.run_attrs.update(
97+
{
98+
"bps_isjob": "True",
99+
"bps_project": config["project"],
100+
"bps_campaign": config["campaign"],
101+
"bps_run": config["uniqProcName"],
102+
"bps_operator": config["operator"],
103+
"bps_payload": config["payloadName"],
104+
"bps_runsite": config["computeSite"],
105+
}
106+
)
107+
108+
# cluster/transform/prepare job
109+
search_opt = {}
110+
if "preparePayloadWorkflow" in config:
111+
search_opt["searchobj"] = config.get("preparePayloadWorkflow")
112+
prepare_job = GenericWorkflowLazyGroup(
113+
name="preparePayloadWorkflow",
114+
label="preparePayloadWorkflow",
115+
)
116+
job_values = _get_job_values(config, search_opt, cmd_line_key)
117+
if not job_values["executable"]:
118+
raise RuntimeError(
119+
f"Missing executable for preparePayloadWorkflow. Double check submit yaml for {cmd_line_key}"
120+
)
121+
for key, value in job_values.items():
122+
if key not in {"name", "label"}:
123+
setattr(prepare_job, key, value)
124+
125+
generic_workflow.add_job(prepare_job, parent_names=["buildQuantumGraph"])
126+
127+
_, save_workflow = config.search("saveGenericWorkflow", opt={"default": False})
128+
if save_workflow:
129+
with prefix.join("bps_stages_generic_workflow.pickle").open("wb") as outfh:
130+
generic_workflow.save(outfh, "pickle")
131+
132+
return generic_workflow, config
133+
134+
135+
@timeMethod(logger=_LOG, logLevel=VERBOSE)
136+
def batch_payload_prepare(config: BpsConfig, prefix: ResourcePathExpression) -> None:
137+
"""Create a GenericWorkflow that performs the submit stages as a workflow.
138+
139+
Parameters
140+
----------
141+
config : `lsst.ctrl.bps.BpsConfig`
142+
BPS configuration.
143+
144+
prefix : `lsst.resources.ResourcePathExpression`
145+
Root path for any output files.
146+
147+
Returns
148+
-------
149+
generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
150+
The generic workflow transformed from the clustered quantum graph.
151+
generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
152+
Configuration to accompany GenericWorkflow.
153+
"""
154+
prefix = ResourcePath(prefix)
155+
# Read existing QuantumGraph
156+
qgraph_filename = prefix.join(config["qgraphFileTemplate"])
157+
qgraph = read_quantum_graph(qgraph_filename)
158+
config[".bps_defined.runQgraphFile"] = str(qgraph_filename)
159+
160+
# Cluster
161+
_LOG.info("Starting cluster stage (grouping quanta into jobs)")
162+
with time_this(
163+
log=_LOG,
164+
level=logging.INFO,
165+
prefix=None,
166+
msg="Cluster stage completed",
167+
mem_usage=True,
168+
mem_unit=DEFAULT_MEM_UNIT,
169+
mem_fmt=DEFAULT_MEM_FMT,
170+
):
171+
clustered_qgraph = cluster_quanta(config, qgraph, config["uniqProcName"])
172+
173+
_LOG.info("ClusteredQuantumGraph contains %d cluster(s)", len(clustered_qgraph))
174+
175+
submit_path = config[".bps_defined.submitPath"]
176+
_, save_clustered_qgraph = config.search("saveClusteredQgraph", opt={"default": False})
177+
if save_clustered_qgraph:
178+
clustered_qgraph.save(os.path.join(submit_path, "bps_clustered_qgraph.pickle"))
179+
_, save_dot = config.search("saveDot", opt={"default": False})
180+
if save_dot:
181+
clustered_qgraph.draw(os.path.join(submit_path, "bps_clustered_qgraph.dot"))
182+
183+
# Transform
184+
_LOG.info("Starting transform stage (creating generic workflow)")
185+
with time_this(
186+
log=_LOG,
187+
level=logging.INFO,
188+
prefix=None,
189+
msg="Transform stage completed",
190+
mem_usage=True,
191+
mem_unit=DEFAULT_MEM_UNIT,
192+
mem_fmt=DEFAULT_MEM_FMT,
193+
):
194+
generic_workflow, generic_workflow_config = transform(config, clustered_qgraph, submit_path)
195+
_LOG.info("Generic workflow name '%s'", generic_workflow.name)
196+
197+
num_jobs = sum(generic_workflow.job_counts.values())
198+
_LOG.info("GenericWorkflow contains %d job(s) (including final)", num_jobs)
199+
200+
_, save_workflow = config.search("saveGenericWorkflow", opt={"default": False})
201+
if save_workflow:
202+
with open(os.path.join(submit_path, "bps_generic_workflow.pickle"), "wb") as outfh:
203+
generic_workflow.save(outfh, "pickle")
204+
_, save_dot = config.search("saveDot", opt={"default": False})
205+
if save_dot:
206+
with open(os.path.join(submit_path, "bps_generic_workflow.dot"), "w") as outfh:
207+
generic_workflow.draw(outfh, "dot")
208+
209+
# Prepare
210+
_LOG.info("Starting prepare stage (creating specific implementation of workflow)")
211+
with time_this(
212+
log=_LOG,
213+
level=logging.INFO,
214+
prefix=None,
215+
msg="Prepare stage completed",
216+
mem_usage=True,
217+
mem_unit=DEFAULT_MEM_UNIT,
218+
mem_fmt=DEFAULT_MEM_FMT,
219+
):
220+
wms_workflow = prepare(generic_workflow_config, generic_workflow, submit_path)
221+
222+
# Add payload workflow to currently running workflow
223+
_LOG.info("Starting update workflow")
224+
with time_this(
225+
log=_LOG,
226+
level=logging.INFO,
227+
prefix=None,
228+
msg="Workflow update completed",
229+
mem_usage=True,
230+
mem_unit=DEFAULT_MEM_UNIT,
231+
mem_fmt=DEFAULT_MEM_FMT,
232+
):
233+
# Assuming submit_path for ctrl workflow is visible by this job.
234+
wms_workflow.add_to_parent_workflow(generic_workflow_config)

python/lsst/ctrl/bps/cli/cmd/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727

2828
__all__ = [
2929
"acquire",
30+
"batch_acquire",
31+
"batch_prepare",
32+
"batch_submit",
3033
"cluster",
3134
"transform",
3235
"prepare",
@@ -41,6 +44,9 @@
4144

4245
from .commands import (
4346
acquire,
47+
batch_acquire,
48+
batch_prepare,
49+
batch_submit,
4450
cancel,
4551
cluster,
4652
ping,

python/lsst/ctrl/bps/cli/cmd/commands.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
from ... import BpsSubprocessError
3737
from ...drivers import (
3838
acquire_qgraph_driver,
39+
batch_acquire_driver,
40+
batch_prepare_driver,
41+
batch_submit_driver,
3942
cancel_driver,
4043
cluster_qgraph_driver,
4144
ping_driver,
@@ -219,3 +222,30 @@ def ping(*args, **kwargs):
219222
def submitcmd(*args, **kwargs):
220223
"""Submit a command for execution."""
221224
submitcmd_driver(*args, **kwargs)
225+
226+
227+
@click.command(cls=BpsCommand)
228+
@opt.config_file_argument(required=True)
229+
@opt.submission_options()
230+
def batch_acquire(*args, **kwargs):
231+
"""Run inside a batch job to create a new quantum graph."""
232+
with catch_errors():
233+
batch_acquire_driver(*args, **kwargs)
234+
235+
236+
@click.command(cls=BpsCommand)
237+
@opt.config_file_argument(required=True)
238+
@opt.submission_options()
239+
def batch_prepare(*args, **kwargs):
240+
"""Run payload workflow preparation inside a batch job."""
241+
with catch_errors():
242+
batch_prepare_driver(*args, **kwargs)
243+
244+
245+
@click.command(cls=BpsCommand)
246+
@opt.config_file_argument(required=True)
247+
@opt.submission_options()
248+
def batch_submit(*args, **kwargs):
249+
"""Submit a workflow with preparation inside a batch jobs too."""
250+
with catch_errors():
251+
batch_submit_driver(*args, **kwargs)

0 commit comments

Comments
 (0)