Source code for km3pipe.hardware

# Filename: hardware.py
# pylint: disable=locally-disabled
"""
Classes representing KM3NeT hardware.

"""
from __future__ import absolute_import, print_function, division

from collections import OrderedDict, defaultdict
from io import StringIO

import numpy as np

from .dataclasses import Table
from .tools import unpack_nfirst, split
from .math import intersect_3d, qrot_yaw    # , ignored
from .db import DBManager

from .logger import get_logger, get_printer

[docs]log = get_logger(__name__) # pylint: disable=C0103
__author__ = "Tamas Gal" __copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." __credits__ = [] __license__ = "MIT" __maintainer__ = "Tamas Gal" __email__ = "tgal@km3net.de" __status__ = "Development"
[docs]class Detector(object): """A KM3NeT detector. Parameters ---------- filename: str, optional Name of .detx file with detector definition. det_id: int, optional .detx ID of detector (when retrieving from database). t0set: optional t0set (when retrieving from database). calibration: optional calibration (when retrieving from database). """ def __init__( self, filename=None, det_id=None, t0set=None, calibration=None ): self._det_file = None self.det_id = None self.n_doms = None self.dus = [] self.n_pmts_per_dom = None self.doms = OrderedDict() self.pmts = [] self.version = None self.valid_from = None self.valid_until = None self.utm_info = None self._comments = [] self._dom_ids = [] self._pmt_index_by_omkey = OrderedDict() self._pmt_index_by_pmt_id = OrderedDict() self._current_du = None self._com = None self._dom_positions = None self._dom_table = None self._pmt_angles = None self._xy_positions = None self.reset_caches() self.print = get_printer(self.__class__.__name__) if filename: self._init_from_file(filename) if det_id is not None: self.print( "Retrieving DETX with detector ID {0} " "from the database...".format(det_id) ) db = DBManager() detx = db.detx(det_id, t0set=t0set, calibration=calibration) self._det_file = StringIO(detx) self._parse_header() self._parse_doms() if self.n_doms < 1: log.error("No data found for detector ID %s." % det_id) def _init_from_file(self, filename): """Create detector from detx file.""" if not filename.endswith("detx"): raise NotImplementedError('Only the detx format is supported.') self._open_file(filename) self._extract_comments() self._parse_header() self._parse_doms() self._det_file.close() def _open_file(self, filename): """Create the file handler""" self._det_file = open(filename, 'r') def _readline(self, ignore_comments=True): """The next line of the DETX file, optionally ignores comments""" while True: line = self._det_file.readline() if line == '': return line # To conform the EOF behaviour of .readline() line = line.strip() if line == '': continue # white-space-only line if line.startswith('#'): if not ignore_comments: return line else: return line def _extract_comments(self): """Retrieve all comments from the file""" self._det_file.seek(0, 0) for line in self._det_file.readlines(): line = line.strip() if line.startswith('#'): self.add_comment(line[1:]) def _parse_header(self): """Extract information from the header of the detector file""" self.print("Parsing the DETX header") self._det_file.seek(0, 0) first_line = self._readline() try: self.det_id, self.n_doms = split(first_line, int) self.version = 'v1' except ValueError: det_id, self.version = first_line.split() self.det_id = int(det_id) validity = self._readline().strip() self.valid_from, self.valid_until = split(validity, float) raw_utm_info = self._readline().strip().split(' ') try: self.utm_info = UTMInfo(*raw_utm_info[1:]) except TypeError: log.warning("Missing UTM information.") n_doms = self._readline() self.n_doms = int(n_doms) # pylint: disable=C0103 def _parse_doms(self): """Extract dom information from detector file""" self.print("Reading PMT information...") self._det_file.seek(0, 0) self._readline() pmts = defaultdict(list) pmt_index = 0 while True: line = self._readline() if line == '': self.print("Done.") break try: dom_id, du, floor, n_pmts = split(line, int) except ValueError: continue if du != self._current_du: log.debug("Next DU, resetting floor to 1.") self._current_du = du self.dus.append(du) self._current_floor = 1 if du == 1 and floor == -1: log.warning( "Floor ID is -1 (Jpp conversion bug), " "using our own floor ID!" ) else: self._current_floor += 1 if floor == -1: log.debug("Setting floor ID to our own ID") floor = self._current_floor self.doms[dom_id] = (du, floor, n_pmts) if self.n_pmts_per_dom is None: self.n_pmts_per_dom = n_pmts if self.n_pmts_per_dom != n_pmts: log.warning( "DOMs with different number of PMTs are " "detected, this can cause some unexpected " "behaviour." ) for i in range(n_pmts): raw_pmt_info = self._readline() pmt_info = raw_pmt_info.split() pmt_id, x, y, z, rest = unpack_nfirst(pmt_info, 4) dx, dy, dz, t0, rest = unpack_nfirst(rest, 4) pmt_id = int(pmt_id) omkey = (du, floor, i) pmts['pmt_id'].append(int(pmt_id)) pmts['pos_x'].append(float(x)) pmts['pos_y'].append(float(y)) pmts['pos_z'].append(float(z)) pmts['dir_x'].append(float(dx)) pmts['dir_y'].append(float(dy)) pmts['dir_z'].append(float(dz)) pmts['t0'].append(float(t0)) pmts['du'].append(int(du)) pmts['floor'].append(int(floor)) pmts['channel_id'].append(int(i)) pmts['dom_id'].append(int(dom_id)) if self.version == 'v3' and rest: status, rest = unpack_nfirst(rest, 1) pmts['status'].append(int(status)) if rest: log.warning("Unexpected PMT values: {0}".format(rest)) self._pmt_index_by_omkey[omkey] = pmt_index self._pmt_index_by_pmt_id[pmt_id] = pmt_index pmt_index += 1 self.pmts = Table(pmts, name='PMT')
[docs] def reset_caches(self): log.debug('Resetting caches.') self._dom_positions = OrderedDict() self._dom_table = None self._xy_positions = [] self._pmt_angles = [] self._com = None
[docs] def add_comment(self, comment): """Add a comment which will be prefixed with a '#'""" self._comments.append(comment)
@property
[docs] def comments(self): return self._comments
@property
[docs] def dom_ids(self): if not self._dom_ids: self._dom_ids = list(self.doms.keys()) return self._dom_ids
@property
[docs] def dom_positions(self): """The positions of the DOMs, calculated from PMT directions.""" if not self._dom_positions: for dom_id in self.dom_ids: mask = self.pmts.dom_id == dom_id pmt_pos = self.pmts[mask].pos pmt_dir = self.pmts[mask].dir centre = intersect_3d(pmt_pos, pmt_pos - pmt_dir * 10) self._dom_positions[dom_id] = centre return self._dom_positions
@property
[docs] def dom_table(self): """A `Table` containing DOM attributes""" if self._dom_table is None: data = defaultdict(list) for dom_id, (du, floor, _) in self.doms.items(): data['dom_id'].append(dom_id) data['du'].append(du) data['floor'].append(floor) dom_position = self.dom_positions[dom_id] data['pos_x'].append(dom_position[0]) data['pos_y'].append(dom_position[1]) data['pos_z'].append(dom_position[2]) self._dom_table = Table(data, name='DOMs', h5loc='/dom_table') return self._dom_table
@property
[docs] def com(self): """Center of mass, calculated from the mean of the PMT positions""" if self._com is None: self._com = np.mean(self.pmts.pos, axis=0) return self._com
@property
[docs] def xy_positions(self): """XY positions of the DUs, given by the DOMs on floor 1.""" if self._xy_positions is None or len(self._xy_positions) == 0: xy_pos = [] for dom_id, pos in self.dom_positions.items(): if self.domid2floor(dom_id) == 1: xy_pos.append(np.array([pos[0], pos[1]])) self._xy_positions = np.array(xy_pos) return self._xy_positions
[docs] def translate_detector(self, vector): """Translate the detector by a given vector""" vector = np.array(vector, dtype=float) self.pmts.pos_x += vector[0] self.pmts.pos_y += vector[1] self.pmts.pos_z += vector[2] self.reset_caches()
[docs] def rotate_dom_by_yaw(self, dom_id, heading, centre_point=None): """Rotate a DOM by a given (yaw) heading.""" pmts = self.pmts[self.pmts.dom_id == dom_id] if centre_point is None: centre_point = self.dom_positions[dom_id] for pmt in pmts: pmt_pos = np.array([pmt.pos_x, pmt.pos_y, pmt.pos_z]) pmt_dir = np.array([pmt.dir_x, pmt.dir_y, pmt.dir_z]) pmt_radius = np.linalg.norm(centre_point - pmt_pos) index = self._pmt_index_by_pmt_id[pmt.pmt_id] pmt_ref = self.pmts[index] dir_rot = qrot_yaw([pmt.dir_x, pmt.dir_y, pmt.dir_z], heading) pos_rot = pmt_pos - pmt_dir * pmt_radius + dir_rot * pmt_radius pmt_ref.dir_x = dir_rot[0] pmt_ref.dir_y = dir_rot[1] pmt_ref.dir_z = dir_rot[2] pmt_ref.pos_x = pos_rot[0] pmt_ref.pos_y = pos_rot[1] pmt_ref.pos_z = pos_rot[2] self.reset_caches()
[docs] def rotate_du_by_yaw(self, du, heading): """Rotate all DOMs on DU by a given (yaw) heading.""" mask = (self.pmts.du == du) dom_ids = np.unique(self.pmts.dom_id[mask]) for dom_id in dom_ids: self.rotate_dom_by_yaw(dom_id, heading) self.reset_caches()
[docs] def rescale(self, factor, origin=(0, 0, 0)): """Stretch or shrink detector (DOM positions) by a given factor.""" pmts = self.pmts for dom_id in self.dom_ids: mask = pmts.dom_id == dom_id pos_x = pmts[mask].pos_x pos_y = pmts[mask].pos_y pos_z = pmts[mask].pos_z pmts.pos_x[mask] = (pos_x - origin[0]) * factor pmts.pos_y[mask] = (pos_y - origin[1]) * factor pmts.pos_z[mask] = (pos_z - origin[2]) * factor self.reset_caches()
@property
[docs] def pmt_angles(self): """A list of PMT directions sorted by PMT channel, on DU-1, floor-1""" if self._pmt_angles == []: mask = (self.pmts.du == 1) & (self.pmts.floor == 1) self._pmt_angles = self.pmts.dir[mask] return self._pmt_angles
@property
[docs] def ascii(self): """The ascii representation of the detector""" comments = '' if self.version == 'v3': for comment in self.comments: if not comment.startswith(' '): comment = ' ' + comment comments += "#" + comment + "\n" if self.version == 'v1': header = "{det.det_id} {det.n_doms}".format(det=self) else: header = "{det.det_id} {det.version}".format(det=self) header += "\n{0} {1}".format(self.valid_from, self.valid_until) header += "\n" + str(self.utm_info) + "\n" header += str(self.n_doms) doms = "" for dom_id, (line, floor, n_pmts) in self.doms.items(): doms += "{0} {1} {2} {3}\n".format(dom_id, line, floor, n_pmts) for channel_id in range(n_pmts): pmt_idx = self._pmt_index_by_omkey[(line, floor, channel_id)] pmt = self.pmts[pmt_idx] doms += " {0} {1} {2} {3} {4} {5} {6} {7}".format( pmt.pmt_id, pmt.pos_x, pmt.pos_y, pmt.pos_z, pmt.dir_x, pmt.dir_y, pmt.dir_z, pmt.t0 ) if self.version == 'v3': doms += " {0}".format(pmt.status) doms += "\n" return comments + header + "\n" + doms
[docs] def write(self, filename): """Save detx file.""" with open(filename, 'w') as f: f.write(self.ascii) self.print("Detector file saved as '{0}'".format(filename))
[docs] def pmt_with_id(self, pmt_id): """Get PMT with global pmt_id""" try: return self.pmts[self._pmt_index_by_pmt_id[pmt_id]] except KeyError: raise KeyError("No PMT found for ID: {0}".format(pmt_id))
[docs] def get_pmt(self, dom_id, channel_id): """Return PMT with DOM ID and DAQ channel ID""" du, floor, _ = self.doms[dom_id] pmt = self.pmts[self._pmt_index_by_omkey[(du, floor, channel_id)]] return pmt
[docs] def pmtid2omkey(self, pmt_id): return self._pmts_by_id[int(pmt_id)].omkey
[docs] def domid2floor(self, dom_id): _, floor, _ = self.doms[dom_id] return floor
@property
[docs] def n_dus(self): return len(self.dus)
def __str__(self): return "Detector id: '{0}', n_doms: {1}, dus: {2}".format( self.det_id, self.n_doms, self.dus ) def __repr__(self): return self.__str__()
[docs]class UTMInfo(object): """The UTM information""" def __init__(self, ellipsoid, grid, easting, northing, z): self.ellipsoid = ellipsoid self.grid = grid self.easting = float(easting) self.northing = float(northing) self.z = float(z) def __str__(self): return "UTM {} {} {} {} {}" \ .format(self.ellipsoid, self.grid, self.easting, self.northing, self.z) def __repr__(self): return "UTMInfo: {}".format(self)
[docs]class PMT(object): """Represents a photomultiplier. Parameters ---------- id: int pos: 3-float-tuple (x, y, z) dir: 3-float-tuple (x, y, z) t0: int channel_id: int omkey: int """ def __init__(self, id, pos, dir, t0, channel_id, omkey): self.id = id self.pos = pos self.dir = dir self.t0 = t0 self.channel_id = channel_id self.omkey = omkey def __str__(self): return "PMT id:{0}, pos: {1}, dir: dir{2}, t0: {3}, DAQ channel: {4}"\ .format(self.id, self.pos, self.dir, self.t0, self.channel_id)
# PMT DAQ channel IDs ordered from top to bottom
[docs]ORDERED_PMT_IDS = [ 28, 23, 22, 21, 27, 29, 20, 30, 26, 25, 19, 24, 13, 7, 1, 14, 18, 12, 6, 2, 11, 8, 0, 15, 4, 3, 5, 17, 10, 9, 16
]