Source code for km3pipe.utils.calibrate

#!/usr/bin/env python
# Filename: calibrate.py
"""
Apply geometry and time calibration from a DETX to an HDF5 file.

Usage:
    calibrate [options] DETXFILE HDF5FILE
    calibrate (-h | --help)
    calibrate --version

Options:
    -c CHUNK_SIZE   Size of the chunk to load into memory [default: 200000].
    --verbose       Shows a progress bar during calibration.
    -h --help       Show this screen.
"""
import os

import km3pipe as kp

import numpy as np
import tables as tb
from tqdm import tqdm

[docs]FILTERS = tb.Filters( complevel=5, shuffle=True, fletcher32=True, complib='zlib'
)
[docs]F4_ATOM = tb.Float32Atom()
[docs]U1_ATOM = tb.UInt8Atom()
[docs]cprint = kp.logger.get_printer(os.path.basename(__file__))
[docs]log = kp.logger.get_logger(os.path.basename(__file__))
[docs]def calibrate_hits(f, cal, chunk_size, h5group, is_verbose): if h5group + "/pmt_id" in f: cprint("Found MC hits in '{}'".format(h5group)) is_mc = True else: cprint("Found DAQ hits in '{}'".format(h5group)) is_mc = False if is_mc: pmt_ids = f.get_node(h5group + "/pmt_id") else: dom_ids = f.get_node(h5group + "/dom_id") channel_ids = f.get_node(h5group + "/channel_id") n_hits = len(f.get_node(h5group + "/time")) idx = 0 if is_verbose: chunks = kp.tools.chunks(range(n_hits), chunk_size) chunks = tqdm(chunks, total=(n_hits // chunk_size)) else: chunks = kp.tools.chunks(range(n_hits), chunk_size) for chunk in chunks: n = len(chunk) calib = np.empty((n, 9), dtype='f4') if is_mc: _pmt_ids = pmt_ids[idx:idx + n] else: _dom_ids = dom_ids[idx:idx + n] _channel_ids = channel_ids[idx:idx + n] idx += n for i in range(n): if is_mc: pmt_id = _pmt_ids[i] calib[i] = cal._calib_by_pmt_id[pmt_id] else: dom_id = _dom_ids[i] channel_id = _channel_ids[i] calib[i] = cal._calib_by_dom_and_channel[dom_id][channel_id] write_calibration(calib, f, h5group) if is_mc: f.get_node(h5group)._v_attrs.datatype = "CMcHitSeries" else: f.get_node(h5group)._v_attrs.datatype = "CRawHitSeries"
[docs]def write_calibration(calib, f, loc): """Write calibration set to file""" for i, node in enumerate( [p + '_' + s for p in ['pos', 'dir'] for s in 'xyz']): h5loc = loc + '/' + node ca = f.get_node(h5loc) ca.append(calib[:, i]) du = f.get_node(loc + '/du') du.append(calib[:, 7].astype('u1')) floor = f.get_node(loc + '/floor') floor.append(calib[:, 8].astype('u1')) t0 = f.get_node(loc + '/t0') t0.append(calib[:, 6]) if loc == "/hits": time = f.get_node(loc + "/time") offset = len(time) chunk_size = len(calib) time[offset - chunk_size:offset] += calib[:, 6]
[docs]def initialise_arrays(group, f): """Create EArrays for calibrated hits""" for node in ['pos_x', 'pos_y', 'pos_z', 'dir_x', 'dir_y', 'dir_z', 'du', 'floor', 't0']: if node in ['floor', 'du']: atom = U1_ATOM else: atom = F4_ATOM f.create_earray(group, node, atom, (0, ), filters=FILTERS)
[docs]def main(): from docopt import docopt args = docopt(__doc__) is_verbose = bool(args['--verbose']) with tb.File(args['HDF5FILE'], "a") as f: try: kp.io.hdf5.check_version(f) except kp.io.hdf5.H5VersionError as e: log.critical(e) raise SystemExit if ("/hits/pos_x" in f or "/mc_hits/pos_y" in f): log.critical("The file seems to be calibrated, please check.") raise SystemExit cprint("Reading calbration information") cal = kp.calib.Calibration(filename=args['DETXFILE']) for h5group in ['/hits', '/mc_hits']: if h5group in f: initialise_arrays(h5group, f) cprint("Calibrating hits in '{}'".format(h5group)) calibrate_hits(f, cal, int(args['-c']), h5group, is_verbose) cprint("Done.")
if __name__ == "__main__": main()