# Filename: common.py
# -*- coding: utf-8 -*-
# pylint: disable=locally-disabled
"""
A collection of commonly used modules.
"""
from __future__ import absolute_import, print_function, division
from time import time
import numpy as np
import km3pipe as kp
from km3pipe import Module, Blob
from km3pipe.tools import prettyln
from km3pipe.sys import peak_memory_usage
from km3pipe.math import zenith, azimuth
from km3pipe.dataclasses import Table
# Module-level logger, named after this module.
log = kp.logger.get_logger(__name__)
class Dump(Module):
    """Print the content of the blob.

    Parameters
    ----------
    keys: collection(string), optional [default=None]
        Keys to print. If None, print all keys.
    full: bool, default=False
        Print blob values too, not just the keys?
    """

    def process(self, blob):
        """Print the selected keys (optionally with values) and pass the blob on.

        Reads ``self.keys`` and ``self.full``; both are set outside the code
        visible here (presumably during module configuration).

        Parameters
        ----------
        blob: Blob
            The blob to inspect. It is not modified.

        Returns
        -------
        The same blob that was passed in.
        """
        # With no explicit selection, list all keys in sorted order.
        keys = sorted(blob.keys()) if self.keys is None else self.keys
        for key in keys:
            # Formatting instead of concatenation tolerates non-string keys.
            print("{}:".format(key))
            if self.full:
                print(repr(blob[key]))
            print('')
        # Visual separator between consecutive blobs.
        print('----------------------------------------\n')
        return blob
class Delete(Module):
    """Remove specific keys from the blob.

    Parameters
    ----------
    keys: collection(string), optional
        Keys to remove.
    """

    def process(self, blob):
        """Drop every key listed in ``self.keys`` from the blob, in place.

        Keys that are not present in the blob are silently ignored.

        Returns
        -------
        The same blob object, without the removed entries.
        """
        for unwanted in self.keys:
            # The default of None makes a missing key a no-op, not an error.
            blob.pop(unwanted, None)
        return blob
class Keep(Module):
    """Keep only specified keys in the blob.

    Parameters
    ----------
    keys: collection(string), optional
        Keys to keep. Everything else is removed.
    """

    def process(self, blob):
        """Build a new blob holding only the entries that should survive.

        An entry is kept when its key is in ``self.keys``, or when its value
        has an ``h5loc`` attribute starting with one of the prefixes in
        ``self.h5locs``. Both attributes are set outside the code visible
        here.

        Returns
        -------
        A fresh ``Blob`` with the retained entries; the input blob is left
        untouched.
        """
        kept = Blob()
        for name in blob.keys():
            if name in self.keys:
                kept[name] = blob[name]
                continue
            value = blob[name]
            # Second chance: retain values by their h5loc prefix.
            # str.startswith accepts a tuple of candidate prefixes.
            if hasattr(value, 'h5loc') and value.h5loc.startswith(
                    tuple(self.h5locs)):
                kept[name] = value
        return kept
class HitCounter(Module):
    """Prints the number of hits"""

    def process(self, blob):
        """Report how many entries are stored under ``blob['Hit']``.

        If a ``KeyError`` is raised (e.g. there is no ``'Hit'`` key), nothing
        is printed. The blob is returned unchanged either way.
        """
        try:
            n_hits = len(blob['Hit'])
            message = "Number of hits: {0}".format(n_hits)
            self.print(message)
        except KeyError:
            # Best effort: blobs without hits are simply passed through.
            pass
        return blob
class HitCalibrator(Module):
    """A very basic hit calibrator, which requires a `Calibration` module."""

    def process(self, blob):
        """Calibrate the hits found under ``self.input_key``.

        The hits are passed to ``self.calibration.apply`` and the result is
        stored under ``self.output_key``. If the input key is missing, a
        warning is logged and the blob is returned untouched.
        ``input_key``, ``output_key`` and ``calibration`` are set outside the
        code visible here.

        Returns
        -------
        The same blob, with the calibrated hits added when input was present.
        """
        if self.input_key not in blob:
            # `warning` replaces the deprecated `warn` alias.
            # NOTE(review): assumes self.log is a stdlib-style logger - confirm.
            self.log.warning(
                "No hits found in key '{}'.".format(self.input_key)
            )
            return blob
        hits = blob[self.input_key]
        blob[self.output_key] = self.calibration.apply(hits)
        return blob
class BlobIndexer(Module):
    """Puts an incremented index in each blob for the key 'blob_index'"""

    def process(self, blob):
        """Stamp the blob with the current index, then advance the counter.

        ``self.blob_index`` is initialised outside the code visible here.

        Returns
        -------
        The same blob, with ``blob['blob_index']`` set.
        """
        current = self.blob_index
        blob['blob_index'] = current
        # Advance only after stamping, so the stored value is the
        # pre-increment index.
        self.blob_index = current + 1
        return blob
class StatusBar(Module):
    """Displays the current blob number."""

    def process(self, blob):
        """Print the blob number and advance the internal iteration counter.

        The printed number is ``self.every * self.iteration``; both
        attributes are set outside the code visible here.
        """
        blob_number = self.every * self.iteration
        prettyln("Blob {0:>7}".format(blob_number))
        self.iteration += 1
        return blob

    def finish(self):
        """Print a closing line filled with '=' characters."""
        prettyln(".", fill='=')
class TickTock(Module):
    """Display the elapsed time.

    Parameters
    ----------
    every: int, optional [default=1]
        Number of iterations between printout.
    """

    def process(self, blob):
        """Print the minutes elapsed since ``self.t0`` and pass the blob on.

        ``self.t0`` is set outside the code visible here. This method does
        not read ``every`` itself.
        """
        elapsed_seconds = time() - self.t0
        elapsed_minutes = elapsed_seconds / 60
        prettyln("Time/min: {0:.3f}".format(elapsed_minutes))
        return blob
class MemoryObserver(Module):
    """Shows the maximum memory usage

    Parameters
    ----------
    every: int, optional [default=1]
        Number of iterations between printout.
    """

    def process(self, blob):
        """Print the value reported by ``peak_memory_usage()`` and pass the blob on.

        The value is printed with three decimals and labelled "MB". This
        method does not read ``every`` itself.
        """
        prettyln("Memory peak: {0:.3f} MB".format(peak_memory_usage()))
        return blob
class Siphon(Module):
    """A siphon to accumulate a given volume of blobs.

    Parameters
    ----------
    volume: int
        number of blobs to hold
    flush: bool
        discard blobs after accumulation
    """

    def process(self, blob):
        """Count incoming blobs and let them through once the volume is exceeded.

        ``self.blob_count``, ``self.volume`` and ``self.flush`` are set
        outside the code visible here.

        NOTE(review): the source this was restored from had lost its
        indentation. The nesting below (counter reset only when flushing,
        blob returned only on overflow) is reconstructed from the class
        docstring - confirm against the upstream file.

        Returns
        -------
        The blob once more than ``volume`` blobs have been counted, otherwise
        ``None``.
        """
        self.blob_count += 1
        if self.blob_count <= self.volume:
            # Still filling up: nothing is handed on for this blob.
            return None
        log.debug("Siphone overflow reached!")
        if self.flush:
            log.debug("Flushing the siphon.")
            # Restart the count so the siphon fills up again.
            self.blob_count = 0
        return blob