Source code for km3pipe.utils.qrunprocessor

#!/usr/bin/env python
# Filename: qrunprocessor.py
# Author: Tamas Gal <tgal@km3net.de>
"""
=========================================
Create a summary for a given list of runs
=========================================

Use this batch runner to process a given list of run numbers with a
script, which takes a `-o` to create a summary file, which has the name
of the processed file + a given suffix.
The actual call will look like this, with a bit of copy safeness:

    /abs/path/to/SCRIPT run.root -o /abs/path/to/OUTPUT_PATH+SUFFIX

Before constructing the job scripts, the ``OUTPUT_PATH`` will be traversed
to find files which have already been converted to avoid multiple conversions.

.. code-block:: console

    Usage:
        qrunprocessor [options] DET_ID RUN_LIST OUTPUT_PATH SCRIPT
        qrunprocessor (-h | --help)

    Options:
        DET_ID         Detector ID (e.g. 29).
        RUN_LIST       Path to the file containing the space separated run IDs.
        OUTPUT_PATH    Path to store the individual summary files.
        SCRIPT         The script to fire up.
        -s SUFFIX      The suffix, appended by SCRIPT [default: .summary.h5].
        -n N_FILES     Number of files to process per job [default: 10].
        -e ET          Estimated walltime per file in minutes [default: 15].
        -f FSIZE       Estimated filesystem size for a job [default: 12G].
        -m VMEM        Estimated vmem for a job [default: 8G].
        -j JOBNAME     The name of the submitted jobs [default: qrunprocessor].
        -l LOG_PATH    Path of the job log files [default: qlogs].
        -q             Dryrun: don't submit jobs, just print the job script.
        -h --help      Show this screen.

"""
__author__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__version__ = "1.0"


[docs]def main(): from docopt import docopt args = docopt(__doc__, version=__version__) from glob import glob import os from os.path import basename, join, abspath import pathlib import time import km3pipe as kp from km3pipe.tools import chunks, iexists try: from tqdm import tqdm except ImportError: tqdm = lambda x: x cprint = kp.logger.get_printer('qrunprocessor') log = kp.logger.get_logger('qrunprocessor') CWD = os.getcwd() RUN_LIST = join(CWD, args['RUN_LIST']) OUTPUT_PATH = join(CWD, args['OUTPUT_PATH']) SCRIPT = abspath(args['SCRIPT']) SUFFIX = args['-s'] DET_ID = int(args['DET_ID']) ET_PER_FILE = int(args['-e']) * 60 # [s] FILES_PER_JOB = int(args['-n']) FSIZE = args['-f'] VMEM = args['-m'] LOG_PATH = args['-l'] JOB_NAME = args['-j'] DRYRUN = args['-q'] pathlib.Path(OUTPUT_PATH).mkdir(parents=True, exist_ok=True) with open(RUN_LIST, 'r') as fobj: run_numbers = [int(run) for run in fobj.read().split()] irods_files = [] for run in run_numbers: irods_path = kp.tools.irods_filepath(DET_ID, run) irods_files.append(irods_path) processed_files = [ basename(f) for f in glob(join(OUTPUT_PATH, '*{}'.format(SUFFIX))) ] rem_files = [] for irods_file in irods_files: if basename(irods_file) + SUFFIX not in processed_files: rem_files.append(irods_file) cprint("Checking if files are accessible on iRODS") missing_files_on_irods = 0 for rem_file in tqdm(rem_files): if not iexists(rem_file): log.error( "Skipping file, since not found on iRODS: {}".format(rem_file) ) rem_files.remove(rem_file) missing_files_on_irods += 1 cprint( "{} runs in total, {} already processed ({} missing on iRODS).".format( len(irods_files), len(processed_files), missing_files_on_irods ) ) cprint("Proceeding with the remaining {} files.".format(len(rem_files))) s = kp.shell.Script() for job_id, file_chunk in enumerate(chunks(rem_files, FILES_PER_JOB)): n_files = len(file_chunk) s.add("echo Creating run summary for {} files".format(n_files)) s.add("cd $TMPDIR; mkdir -p $USER; cd $USER") s.add("echo") for ipath in file_chunk: fname = basename(ipath) s.separator(' ') s.separator('=') s.echo("Processing {}:".format(fname)) s.add('pwd') s.iget(ipath) s.add('ls -al {}'.format(fname)) s.add('JPrintTree -f {}'.format(fname)) out_fname = fname + SUFFIX out_fpath = join(OUTPUT_PATH, out_fname) tmp_fname = out_fname + '.copying' tmp_fpath = join(OUTPUT_PATH, tmp_fname) s.add("{} {} -o {}".format(SCRIPT, fname, out_fname)) s.cp(out_fname, tmp_fpath) s.mv(tmp_fpath, out_fpath) s.echo("File '{}' processed.".format(fname)) s.separator('-') walltime = time.strftime( '%H:%M:%S', time.gmtime(ET_PER_FILE * n_files) ) kp.shell.qsub( s, '{}_{}'.format(JOB_NAME, job_id), walltime=walltime, fsize=FSIZE, vmem=VMEM, log_path=LOG_PATH, irods=True, dryrun=DRYRUN ) if DRYRUN: break s.clear()
if __name__ == "__main__": main()