Source code for km3modules.common

# Filename: common.py
# -*- coding: utf-8 -*-
# pylint: disable=locally-disabled
"""
A collection of commonly used modules.

"""
from __future__ import absolute_import, print_function, division

from time import time

import numpy as np

import km3pipe as kp
from km3pipe import Module, Blob
from km3pipe.tools import prettyln
from km3pipe.sys import peak_memory_usage
from km3pipe.math import zenith, azimuth
from km3pipe.dataclasses import Table

# Module-level logger, named after this module.
log = kp.logger.get_logger(__name__)
class Dump(Module):
    """Print the content of the blob.

    Parameters
    ----------
    keys: collection(string), optional [default=None]
        Keys to print. If None, print all keys (sorted).
    key: string, optional [default=None]
        Single key to print; only used when ``keys`` is not given.
    full: bool, default=False
        Print blob values too, not just the keys?
    """

    def configure(self):
        self.keys = self.get('keys') or None
        self.full = self.get('full') or False
        key = self.get('key') or None
        # A single ``key`` is shorthand for ``keys=[key]``.
        if key and not self.keys:
            self.keys = [key]

    def process(self, blob):
        """Print the selected keys (and optionally values); pass blob on."""
        keys = sorted(blob.keys()) if self.keys is None else self.keys
        for key in keys:
            print(key + ':')
            if self.full:
                # Explicitly requested keys may be absent from a given blob;
                # report that instead of aborting the pipeline with KeyError.
                if key in blob:
                    print(repr(blob[key]))
                else:
                    print('<not in blob>')
            print('')
        print('----------------------------------------\n')
        return blob
class Delete(Module):
    """Remove specific keys from the blob.

    Parameters
    ----------
    keys: collection(string), optional
        Keys to remove. Keys that are not present in a blob are ignored.
    key: string, optional
        A single key to remove; only used when ``keys`` is empty or not given.
    """

    def configure(self):
        self.keys = self.get('keys') or set()
        single_key = self.get('key') or None
        # Fall back to the single-key shorthand only if no collection given.
        if not self.keys and single_key:
            self.keys = [single_key]

    def process(self, blob):
        """Drop every configured key from ``blob`` and return it."""
        for doomed in self.keys:
            # pop() with a default so that missing keys are skipped quietly
            blob.pop(doomed, None)
        return blob
class Keep(Module):
    """Keep only specified keys in the blob.

    Parameters
    ----------
    keys: collection(string), optional
        Keys to keep. Everything else is removed.
    key: string, optional
        A single key to keep; only used when ``keys`` is not given.
    h5locs: collection(string), optional
        Additionally keep every blob entry that has an ``h5loc`` attribute
        starting with one of these prefixes.
    """

    def configure(self):
        self.keys = self.get('keys', default=set())
        key = self.get('key', default=None)
        self.h5locs = self.get('h5locs', default=set())
        if key and not self.keys:
            self.keys = [key]
        # A bare string would otherwise be treated as a collection of
        # characters/substrings (``key in 'Hits'`` is a substring test).
        if isinstance(self.keys, str):
            self.keys = [self.keys]
        if isinstance(self.h5locs, str):
            self.h5locs = [self.h5locs]

    def process(self, blob):
        """Return a new Blob holding only the selected entries of ``blob``."""
        # Built once per blob instead of once per entry.
        keys = set(self.keys)
        prefixes = tuple(self.h5locs)
        out = Blob()
        for key, value in blob.items():
            if key in keys:
                out[key] = value
                continue
            h5loc = getattr(value, 'h5loc', None)
            # Entries whose ``h5loc`` is missing or None are never kept by
            # prefix; an empty prefix tuple matches nothing.
            if prefixes and h5loc is not None and h5loc.startswith(prefixes):
                out[key] = value
        return out
class HitCounter(Module):
    """Prints the number of hits.

    Parameters
    ----------
    input_key: string, optional [default='Hit']
        Blob key holding the hits. Blobs without this key are passed on
        silently.
    """

    def configure(self):
        self.input_key = self.get('input_key', default='Hit')

    def process(self, blob):
        """Print ``len(blob[input_key])`` if present; always return blob."""
        try:
            hits = blob[self.input_key]
        except KeyError:
            # No hits in this blob: nothing to report.
            return blob
        # Kept outside the ``try`` so errors here are not mistaken for a
        # missing key.
        self.print("Number of hits: {0}".format(len(hits)))
        return blob
class HitCalibrator(Module):
    """A very basic hit calibrator, which requires a `Calibration` module.

    Parameters
    ----------
    input_key: string, optional [default='Hits']
        Blob key of the hits to calibrate.
    output_key: string, optional [default='CalibHits']
        Blob key under which the calibrated hits are stored.
    """

    def configure(self):
        self.input_key = self.get('input_key', default='Hits')
        self.output_key = self.get('output_key', default='CalibHits')

    def process(self, blob):
        """Store ``self.calibration.apply(hits)`` under ``output_key``."""
        if self.input_key not in blob:
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``.
            self.log.warning(
                "No hits found in key '{}'.".format(self.input_key))
            return blob
        hits = blob[self.input_key]
        # NOTE(review): ``self.calibration`` is not assigned in this class;
        # presumably it is provided externally (e.g. by a `Calibration`
        # module) -- confirm. It must offer ``apply(hits)``.
        blob[self.output_key] = self.calibration.apply(hits)
        return blob
class BlobIndexer(Module):
    """Stamp each blob with a running counter under the key 'blob_index'.

    The first processed blob receives index 0.
    """

    def configure(self):
        self.blob_index = 0

    def process(self, blob):
        """Write the current index into ``blob`` and advance the counter."""
        current = self.blob_index
        blob['blob_index'] = current
        self.blob_index = current + 1
        return blob
class StatusBar(Module):
    """Displays the current blob number."""

    def configure(self):
        self.iteration = 1

    def process(self, blob):
        """Print a status line with the (approximate) blob number."""
        # NOTE(review): ``self.every`` is not set in this class; presumably
        # it comes from the Module base -- confirm.
        blob_number = self.every * self.iteration
        prettyln("Blob {0:>7}".format(blob_number))
        self.iteration += 1
        return blob

    def finish(self):
        """Close the status output with a separator line."""
        prettyln(".", fill='=')
class TickTock(Module):
    """Display the elapsed time.

    The elapsed time is measured from when the module was configured and
    printed in minutes.

    Parameters
    ----------
    every: int, optional [default=1]
        Number of iterations between printout.
    """

    def configure(self):
        self.t0 = time()

    def process(self, blob):
        """Print the minutes elapsed since configuration; pass blob on."""
        seconds = time() - self.t0
        prettyln("Time/min: {0:.3f}".format(seconds / 60))
        return blob
class MemoryObserver(Module):
    """Shows the maximum memory usage

    Parameters
    ----------
    every: int, optional [default=1]
        Number of iterations between printout.
    """

    def process(self, blob):
        """Print the value of ``peak_memory_usage()``; pass blob on."""
        prettyln("Memory peak: {0:.3f} MB".format(peak_memory_usage()))
        return blob
class Siphon(Module):
    """A siphon to accumulate a given volume of blobs.

    Blobs are only passed on once more than ``volume`` blobs have been
    counted; before that, ``process`` returns None.

    Parameters
    ----------
    volume: int
        number of blobs to hold
    flush: bool
        discard blobs after accumulation, i.e. restart the count after
        each overflow
    """

    def configure(self):
        self.volume = self.require('volume')  # [blobs]
        self.flush = self.get('flush', default=False)
        self.blob_count = 0

    def process(self, blob):
        """Count ``blob`` and return it only after the siphon overflowed.

        NOTE(review): structure reconstructed from whitespace-collapsed
        source, with ``return blob`` inside the overflow branch. Until the
        count exceeds ``volume`` the method falls through and returns None.
        Presumably the pipeline treats a None return as "stop this cycle".
        Confirm both points.
        """
        self.blob_count += 1
        if self.blob_count > self.volume:
            log.debug("Siphone overflow reached!")
            if self.flush:
                log.debug("Flushing the siphon.")
                # Restart accumulation after releasing this blob.
                self.blob_count = 0
            return blob