.. note::
    :class: sphx-glr-download-link-note

    Click :ref:`here <sphx_glr_download_auto_examples_monitoring_trigger_rates.py>` to download the full example code
.. rst-class:: sphx-glr-example-title

.. _sphx_glr_auto_examples_monitoring_trigger_rates.py:


====================
Trigger Rate Monitor
====================

A (messy) script to monitor the trigger rates.



.. code-block:: python

    from __future__ import absolute_import, print_function, division

    from datetime import datetime
    from collections import defaultdict, deque, OrderedDict
    import sys
    from io import BytesIO
    import os
    import shutil
    import time
    import threading

    import matplotlib
    # Force matplotlib to not use any Xwindows backend.
    matplotlib.use('Agg')    # noqa
    import matplotlib.pyplot as plt
    import matplotlib.dates as md
    import matplotlib.ticker as ticker

    import km3pipe as kp
    from km3pipe.config import Config
    from km3pipe.io import CHPump
    from km3pipe.io.daq import (DAQPreamble, DAQEvent)
    import km3pipe.style
    km3pipe.style.use('km3pipe')

    log = kp.logger.get_logger("trigger_rate")

    # Directory the PNG files are written to (relative to the working dir).
    PLOTS_PATH = 'km3web/plots'

    xfmt = md.DateFormatter('%Y-%m-%d %H:%M')

    # Guards ``TriggerRate.trigger_counts``, which is updated by the pipeline
    # thread (``process``) and read/reset by the plot thread (``create_plot``).
    lock = threading.Lock()

    # Marker settings shared by all series and the per-trigger overrides.
    general_style = dict(markersize=6, linestyle='None')
    styles = {
        "Overall": dict(
            marker='D',
            markerfacecolor='None',
            markeredgecolor='tomato',
            markeredgewidth=1
        ),
        "3DMuon": dict(marker='X', markerfacecolor='dodgerblue'),
        "MXShower": dict(marker='v', markerfacecolor='orange'),
        "3DShower": dict(marker='o', markerfacecolor='greenyellow'),
    }


    def trigger_rate_sampling_period():
        """Return the sampling period in seconds from the km3pipe config.

        Reads ``trigger_rate_sampling_period`` from the ``Monitoring`` section
        and falls back to 180 when the value is missing or not an integer.
        """
        try:
            return int(Config().get("Monitoring", "trigger_rate_sampling_period"))
        except (TypeError, ValueError):
            return 180


    def is_3dshower(trigger_mask):
        """Return True if the 3DShower bit (value 1) is set in the mask."""
        return bool(trigger_mask & 1)


    def is_mxshower(trigger_mask):
        """Return True if the MXShower bit (value 2) is set in the mask."""
        return bool(trigger_mask & 2)


    def is_3dmuon(trigger_mask):
        """Return True if the 3DMuon bit (value 16) is set in the mask."""
        return bool(trigger_mask & 16)


    class TriggerRate(kp.Module):
        """Count triggered events and periodically plot the trigger rates.

        ``process`` increments per-trigger counters for every ``IO_EVT``
        message.  A background thread wakes up every ``interval`` seconds,
        converts the counters into rates (events per second), appends them
        to a rolling history and writes a linear and a logarithmic plot to
        ``PLOTS_PATH``.

        Parameters read via ``self.get``
        --------------------------------
        interval: number of seconds between two plot updates; defaults to
            ``trigger_rate_sampling_period()`` when not given (or falsy).
        """

        def configure(self):
            """Initialise the counters, the rate history and the plot thread."""
            self.run = True
            self.interval = self.get("interval") or trigger_rate_sampling_period()
            print("Update interval: {}s".format(self.interval))
            self.trigger_counts = defaultdict(int)
            self.trigger_rates = OrderedDict()

            for trigger in ["Overall", "3DMuon", "MXShower", "3DShower"]:
                # 60 * 24 minutes divided by the interval in minutes: the
                # history holds as many samples as fit into one day.
                self.trigger_rates[trigger] = deque(
                    maxlen=int(60 * 24 / (self.interval / 60))
                )

            # Set by ``finish`` to wake the plot thread out of its wait.
            self._stop_event = threading.Event()

            # ``Thread.start()`` returns None, so the thread object has to be
            # stored before it is started; otherwise ``finish`` cannot join it.
            self.thread = threading.Thread(target=self.plot)
            # Do not keep the interpreter alive if the pipeline dies without
            # calling ``finish``.
            self.thread.daemon = True
            self.thread.start()

        def process(self, blob):
            """Update the trigger counters from an ``IO_EVT`` blob.

            Blobs with any other tag are passed through untouched.  The blob
            itself is always returned unchanged.
            """
            if not str(blob['CHPrefix'].tag) == 'IO_EVT':
                return blob

            sys.stdout.write('.')
            sys.stdout.flush()

            data = blob['CHData']
            data_io = BytesIO(data)
            # The preamble has to be consumed from the stream before the
            # event can be parsed; its content is not used here.
            preamble = DAQPreamble(file_obj=data_io)    # noqa
            event = DAQEvent(file_obj=data_io)
            tm = event.trigger_mask
            with lock:
                self.trigger_counts["Overall"] += 1
                # Adding a bool increments by 1 or 0 and also makes sure the
                # key exists even if the trigger did not fire.
                self.trigger_counts["3DShower"] += is_3dshower(tm)
                self.trigger_counts["MXShower"] += is_mxshower(tm)
                self.trigger_counts["3DMuon"] += is_3dmuon(tm)

                print(self.trigger_counts)

            return blob

        def plot(self):
            """Body of the plot thread: update the plots every ``interval`` s."""
            while self.run:
                # ``Event.wait`` returns True as soon as ``finish`` sets the
                # event, so shutting down does not have to wait for a full
                # interval to elapse.
                if self._stop_event.wait(self.interval):
                    break
                try:
                    self.create_plot()
                except Exception:
                    # A single failed update (e.g. the output directory being
                    # temporarily unavailable) must not end the monitoring.
                    log.exception("Failed to update the trigger rate plots.")

        def create_plot(self):
            """Turn the current counters into rates and (re)write both plots."""
            print('\n' + self.__class__.__name__ + ": updating plot.")

            timestamp = datetime.utcnow()

            with lock:
                # Iterate over the known triggers instead of the counters:
                # an interval without any event then yields a rate of 0 for
                # every trigger and no series is ever left empty (which would
                # make the ``zip(*rates)`` unpacking below fail).
                for trigger, rates in self.trigger_rates.items():
                    n_events = self.trigger_counts[trigger]
                    rates.append((timestamp, n_events / self.interval))
                self.trigger_counts = defaultdict(int)

            fig, ax = plt.subplots(figsize=(16, 4))
            try:
                for trigger, rates in self.trigger_rates.items():
                    timestamps, trigger_rates = zip(*rates)
                    ax.plot(
                        timestamps,
                        trigger_rates,
                        **styles[trigger],
                        **general_style,
                        label=trigger
                    )
                ax.set_title(
                    "Trigger Rates\n{0} UTC".format(timestamp.strftime("%c"))
                )
                ax.set_xlabel("time")
                ax.set_ylabel("trigger rate [Hz]")
                ax.xaxis.set_major_formatter(xfmt)
                # NOTE(review): this locator is applied while the axis is
                # still linear and ``set_yscale('log')`` below presumably
                # installs its own default locator - confirm whether it was
                # meant for the logarithmic plot instead.
                ax.yaxis.set_major_locator(
                    ticker.LogLocator(base=10.0, subs=(1.0, ), numticks=100)
                )
                ax.grid(True)
                ax.minorticks_on()
                ax.legend()

                fig.tight_layout()

                # Write to a temporary file first and move it into place, so
                # that readers never pick up a half-written image.
                filename = os.path.join(PLOTS_PATH, 'trigger_rates_lin_test.png')
                filename_tmp = os.path.join(
                    PLOTS_PATH, 'trigger_rates_lin_test_tmp.png'
                )
                fig.savefig(filename_tmp, dpi=120, bbox_inches="tight")
                shutil.move(filename_tmp, filename)

                try:
                    ax.set_yscale('log')
                except ValueError:
                    pass

                filename = os.path.join(PLOTS_PATH, 'trigger_rates_test.png')
                filename_tmp = os.path.join(
                    PLOTS_PATH, 'trigger_rates_test_tmp.png'
                )
                fig.savefig(filename_tmp, dpi=120, bbox_inches="tight")
                shutil.move(filename_tmp, filename)
            finally:
                # Always release the figure, also when saving failed.
                plt.close(fig)

            print("Plot updated at '{}'.".format(filename))

        def finish(self):
            """Stop the plot thread and wait for it to terminate."""
            self.run = False
            self._stop_event.set()
            # ``threading.Thread`` has no ``stop()``; the thread ends on its
            # own once the event is set, so it only needs to be joined.
            if self.thread is not None:
                self.thread.join()


    pipe = kp.Pipeline()
    pipe.attach(
        CHPump,
        host='127.0.0.1',
        port=5553,
        tags='IO_EVT',
        timeout=60 * 60 * 24 * 7,
        max_queue=200000
    )
    pipe.attach(TriggerRate, interval=60)
    pipe.drain()

**Total running time of the script:** ( 0 minutes 0.000 seconds)


.. _sphx_glr_download_auto_examples_monitoring_trigger_rates.py:


.. only :: html

 .. container:: sphx-glr-footer
    :class: sphx-glr-footer-example



  .. container:: sphx-glr-download

     :download:`Download Python source code: trigger_rates.py <trigger_rates.py>`



  .. container:: sphx-glr-download

     :download:`Download Jupyter notebook: trigger_rates.ipynb <trigger_rates.ipynb>`


.. only:: html

 .. rst-class:: sphx-glr-signature

    `Gallery generated by Sphinx-Gallery <https://sphinx-gallery.readthedocs.io>`_