# Filename: plot.py
# -*- coding: utf-8 -*-
# pylint: disable=locally-disabled
"""
A collection of plotting functions and modules.
"""
from __future__ import absolute_import, print_function, division
from datetime import datetime
import os
import multiprocessing as mp
import time
import pandas as pd
import numpy as np
import matplotlib
# Force matplotlib to not use any Xwindows backend.
matplotlib.use('Agg')
import matplotlib.pyplot as plt # noqa
from matplotlib import pylab # noqa
try:
import healpy as hp
except ImportError:
pass
import km3pipe as kp # noqa
import km3pipe.style # noqa
[docs]def plot_dom_parameters(
data,
detector,
filename,
label,
title,
vmin=0.0,
vmax=10.0,
cmap='RdYlGn_r',
under='deepskyblue',
over='deeppink',
underfactor=1.0,
overfactor=1.0,
missing='lightgray',
hide_limits=False
):
"""Creates a plot in the classical monitoring.km3net.de style.
Parameters
----------
data: dict((du, floor) -> value)
detector: km3pipe.hardware.Detector() instance
filename: filename or filepath
label: str
title: str
underfactor: a scale factor for the points used for underflow values
overfactor: a scale factor for the points used for overflow values
hide_limits: do not show under/overflows in the plot
"""
x, y, _ = zip(*detector.doms.values())
fig, ax = plt.subplots(figsize=(10, 6))
cmap = plt.get_cmap(cmap)
cmap.set_over(over, 1.0)
cmap.set_under(under, 1.0)
m_size = 100
scatter_args = {
'edgecolors': 'None',
'vmin': vmin,
'vmax': vmax,
}
sc_inactive = ax.scatter(
x, y, c=missing, label='missing', s=m_size * 0.9, **scatter_args
)
xa, ya = map(np.array, zip(*data.keys()))
zs = np.array(list(data.values()))
in_range_idx = np.logical_and(zs >= vmin, zs <= vmax)
sc = ax.scatter(
xa[in_range_idx],
ya[in_range_idx],
c=zs[in_range_idx],
cmap=cmap,
s=m_size,
**scatter_args
)
if not hide_limits:
under_idx = zs < vmin
ax.scatter(
xa[under_idx],
ya[under_idx],
c=under,
label='< {0}'.format(vmin),
s=m_size * underfactor,
**scatter_args
)
over_idx = zs > vmax
ax.scatter(
xa[over_idx],
ya[over_idx],
c=over,
label='> {0}'.format(vmax),
s=m_size * overfactor,
**scatter_args
)
cb = plt.colorbar(sc)
cb.set_label(label)
ax.set_title(
"{0}\n{1} UTC".format(title,
datetime.utcnow().strftime("%c"))
)
ax.set_xlabel("DU")
ax.set_ylabel("DOM")
ax.set_ylim(-2)
ax.set_yticks(range(1, 18 + 1))
major_locator = pylab.MaxNLocator(integer=True)
sc_inactive.axes.xaxis.set_major_locator(major_locator)
ax.legend(
bbox_to_anchor=(0., -.16, 1., .102),
loc=1,
ncol=2,
mode="expand",
borderaxespad=0.
)
fig.tight_layout()
plt.savefig(filename, dpi=120, bbox_inches="tight")
plt.close('all')
[docs]def make_dom_map(pmt_directions, values, nside=512, d=0.2, smoothing=0.1):
"""Create a mollweide projection of a DOM with given PMTs.
The output can be used to call the `healpy.mollview` function.
"""
import healpy as hp
discs = [hp.query_disc(nside, dir, 0.2) for dir in pmt_directions]
npix = hp.nside2npix(nside)
pixels = np.zeros(npix)
for disc, value in zip(discs, values):
for d in disc:
pixels[d] = value
if smoothing > 0:
return hp.sphtfunc.smoothing(pixels, fwhm=smoothing, iter=1)
return pixels
[docs]class IntraDOMCalibrationPlotter(kp.Module):
[docs] def process(self, blob):
calibration = blob["IntraDOMCalibration"]
for process in (self.create_plot, self.save_hdf5):
proc = mp.Process(target=process, args=(calibration, ))
proc.daemon = True
proc.start()
proc.join()
return blob
[docs] def create_plot(self, calibration):
print("Creating plot...")
fig, axes = plt.subplots(
6, 3, figsize=(16, 20), sharex=True, sharey=True
)
sorted_dom_ids = sorted(
calibration.keys(),
key=lambda d: self.db.doms.
via_dom_id(dom_id=d, det_id=self.det_oid).omkey
) # by DU and FLOOR, note that DET OID is needed!
for ax, dom_id in zip(axes.flatten(), sorted_dom_ids):
calib = calibration[dom_id]
ax.plot(np.cos(calib['angles']), calib["means"], '.')
ax.plot(np.cos(calib['angles']), calib["corrected_means"], '.')
ax.set_title(
"{0} - {1}".format(
self.db.doms.via_dom_id(dom_id, self.det_oid), dom_id
)
)
ax.set_ylim((-10, 10))
plt.suptitle("{0} UTC".format(datetime.utcnow().strftime("%c")))
plt.savefig(
os.path.join(self.plots_path, "intradom.png"), bbox_inches='tight'
)
plt.close('all')
fig, axes = plt.subplots(
6, 3, figsize=(16, 20), sharex=True, sharey=True
)
for ax, dom_id in zip(axes.flatten(), sorted_dom_ids):
calib = calibration[dom_id]
ax.plot(np.cos(calib['angles']), calib["rates"], '.')
ax.plot(np.cos(calib['angles']), calib["corrected_rates"], '.')
ax.set_title(
"{0} - {1}".format(
self.db.doms.via_dom_id(dom_id, self.det_oid), dom_id
)
)
ax.set_ylim((0, 10))
plt.suptitle("{0} UTC".format(datetime.utcnow().strftime("%c")))
plt.savefig(
os.path.join(self.plots_path, "angular_k40rate_distribution.png"),
bbox_inches='tight'
)
plt.close('all')
[docs] def save_hdf5(self, calibration):
print("Saving calibration information...")
store = pd.HDFStore(
os.path.join(self.data_path, 'k40calib.h5'), mode='a'
)
now = int(time.time())
timestamps = (now, ) * 31
for dom_id, calib in calibration.items():
tdc_channels = range(31)
t0s = calib['opt_t0s'].x
dom_ids = (dom_id, ) * 31
df = pd.DataFrame({
'timestamp': timestamps,
'dom_id': dom_ids,
'tdc_channel': tdc_channels,
't0s': t0s
})
store.append('t0s', df, format='table', data_columns=True)
store.close()