.. note:: :class: sphx-glr-download-link-note Click :ref:`here ` to download the full example code .. rst-class:: sphx-glr-example-title .. _sphx_glr_auto_examples_monitoring_hrv.py: =========== HRV Monitor =========== The following script checks the high-rate-veto for each PMT. .. code-block:: python from __future__ import absolute_import, print_function, division # Author: Tamas Gal # License: MIT from datetime import datetime import io from collections import defaultdict import threading import time import km3pipe as kp from km3pipe.io.daq import TMCHData import numpy as np import matplotlib matplotlib.use('Agg') # noqa import matplotlib.pyplot as plt import km3pipe.style as kpst kpst.use("km3pipe") __author__ = "Tamas Gal" __email__ = "tgal@km3net.de" VERSION = "1.0" log = kp.logger.get_logger("HRV") class PMTRates(kp.Module): def configure(self): self.detector = self.require("detector") self.du = self.require("du") self.interval = self.get("interval") or 10 self.plot_path = self.get("plot_path") or "km3web/plots/hrv.png" self.max_x = 800 self.index = 0 self.hrv = defaultdict(list) self.hrv_matrix = np.full((18 * 31, self.max_x), np.nan) self.lock = threading.Lock() self.thread = threading.Thread(target=self.run, args=()) self.thread.daemon = True self.thread.start() def run(self): interval = self.interval while True: time.sleep(interval) now = datetime.now() self.add_column() self.update_plot() with self.lock: self.hrv = defaultdict(list) delta_t = (datetime.now() - now).total_seconds() remaining_t = self.interval - delta_t print( "Delta t: {} -> waiting for {}s".format( delta_t, self.interval - delta_t ) ) if (remaining_t < 0): log.error( "Can't keep up with plot production. " "Increase the interval!" ) interval = 1 else: interval = remaining_t def add_column(self): m = np.roll(self.hrv_matrix, -1, 1) y_range = 18 * 31 mean_hrv = np.full(y_range, np.nan) for i in range(y_range): if i not in self.hrv: continue mean_hrv[i] = np.mean(self.hrv[i]) m[:, self.max_x - 1] = mean_hrv self.hrv_matrix = m print(self.hrv_matrix) def update_plot(self): print("Updating plot at {}".format(self.plot_path)) now = time.time() max_x = self.max_x interval = self.interval def xlabel_func(timestamp): return datetime.utcfromtimestamp(timestamp).strftime("%H:%M") m = self.hrv_matrix fig, ax = plt.subplots(figsize=(10, 6)) ax.imshow(m, origin='lower') ax.set_title( "HRV Ratios for DU-{}\n{}".format(self.du, datetime.utcnow()) ) ax.set_xlabel("UTC time [{}s/px]".format(interval)) plt.yticks([i * 31 for i in range(18)], ["Floor {}".format(f) for f in range(1, 19)]) xtics_int = range(0, max_x, int(max_x / 10)) plt.xticks([i for i in xtics_int], [ xlabel_func(now - (max_x - i) * interval) for i in xtics_int ]) fig.tight_layout() plt.savefig(self.plot_path) plt.close('all') def process(self, blob): tmch_data = TMCHData(io.BytesIO(blob['CHData'])) dom_id = tmch_data.dom_id if dom_id not in self.detector.doms: return blob du, floor, _ = self.detector.doms[dom_id] if du != self.du: return blob hrv_flags = reversed("{0:b}".format(tmch_data.hrvbmp).zfill(32)) y_base = (floor - 1) * 31 for channel_id, hrv_flag in enumerate(hrv_flags): idx = y_base + channel_id with self.lock: self.hrv[idx].append(int(hrv_flag)) return blob def main(): detector = kp.hardware.Detector(det_id=29) pipe = kp.Pipeline(timeit=True) pipe.attach( kp.io.CHPump, host='192.168.0.110', port=5553, tags='IO_MONIT', timeout=60 * 60 * 24 * 7, max_queue=1000 ) pipe.attach(PMTRates, detector=detector, du=2, interval=10) pipe.drain() if __name__ == "__main__": main() **Total running time of the script:** ( 0 minutes 0.000 seconds) .. _sphx_glr_download_auto_examples_monitoring_hrv.py: .. only :: html .. container:: sphx-glr-footer :class: sphx-glr-footer-example .. container:: sphx-glr-download :download:`Download Python source code: hrv.py ` .. container:: sphx-glr-download :download:`Download Jupyter notebook: hrv.ipynb ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_