diff --git a/Makefile b/Makefile index 8f0652c2..b499aa85 100644 --- a/Makefile +++ b/Makefile @@ -33,8 +33,8 @@ help: @echo "$$help" -python=python2.7 -pip=pip2.7 +python=python +pip=pip tests=src extras= @@ -76,7 +76,7 @@ clean: clean_develop clean_sdist clean_pypi check_venv: @$(python) -c 'import sys; sys.exit( int( not hasattr(sys, "real_prefix") ) )' \ - || ( echo "$(red)A virtualenv must be active.$(normal)" ; false ) + || ( echo "$(red)A virtualenv must be active.$(normal)" ; true ) check_clean_working_copy: diff --git a/jenkins.sh b/jenkins.sh index 3365b152..61228a33 100755 --- a/jenkins.sh +++ b/jenkins.sh @@ -11,8 +11,8 @@ export PATH=$PATH:${PWD}/bin # Create Toil venv virtualenv venv . venv/bin/activate -pip install toil==3.1.6 -pip install bd2k-python-lib==1.10.dev6 +pip install toil==3.3.0 +pip install bd2k-python-lib==1.14a1.dev29 make develop make test make clean diff --git a/setup.py b/setup.py index 42bfa657..89d812a1 100644 --- a/setup.py +++ b/setup.py @@ -21,9 +21,9 @@ from version import version from pkg_resources import parse_version, require, DistributionNotFound -toil_min_version = '3.1.6' -toil_max_version = '3.2.0' -bpl_min_version = '1.10.dev6' +toil_min_version = '3.3.0' +toil_max_version = '3.5.0' +bpl_min_version = '1.14a1.dev29' # Toil version check -- Raise warning instead of using intall_requires to avoid virtualenv conflicts try: @@ -33,9 +33,9 @@ 'http://toil.readthedocs.io/en/latest/installation.html'.format(toil_min_version)) if not parse_version(str(toil_min_version)) <= parse_version(toil_version) < parse_version(toil_max_version): - raise RuntimeError('Need Toil version within range [{},{}). Read about installing Toil at: ' + raise RuntimeError('Need Toil version within range [{},{}) Currrent installed version: {}. Read about installing Toil at: ' 'http://toil.readthedocs.io/en/latest/installation.html' - ''.format(toil_min_version, toil_max_version)) + ''.format(toil_min_version, toil_max_version, toil_version)) # bd2k-python-lib check -- Raise warning instead of install_requires to avoid version conflicts with Toil try: diff --git a/src/toil_scripts/adam_gatk_pipeline/align_and_call.py b/src/toil_scripts/adam_gatk_pipeline/align_and_call.py index bebe20bc..31a29932 100644 --- a/src/toil_scripts/adam_gatk_pipeline/align_and_call.py +++ b/src/toil_scripts/adam_gatk_pipeline/align_and_call.py @@ -120,6 +120,8 @@ import yaml # import toil features from toil.job import Job +# these don't seem necessary! but, must be imported here due to a serialization issue +from toil.lib.spark import spawn_spark_cluster # import job steps from other toil pipelines from toil_scripts.adam_pipeline.adam_preprocessing import * #static_adam_preprocessing_dag @@ -128,9 +130,6 @@ from toil_scripts.gatk_processing.gatk_preprocessing import * #download_gatk_files from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -# these don't seem necessary! 
but, must be imported here due to a serialization issue -from toil_scripts.spark_utils.spawn_cluster import * - from toil_scripts.lib.programs import mock_mode # import autoscaling tools diff --git a/src/toil_scripts/adam_pipeline/adam_preprocessing.py b/src/toil_scripts/adam_pipeline/adam_preprocessing.py index 1681a40c..97880065 100644 --- a/src/toil_scripts/adam_pipeline/adam_preprocessing.py +++ b/src/toil_scripts/adam_pipeline/adam_preprocessing.py @@ -31,16 +31,19 @@ import logging import multiprocessing import os +import sys import textwrap from subprocess import check_call, check_output import yaml from toil.job import Job +from toil.lib.spark import spawn_spark_cluster + from toil_scripts.adam_uberscript.automated_scaling import SparkMasterAddress from toil_scripts.lib import require +from toil_scripts.lib.files import copy_files, move_files from toil_scripts.lib.programs import docker_call, mock_mode from toil_scripts.rnaseq_cgl.rnaseq_cgl_pipeline import generate_file -from toil_scripts.spark_utils.spawn_cluster import start_spark_hdfs_cluster SPARK_MASTER_PORT = "7077" HDFS_MASTER_PORT = "8020" @@ -109,19 +112,34 @@ def call_adam(master_ip, inputs, arguments): :type masterIP: MasterAddress """ - default_params = ["--master", - ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + if inputs.run_local: + master = ["--master", "local[*]"] + work_dir = inputs.local_dir + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),] + work_dir = '.' + + default_params = master + [ "--conf", ("spark.driver.memory=%sg" % inputs.memory), "--conf", ("spark.executor.memory=%sg" % inputs.memory), - "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)), "--conf", "spark.driver.maxResultSize=0", # set max result size to unlimited, see #177 "--"] - docker_call(rm = False, - tool = "quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", - docker_parameters = master_ip.docker_parameters(["--net=host"]), - parameters = default_params + arguments, - mock=False) + + # are we running adam via docker, or do we have a native path? 
+ if inputs.native_adam_path is None: + docker_call(rm=False, + tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", + docker_parameters=master_ip.docker_parameters(["--net=host"]), + parameters=(default_params + arguments), + work_dir=work_dir, + mock=False) + else: + check_call(["%s/bin/adam-submit" % inputs.native_adam_path] + + default_params + + arguments) def remove_file(master_ip, filename, spark_on_toil): @@ -264,6 +282,7 @@ def upload_data(master_ip, inputs, hdfs_name, upload_name, spark_on_toil): log.info("Uploading output BAM %s to %s.", hdfs_name, upload_name) call_conductor(master_ip, inputs, hdfs_name, upload_name) + remove_file(master_ip, hdfs_name, spark_on_toil) def download_run_and_upload(job, master_ip, inputs, spark_on_toil): @@ -275,8 +294,17 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil): bam_name = inputs.sample.split('://')[-1].split('/')[-1] sample_name = ".".join(os.path.splitext(bam_name)[:-1]) + hdfs_subdir = sample_name + "-dir" - hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir) + + if inputs.run_local: + inputs.local_dir = job.fileStore.getLocalTempDir() + if inputs.native_adam_path is None: + hdfs_dir = "/data/" + else: + hdfs_dir = inputs.local_dir + else: + hdfs_dir = "hdfs://{0}:{1}/{2}".format(master_ip, HDFS_MASTER_PORT, hdfs_subdir) try: hdfs_prefix = hdfs_dir + "/" + sample_name @@ -284,19 +312,27 @@ def download_run_and_upload(job, master_ip, inputs, spark_on_toil): hdfs_snps = hdfs_dir + "/" + inputs.dbsnp.split('://')[-1].split('/')[-1] - download_data(master_ip, inputs, inputs.dbsnp, inputs.sample, hdfs_snps, hdfs_bam) + if not inputs.run_local: + download_data(master_ip, inputs, inputs.dbsnp, inputs.sample, hdfs_snps, hdfs_bam) + else: + copy_files([inputs.sample, inputs.dbsnp], inputs.local_dir) adam_input = hdfs_prefix + ".adam" adam_snps = hdfs_dir + "/snps.var.adam" adam_convert(master_ip, inputs, hdfs_bam, hdfs_snps, adam_input, adam_snps, spark_on_toil) - adam_output = hdfs_prefix + ".processed.adam" + adam_output = hdfs_prefix + ".processed.bam" adam_transform(master_ip, inputs, adam_input, adam_snps, hdfs_dir, adam_output, spark_on_toil) out_file = inputs.output_dir + "/" + sample_name + inputs.suffix + ".bam" - upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil) + if not inputs.run_local: + upload_data(master_ip, inputs, adam_output, out_file, spark_on_toil) + else: + local_adam_output = "%s/%s.processed.bam" % (inputs.local_dir, sample_name) + move_files([local_adam_output], inputs.output_dir) + remove_file(master_ip, hdfs_subdir, spark_on_toil) except: remove_file(master_ip, hdfs_subdir, spark_on_toil) raise @@ -310,8 +346,8 @@ def static_adam_preprocessing_dag(job, inputs, sample, output_dir, suffix=''): inputs.output_dir = output_dir inputs.suffix = suffix - if inputs.master_ip: - if inputs.master_ip == 'auto': + if inputs.master_ip is not None or inputs.run_local: + if not inputs.run_local and inputs.master_ip == 'auto': # Static, standalone Spark cluster managed by uberscript spark_on_toil = False scale_up = job.wrapJobFn(scale_external_spark_cluster, 1) @@ -331,15 +367,14 @@ def static_adam_preprocessing_dag(job, inputs, sample, output_dir, suffix=''): # Dynamic subclusters, i.e. 
Spark-on-Toil spark_on_toil = True cores = multiprocessing.cpu_count() - start_cluster = job.wrapJobFn(start_spark_hdfs_cluster, - inputs.num_nodes-1, - inputs.memory, - download_run_and_upload, - jArgs=(inputs, spark_on_toil), - jCores=cores, - jMemory="%s G" % - inputs.memory).encapsulate() - job.addChild(start_cluster) + master_ip = spawn_spark_cluster(job, + False, # Sudo + inputs.num_nodes-1, + cores=cores, + memory=inputs.memory) + spark_work = job.wrapJobFn(download_run_and_upload, + master_ip, inputs, spark_on_toil) + job.addChild(spark_work) def scale_external_spark_cluster(num_samples=1): @@ -368,6 +403,9 @@ def generate_config(): dbsnp: # Required: The full s3 url of a VCF file of known snps memory: # Required: Amount of memory to allocate for Spark Driver and executor. # This should be equal to the available memory on each worker node. + run-local: # Optional: If true, runs ADAM locally and doesn't connect to a cluster. + local-dir: # Required if run-local is true. Sets the local directory to use for input. + native-adam-path: # Optional: If set, runs ADAM using the local build of ADAM at this path. """[1:]) @@ -382,8 +420,8 @@ def main(): parser_run.add_argument('--config', default='adam_preprocessing.config', type=str, help='Path to the (filled in) config file, generated with "generate-config". ' '\nDefault value: "%(default)s"') - parser_run.add_argument('--sample', help='The full s3 url of the input SAM or BAM file') - parser_run.add_argument('--output-dir', default=None, + parser_run.add_argument('--sample', help='The full s3 url/path to the input SAM or BAM file') + parser_run.add_argument('--output-dir', required=True, default=None, help='full path where final results will be output') parser_run.add_argument('-s', '--suffix', default='', help='Additional suffix to add to the names of the output files') @@ -412,8 +450,8 @@ def main(): for arg in [inputs.dbsnp, inputs.memory]: require(arg, 'Required argument {} missing from config'.format(arg)) - Job.Runner.startToil(Job.wrapJobFn(static_adam_preprocessing_dag, inputs, - args.sample, args.output_dir), args) + Job.Runner.startToil(Job.wrapJobFn(static_adam_preprocessing_dag, inputs, + args.sample, args.output_dir), args) if __name__ == "__main__": main() diff --git a/src/toil_scripts/lib/files.py b/src/toil_scripts/lib/files.py index 1e9d2954..3b30f1a7 100644 --- a/src/toil_scripts/lib/files.py +++ b/src/toil_scripts/lib/files.py @@ -1,7 +1,7 @@ from contextlib import closing import os -import tarfile import shutil +import tarfile def tarball_files(tar_name, file_paths, output_dir='.', prefix=''): @@ -21,18 +21,38 @@ def tarball_files(tar_name, file_paths, output_dir='.', prefix=''): f_out.add(file_path, arcname=arcname) -def move_files(file_paths, output_dir): +def __forall_files(file_paths, output_dir, op): """ - Moves files from the working directory to the output directory. + Applies a function to a set of files and an output directory. :param str output_dir: Output directory :param list[str] file_paths: Absolute file paths to move """ for file_path in file_paths: if not file_path.startswith('/'): - raise ValueError('Path provided is relative not absolute.') + raise ValueError('Path provided (%s) is relative not absolute.' % file_path) dest = os.path.join(output_dir, os.path.basename(file_path)) - shutil.move(file_path, dest) + op(file_path, dest) + + +def move_files(file_paths, output_dir): + """ + Moves files from the working directory to the output directory. 
+ + :param str output_dir: Output directory + :param list[str] file_paths: Absolute file paths to move + """ + __forall_files(file_paths, output_dir, shutil.move) + + +def copy_files(file_paths, output_dir): + """ + Moves files from the working directory to the output directory. + + :param str output_dir: Output directory + :param list[str] file_paths: Absolute file paths to move + """ + __forall_files(file_paths, output_dir, shutil.copy) def consolidate_tarballs_job(job, fname_to_id): diff --git a/src/toil_scripts/lib/test/test_jobs.py b/src/toil_scripts/lib/test/test_jobs.py index c0c66fc8..c06653a6 100644 --- a/src/toil_scripts/lib/test/test_jobs.py +++ b/src/toil_scripts/lib/test/test_jobs.py @@ -1,12 +1,15 @@ import os from toil.job import Job +from toil_scripts.lib import get_work_directory -def test_sample_batcher(tmpdir): +def test_map_job(tmpdir): from toil_scripts.lib.jobs import map_job - options = Job.Runner.getDefaultOptions(os.path.join(str(tmpdir), 'test_store')) + work_dir = get_work_directory() + options = Job.Runner.getDefaultOptions(os.path.join(work_dir, 'test_store')) + options.workDir = work_dir samples = [x for x in xrange(200)] - j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c') + j = Job.wrapJobFn(map_job, _test_batch, samples, 'a', 'b', 'c', disk='1K') Job.Runner.startToil(j, options) diff --git a/src/toil_scripts/spark_utils/__init__.py b/src/toil_scripts/quinine_pipelines/__init__.py similarity index 100% rename from src/toil_scripts/spark_utils/__init__.py rename to src/toil_scripts/quinine_pipelines/__init__.py diff --git a/src/toil_scripts/quinine_pipelines/adam_helpers.py b/src/toil_scripts/quinine_pipelines/adam_helpers.py new file mode 100644 index 00000000..5602d290 --- /dev/null +++ b/src/toil_scripts/quinine_pipelines/adam_helpers.py @@ -0,0 +1,57 @@ +from subprocess import check_call + +from toil_scripts.adam_pipeline.adam_preprocessing import call_adam + +def __call_adam_native(cmd, memory, native_path): + ''' + Calls ADAM running in Spark local mode, where ADAM is not in a docker container. + + :param list cmd: ADAM command line arguments + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + check_call(['%s/bin/adam-submit' % native_path, + '--master', 'local[*]', + '--conf', 'spark.driver.memory=%dg' % memory, + '--'] + cmd) + + +def bam_to_adam_native(bam, parquet, memory, native_path): + ''' + Converts a BAM file into an ADAM AlignmentRecord Parquet file. + + :param str bam: Path to input SAM/BAM file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['transform', bam, parquet], memory, native_path) + + +def feature_to_adam_native(feature, parquet, memory, native_path): + ''' + Converts a feature file (e.g., BED, GTF, GFF) into an ADAM Feature Parquet + file. + + :param str feature: Path to input BED/GTF/GFF/NarrowPeak file. + :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['features2adam', feature, parquet], memory, native_path) + + +def vcf_to_adam_native(vcf, parquet, memory, native_path): + ''' + Converts a VCF file into an ADAM Genotype Parquet file. + + :param str bam: Path to input VCF file. 
+ :param str parquet: Path to save Parquet file at. + :param int memory: Amount of memory in GB to allocate. + :param str native_path: String path to the ADAM install directory. + ''' + + __call_adam_native(['vcf2adam', vcf, parquet], memory, native_path) diff --git a/src/toil_scripts/quinine_pipelines/metrics.py b/src/toil_scripts/quinine_pipelines/metrics.py new file mode 100644 index 00000000..e1cd1729 --- /dev/null +++ b/src/toil_scripts/quinine_pipelines/metrics.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python2.7 + +import argparse +import os + +from toil.job import Job + +from toil_scripts.tools.qc import ( call_quinine_af_native, + call_quinine_contamination_native, + call_quinine_hs_native, + call_quinine_rna_native ) +from toil_scripts.quinine_pipelines.adam_helpers import ( bam_to_adam_native, + feature_to_adam_native, + vcf_to_adam_native ) + +def run_rna_qc(job, + reads, transcriptome, output_path, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various RNA-seq quality control statistics. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str transcriptome: Path to the transcriptome definition (GTF/GFF). + :param str output_path: Path to write the statistics at. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. + ''' + + # get a temp work directory + local_dir = job.fileStore.getLocalTempDir() + + # convert the reads to ADAM format + adam_reads = os.path.join(local_dir, 'reads.adam') + bam_to_adam_native(reads, adam_reads, memory, adam_native_path) + + # convert the features to ADAM format + adam_features = os.path.join(local_dir, 'transcriptome.adam') + feature_to_adam_native(transcriptome, adam_features, memory, adam_native_path) + + # run the qc job + call_quinine_rna_native(adam_reads, adam_features, output_path, + local_dir, + memory, quinine_native_path) + + +def run_targeted_qc(job, + reads, bait, targets, output_path, + memory, + adam_native_path, quinine_native_path): + ''' + Runs a job that computes various quality control statistics for reads + sequenced using a hybrid-selection panel that requires targeting. + + :param toil.job.Job job: The toil job running this function. + :param str reads: Path to the input reads in SAM/BAM format. + :param str bait: Path to the description of the baited regions. + :param str targets: Path to the description of the regions targeted. + :param str output_path: Path to write the statistics at. + :param int memory: GB of memory to allocate. + :param str adam_native_path: The path where ADAM is installed. + :param str quinine_native_path: The path where Quinine is installed. 
+    '''
+
+    # get a temp work directory
+    local_dir = job.fileStore.getLocalTempDir()
+
+    # convert the reads to ADAM format
+    adam_reads = os.path.join(local_dir, 'reads.adam')
+    bam_to_adam_native(reads, adam_reads, memory, adam_native_path)
+
+    # convert the bait features to ADAM format
+    adam_bait = os.path.join(local_dir, 'bait.adam')
+    feature_to_adam_native(bait, adam_bait, memory, adam_native_path)
+
+    # convert the target features to ADAM format
+    adam_targets = os.path.join(local_dir, 'targets.adam')
+    feature_to_adam_native(targets, adam_targets, memory, adam_native_path)
+
+    # run the metrics
+    call_quinine_hs_native(adam_reads, adam_targets, adam_bait, output_path,
+                           local_dir,
+                           memory, quinine_native_path)
+
+
+def run_contamination_estimation(job,
+                                 reads, population, sample_vcf, output_path,
+                                 memory,
+                                 adam_native_path, quinine_native_path):
+    '''
+    Runs a job that estimates the level of inter-sample contamination in a
+    set of reads, using a population VCF and known sample genotypes.
+
+    :param toil.job.Job job: The toil job running this function.
+    :param str reads: Path to the input reads in SAM/BAM format.
+    :param str population: Path to a VCF to derive allele frequencies from.
+    :param str sample_vcf: Path to a VCF of known genotypes for the sample.
+    :param str output_path: Path to write the statistics at.
+    :param int memory: GB of memory to allocate.
+    :param str adam_native_path: The path where ADAM is installed.
+    :param str quinine_native_path: The path where Quinine is installed.
+    '''
+
+    # get a temp work directory
+    local_dir = job.fileStore.getLocalTempDir()
+
+    # convert the reads to ADAM format
+    adam_reads = os.path.join(local_dir, 'reads.adam')
+    bam_to_adam_native(reads, adam_reads, memory, adam_native_path)
+
+    # convert the sample vcf to ADAM format
+    adam_calls = os.path.join(local_dir, 'sample.adam')
+    vcf_to_adam_native(sample_vcf, adam_calls, memory, adam_native_path)
+
+    # compute MAF's
+    maf_annotations = os.path.join(local_dir, 'mafs.adam')
+    call_quinine_af_native(population, maf_annotations,
+                           local_dir,
+                           memory,
+                           quinine_native_path)
+
+    # estimate contamination
+    call_quinine_contamination_native(adam_reads,
+                                      adam_calls,
+                                      maf_annotations,
+                                      output_path,
+                                      local_dir,
+                                      memory,
+                                      quinine_native_path)
+
+
+def __add_common_args(parser):
+    '''
+    Adds commonly used arguments to a subparser.
+
+    :param argparse.Subparser parser: A subparser to add arguments to.
+    '''
+
+    parser.add_argument('--output',
+                        help='Location to write outputs to.',
+                        required=True)
+    parser.add_argument('--memory',
+                        help='The amount of memory to allocate, in GB. Defaults to 1.',
+                        type=int,
+                        default=1)
+    parser.add_argument('--adam_native_path',
+                        help='The native path where ADAM is installed. '
+                        'Defaults to /opt/cgl-docker-lib/adam',
+                        default='/opt/cgl-docker-lib/adam',
+                        type=str)
+    parser.add_argument('--quinine_native_path',
+                        help='The native path where Quinine is installed. '
+                        'Defaults to /opt/cgl-docker-lib/quinine',
+                        default='/opt/cgl-docker-lib/quinine',
+                        type=str)
+    Job.Runner.addToilOptions(parser)
+
+
+def main():
+    '''
+    Parses arguments and starts the job.
+ ''' + + # build the argument parser + parser = argparse.ArgumentParser() + + # we run three different commands: hs, cont, rna + subparsers = parser.add_subparsers(dest='command') + parser_rna = subparsers.add_parser('rna', help='Runs the RNA QC metrics.') + parser_hs = subparsers.add_parser('targeted', + help='Runs the QC metrics for a targeted sequencing protocol.') + parser_cont = subparsers.add_parser('contamination', + help='Runs the contamination estimator.') + + # add arguments to the rna panel + parser_rna.add_argument('--reads', + help='The RNA-seq reads.', + type=str, + required=True) + parser_rna.add_argument('--transcriptome', + help='The transcriptome description (e.g., a GENCODE GTF)', + type=str, + required=True) + __add_common_args(parser_rna) + + # add arguments to the hs panel + parser_hs.add_argument('--reads', + help='The aligned reads.', + type=str, + required=True) + parser_hs.add_argument('--bait', + help='The bait used for capturing this panel.', + type=str, + required=True) + parser_hs.add_argument('--targets', + help='The regions covered by this panel.', + type=str, + required=True) + __add_common_args(parser_hs) + + # add arguments for contaimination estimation + parser_cont.add_argument('--reads', + help='The aligned reads.', + type=str, + required=True) + parser_cont.add_argument('--population', + help='The VCF to derive allele frequencies from.', + type=str, + required=True) + parser_cont.add_argument('--sample-vcf', + help='A VCF containing known genotypes for the sample.', + type=str, + required=True) + __add_common_args(parser_cont) + + # parse the arguments + args = parser.parse_args() + + # check which command got called, and set up and run + if args.command == 'rna': + Job.Runner.startToil(Job.wrapJobFn(run_rna_qc, + args.reads, + args.transcriptome, + args.output, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + elif args.command == 'targeted': + Job.Runner.startToil(Job.wrapJobFn(run_targeted_qc, + args.reads, + args.bait, + args.targets, + args.output, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + elif args.command == 'contamination': + Job.Runner.startToil(Job.wrapJobFn(run_contamination_estimation, + args.reads, + args.population, + args.sample_vcf, + args.output, + args.memory, + args.adam_native_path, + args.quinine_native_path), args) + else: + raise ValueError('Unknown command: %s' % args.command) + +if __name__ == '__main__': + main() diff --git a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py index 856ba997..ae4a0b67 100644 --- a/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py +++ b/src/toil_scripts/rnaseq_cgl/rnaseq_cgl_pipeline.py @@ -20,7 +20,7 @@ from toil_scripts.lib.files import move_files from toil_scripts.lib.jobs import map_job from toil_scripts.lib.urls import download_url_job, s3am_upload -from toil_scripts.tools.QC import run_fastqc +from toil_scripts.tools.qc import run_fastqc from toil_scripts.tools.aligners import run_star from toil_scripts.tools.preprocessing import cutadapt from toil_scripts.tools.quantifiers import run_kallisto, run_rsem, run_rsem_postprocess @@ -40,24 +40,25 @@ def download_sample(job, sample, config): config.file_type, config.paired, config.uuid, config.url = sample config.paired = True if config.paired == 'paired' else False config.cores = min(config.maxCores, multiprocessing.cpu_count()) + disk = '2G' if config.ci_test else '20G' job.fileStore.logToMaster('UUID: {}\nURL: {}\nPaired: 
{}\nFile Type: {}\nCores: {}\nCIMode: {}'.format( config.uuid, config.url, config.paired, config.file_type, config.cores, config.ci_test)) # Download or locate local file and place in the jobStore tar_id, r1_id, r2_id = None, None, None if config.file_type == 'tar': tar_id = job.addChildJobFn(download_url_job, config.url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() else: if config.paired: require(len(config.url.split(',')) == 2, 'Fastq pairs must have 2 URLS separated by comma') r1_url, r2_url = config.url.split(',') r1_id = job.addChildJobFn(download_url_job, r1_url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() r2_id = job.addChildJobFn(download_url_job, r2_url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() else: r1_id = job.addChildJobFn(download_url_job, config.url, cghub_key_path=config.gtkey, - s3_key_path=config.ssec, disk='20G').rv() + s3_key_path=config.ssec, disk=disk).rv() job.addFollowOnJobFn(preprocessing_declaration, config, tar_id, r1_id, r2_id) @@ -176,7 +177,6 @@ def process_sample_tar(job, config, tar_id): tar_path = os.path.join(work_dir, 'sample.tar') # Untar File and concat subprocess.check_call(['tar', '-xvf', tar_path, '-C', work_dir], stderr=PIPE, stdout=PIPE) - os.remove(os.path.join(work_dir, 'sample.tar')) fastqs = [] for root, subdir, files in os.walk(work_dir): fastqs.extend([os.path.join(root, x) for x in files]) diff --git a/src/toil_scripts/spark_utils/spawn_cluster.py b/src/toil_scripts/spark_utils/spawn_cluster.py deleted file mode 100644 index 8c0a6692..00000000 --- a/src/toil_scripts/spark_utils/spawn_cluster.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -Spawns a Spark cluster backed by HDFS. - -@author Audrey Musselman-Brown, almussel@ucsc.edu -@author Frank Austin Nothaft, fnothaft@berkeley.edu -""" - -import logging -import multiprocessing -import os -from subprocess import call, check_call, check_output -import time - -from toil.job import Job -from toil_scripts.lib.programs import docker_call - -_log = logging.getLogger(__name__) - -def start_spark_hdfs_cluster(job, - numWorkers, - executorMemory, - jFn, - jArgs = [], - jCores = None, - jMemory = None, - jDisk = None): - - # build job requirement dictionary - jReqs = {} - if jCores: - jReqs['cores'] = jCores - if jMemory: - jReqs['memory'] = jMemory - if jDisk: - jReqs['disk'] = jDisk - - masterIP = start_spark_hdfs_master(job, executorMemory) - job.addChildJobFn(start_spark_hdfs_workers, - masterIP, - numWorkers, - executorMemory, - jFn, - jArgs, - jReqs) - - -def start_spark_hdfs_master(job, executorMemory): - """ - Starts the master service. - """ - - _log.info("Starting Spark master and HDFS namenode.") - - masterIP = job.addService(MasterService("%s G" % executorMemory)) - - _log.info("Spark Master and HDFS Namenode started.") - - return masterIP - -def start_spark_hdfs_workers(job, masterIP, numWorkers, executorMemory, jFn, jArgs, jReqs): - """ - Starts the worker services. 
- """ - _log.info("Starting %d Spark workers and HDFS Datanodes.", numWorkers) - - for i in range(numWorkers): - job.addService(WorkerService(masterIP, "%s G" % executorMemory)) - - job.addChildJobFn(jFn, masterIP, *jArgs, **jReqs) - -class MasterService(Job.Service): - - def __init__(self, memory): - - self.memory = memory - self.cores = multiprocessing.cpu_count() - Job.Service.__init__(self, memory = self.memory, cores = self.cores) - - def start(self, fileStore): - """ - Start spark and hdfs master containers - - fileStore: Unused - """ - - self.IP = check_output(["hostname", "-f",])[:-1] - - _log.info("Started Spark master container.") - self.sparkContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-spark-master:1.5.2", - docker_parameters = ["--net=host", - "-d", - "-v", "/mnt/ephemeral/:/ephemeral/:rw", - "-e", "SPARK_MASTER_IP="+self.IP, - "-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local", - "-e", "SPARK_WORKER_DIR=/ephemeral/spark/work"], - rm=False, - check_output = True, - mock = False)[:-1] - _log.info("Started HDFS Datanode.") - self.hdfsContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-hadoop-master:2.6.2", - docker_parameters = ["--net=host", - "-d"], - parameters = [self.IP], - rm=False, - check_output = True, - mock = False)[:-1] - return self.IP - - - def stop(self, fileStore): - """ - Stop and remove spark and hdfs master containers - - fileStore: Unused - """ - - call(["docker", "exec", self.sparkContainerID, "rm", "-r", "/ephemeral/spark"]) - call(["docker", "stop", self.sparkContainerID]) - call(["docker", "rm", self.sparkContainerID]) - _log.info("Stopped Spark master.") - - call(["docker", "stop", self.hdfsContainerID]) - call(["docker", "rm", self.hdfsContainerID]) - _log.info("Stopped HDFS namenode.") - - return - - - def check(self): - """ - Checks to see if Spark master and HDFS namenode are still running. 
- """ - - containers = check_output(["docker", "ps", "-q"]) - - return ((self.sparkContainerID in containers) and - (self.hdfsContainerID in containers)) - - -SPARK_MASTER_PORT = "7077" - -class WorkerService(Job.Service): - - def __init__(self, masterIP, memory): - self.masterIP = masterIP - self.memory = memory - self.cores = multiprocessing.cpu_count() - Job.Service.__init__(self, memory = self.memory, cores = self.cores) - - def start(self, fileStore): - """ - Start spark and hdfs worker containers - - fileStore: Unused - """ - # start spark and our datanode - self.sparkContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-spark-worker:1.5.2", - docker_parameters = ["--net=host", - "-d", - "-v", "/mnt/ephemeral/:/ephemeral/:rw", - "-e", "\"SPARK_MASTER_IP="+self.masterIP+":"+SPARK_MASTER_PORT+"\"", - "-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local", - "-e", "SPARK_WORKER_DIR=/ephemeral/spark/work"], - parameters = [self.masterIP+":"+SPARK_MASTER_PORT], - rm=False, - check_output = True, - mock = False)[:-1] - self.__start_datanode() - - # fake do/while to check if HDFS is up - hdfs_down = True - retries = 0 - while hdfs_down and (retries < 5): - - _log.info("Sleeping 30 seconds before checking HDFS startup.") - time.sleep(30) - clusterID = "" - try: - clusterID = check_output(["docker", - "exec", - self.hdfsContainerID, - "grep", - "clusterID", - "-R", - "/opt/apache-hadoop/logs"]) - except: - # grep returns a non-zero exit code if the pattern is not found - # we expect to not find the pattern, so a non-zero code is OK - pass - - if "Incompatible" in clusterID: - _log.warning("Hadoop Datanode failed to start with: %s", clusterID) - _log.warning("Retrying container startup, retry #%d.", retries) - retries += 1 - - _log.warning("Removing ephemeral hdfs directory.") - check_call(["docker", - "exec", - self.hdfsContainerID, - "rm", - "-rf", - "/ephemeral/hdfs"]) - - _log.warning("Killing container %s.", self.hdfsContainerID) - check_call(["docker", - "kill", - self.hdfsContainerID]) - - # todo: this is copied code. clean up! - _log.info("Restarting datanode.") - self.__start_datanode() - - else: - _log.info("HDFS datanode started up OK!") - hdfs_down = False - - if retries >= 5: - raise RuntimeError("Failed %d times trying to start HDFS datanode." % retries) - - return - - def __start_datanode(self): - """ - Launches the Hadoop datanode. - """ - self.hdfsContainerID = docker_call(tool = "quay.io/ucsc_cgl/apache-hadoop-worker:2.6.2", - docker_parameters = ["--net=host", - "-d", - "-v", "/mnt/ephemeral/:/ephemeral/:rw"], - parameters = [self.masterIP], - rm=False, - check_output = True, - mock = False)[:-1] - - - def stop(self, fileStore): - """ - Stop spark and hdfs worker containers - - fileStore: Unused - """ - - call(["docker", "exec", self.sparkContainerID, "rm", "-r", "/ephemeral/spark"]) - call(["docker", "stop", self.sparkContainerID]) - call(["docker", "rm", self.sparkContainerID]) - _log.info("Stopped Spark worker.") - - call(["docker", "exec", self.hdfsContainerID, "rm", "-r", "/ephemeral/hdfs"]) - call(["docker", "stop", self.hdfsContainerID]) - call(["docker", "rm", self.hdfsContainerID]) - _log.info("Stopped HDFS datanode.") - - return - - - def check(self): - """ - Checks to see if Spark master and HDFS namenode are still running. 
- """ - - containers = check_output(["docker", "ps", "-q"]) - - return ((self.sparkContainerID in containers) and - (self.hdfsContainerID in containers)) diff --git a/src/toil_scripts/tools/QC.py b/src/toil_scripts/tools/QC.py deleted file mode 100644 index c4a05413..00000000 --- a/src/toil_scripts/tools/QC.py +++ /dev/null @@ -1,29 +0,0 @@ -import os - -from toil_scripts.lib.files import tarball_files -from toil_scripts.lib.programs import docker_call - - -def run_fastqc(job, r1_id, r2_id): - """ - Run Fastqc on the input reads - - :param JobFunctionWrappingJob job: passed automatically by Toil - :param str r1_id: FileStoreID of fastq read 1 - :param str r2_id: FileStoreID of fastq read 2 - :return: FileStoreID of fastQC output (tarball) - :rtype: str - """ - work_dir = job.fileStore.getLocalTempDir() - job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) - parameters = ['/data/R1.fastq'] - output_names = ['R1_fastqc.html'] - if r2_id: - job.fileStore.readGlobalFile(r2_id, os.path.join(work_dir, 'R2.fastq')) - parameters.extend(['-t', '2', '/data/R2.fastq']) - output_names.append('R2_fastqc.html') - docker_call(tool='quay.io/ucsc_cgl/fastqc:0.11.5--be13567d00cd4c586edf8ae47d991815c8c72a49', - work_dir=work_dir, parameters=parameters) - output_files = [os.path.join(work_dir, x) for x in output_names] - tarball_files(tar_name='fastqc.tar.gz', file_paths=output_files, output_dir=work_dir) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'fastqc.tar.gz')) diff --git a/src/toil_scripts/tools/qc.py b/src/toil_scripts/tools/qc.py new file mode 100644 index 00000000..6bc18c30 --- /dev/null +++ b/src/toil_scripts/tools/qc.py @@ -0,0 +1,196 @@ +import os +from subprocess import check_call + +from toil_scripts.lib import require +from toil_scripts.lib.files import tarball_files +from toil_scripts.lib.programs import docker_call + + +def run_fastqc(job, r1_id, r2_id): + """ + Run Fastqc on the input reads + + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str r1_id: FileStoreID of fastq read 1 + :param str r2_id: FileStoreID of fastq read 2 + :return: FileStoreID of fastQC output (tarball) + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(r1_id, os.path.join(work_dir, 'R1.fastq')) + parameters = ['/data/R1.fastq'] + output_names = ['R1_fastqc.html'] + if r2_id: + job.fileStore.readGlobalFile(r2_id, os.path.join(work_dir, 'R2.fastq')) + parameters.extend(['-t', '2', '/data/R2.fastq']) + output_names.append('R2_fastqc.html') + docker_call(tool='quay.io/ucsc_cgl/fastqc:0.11.5--be13567d00cd4c586edf8ae47d991815c8c72a49', + work_dir=work_dir, parameters=parameters) + output_files = [os.path.join(work_dir, x) for x in output_names] + tarball_files(tar_name='fastqc.tar.gz', file_paths=output_files, output_dir=work_dir) + return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'fastqc.tar.gz')) + + +def __call_quinine(arguments, memory, + run_local=False, local_dir=None, + native_path=None, + leader_ip=None): + ''' + Run Quinine (https://github.com/bigdatagenomics/quinine). + + :param list arguments: Arguments to pass to Quinine. + :param int memory: Amount of memory in GiB to allocate per node. + :param bool run_local: If true, runs quinine in local mode. Default is false. + :param None or string local_dir: If provided, path to a local working + directory. Must be provided if run_local is true. + :param None or string native_path: If set, the path to quinine. If unset, + runs quinine via Docker. 
Default is None (unset). + :param None or SparkMasterAddress leader_ip: If provided, IP of the Spark + leader node. Must be provided if run_local is false. + ''' + + # TODO: factor this out into a common lib + SPARK_MASTER_PORT=7077 + HDFS_MASTER_PORT=8020 + + # validate input arguments + require((run_local and leader_ip is None) or + (not run_local and leader_ip is not None), + "Either run_local ({0}) must be set or leader_ip ({1}) must not be None.".format(run_local, + leader_ip)) + require(not run_local or local_dir is not None, + "If run_local is set, local_dir must be set.") + + # are we running locally, or not? set up master configuration + if run_local: + master = ["--master", "local[*]"] + work_dir = local_dir + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT))] + work_dir = '.' + + # set default parameters + default_params = master + [ + "--conf", ("spark.driver.memory=%dg" % memory), + "--conf", ("spark.executor.memory=%dg" % memory), + "--conf", "spark.driver.maxResultSize=0", + # set max result size to unlimited, see #177 + "--"] + + # are we running via docker or natively? + if native_path is None: + docker_call(rm=False, + tool="quay.io/ucsc_cgl/quinine", + docker_parameters=master_ip.docker_parameters(['--net=host']), + parameters=(default_params + arguments), + work_dir=work_dir, + mock=False) + else: + check_call(["%s/bin/quinine-submit" % native_path] + + default_params + + arguments) + + +def call_quinine_rna_native(reads, + transcriptome, + output, + work_dir, + memory, + native_path): + ''' + Runs quinine to compute RNA-seq QC stats. + + :param str reads: Local path to input reads. + :param str transcriptome: Local path to transcriptome definition. + :param str output: Path to write stats to. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['rnaMetrics', + reads, transcriptome, + '-statPath', output], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_hs_native(reads, + panel, + bait, + output, + work_dir, + memory, + native_path): + ''' + Runs quinine to compute stats for a hybrid-selection targeted sequencing panel. + + :param str reads: Local path to input reads. + :param str panel: Local path to definition of regions targeted by panel. + :param str bait: Local path to definition of regions tiled by bait. + :param str output: Path to write stats to. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. + ''' + + __call_quinine(['panelMetrics', + reads, panel, bait, + '-statPath', output + ], + memory, + run_local=True, local_dir=work_dir, + native_path=native_path) + + +def call_quinine_contamination_native(reads, + genotypes, + annotations, + output, + work_dir, + memory, + native_path): + ''' + Runs quinine to estimate inter-sample contamination. + + :param str reads: Local path to input reads. + :param str genotypes: Local path to genotypes. + :param str annotations: Local path to annotations. + :param str output: Path to write stats to. + :param str work_dir: Local path to temp working directory. + :param int memory: Amount of memory in GiB to allocate per node. + :param str native_path: Local path to quinine. 
+    '''
+
+    __call_quinine(['estimateContamination',
+                    reads, genotypes, annotations,
+                    '-statPath', output],
+                   memory,
+                   run_local=True, local_dir=work_dir,
+                   native_path=native_path)
+
+
+def call_quinine_af_native(vcf,
+                           annotations,
+                           work_dir,
+                           memory,
+                           native_path):
+    '''
+    Runs quinine to compute allele frequencies from a population VCF.
+
+    :param str vcf: Local path to the population VCF to derive allele frequencies from.
+    :param str annotations: Local path to write the computed allele frequency
+        annotations to.
+    :param str work_dir: Local path to temp working directory.
+    :param int memory: Amount of memory in GiB to allocate per node.
+    :param str native_path: Local path to quinine.
+    '''
+
+    __call_quinine(['loadAlleleFrequency',
+                    vcf, annotations],
+                   memory,
+                   run_local=True, local_dir=work_dir,
+                   native_path=native_path)
diff --git a/src/toil_scripts/tools/quantifiers.py b/src/toil_scripts/tools/quantifiers.py
index 4691dd2a..b1c5e295 100644
--- a/src/toil_scripts/tools/quantifiers.py
+++ b/src/toil_scripts/tools/quantifiers.py
@@ -107,8 +107,8 @@ def run_rsem_postprocess(job, uuid, rsem_gene_id, rsem_isoform_id):
     """
     work_dir = job.fileStore.getLocalTempDir()
     # I/O
-    job.fileStore.readGlobalFile(rsem_gene_id, os.path.join(work_dir, 'rsem_gene.tab'))
-    job.fileStore.readGlobalFile(rsem_isoform_id, os.path.join(work_dir, 'rsem_isoform.tab'))
+    job.fileStore.readGlobalFile(rsem_gene_id, os.path.join(work_dir, 'rsem_gene.tab'), mutable=True)
+    job.fileStore.readGlobalFile(rsem_isoform_id, os.path.join(work_dir, 'rsem_isoform.tab'), mutable=True)
     # Convert RSEM files into individual .tab files.
     docker_call(tool='jvivian/rsem_postprocess', parameters=[uuid], work_dir=work_dir)
     os.rename(os.path.join(work_dir, 'rsem_gene.tab'), os.path.join(work_dir, 'rsem_genes.results'))
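Reviewer note (not part of the patch): a minimal usage sketch of the native-path helpers this change introduces, mirroring what run_rna_qc in quinine_pipelines/metrics.py does. The input files, the scratch directory, and the 2 GiB memory figure are made-up placeholders; the /opt/cgl-docker-lib install locations are simply the defaults declared in metrics.py.

    # Usage sketch with hypothetical paths; assumes ADAM and Quinine are
    # installed natively at the metrics.py defaults.
    import os

    from toil_scripts.quinine_pipelines.adam_helpers import (bam_to_adam_native,
                                                             feature_to_adam_native)
    from toil_scripts.tools.qc import call_quinine_rna_native

    work_dir = '/tmp/quinine-example'             # hypothetical scratch directory
    adam_path = '/opt/cgl-docker-lib/adam'        # default from metrics.py
    quinine_path = '/opt/cgl-docker-lib/quinine'  # default from metrics.py

    # convert the reads and the transcriptome definition to ADAM Parquet
    reads_adam = os.path.join(work_dir, 'reads.adam')
    features_adam = os.path.join(work_dir, 'transcriptome.adam')
    bam_to_adam_native('/data/sample.bam', reads_adam, 2, adam_path)
    feature_to_adam_native('/data/gencode.gtf', features_adam, 2, adam_path)

    # compute the RNA-seq QC statistics with Quinine in Spark local mode
    call_quinine_rna_native(reads_adam, features_adam,
                            os.path.join(work_dir, 'rna_stats.txt'),
                            work_dir, 2, quinine_path)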