#!/usr/bin/env python
"""Convert a showerfit file to HDF5.
Usage:
i3shower2hdf5.py INFILE [OUTFILE]
i3shower2hdf5.py -h | --help
Options:
-h --help Show this screen.
"""
from math import isnan, fabs # todo: replace with numpy?
import h5py
import pandas as pd
# order of these imports is crucial!!!
# 'from icecube import icetray' secretly loads stuff behind the scenes, it seems # noqa
# very un-pythonic, but eh
# see also http://software.icecube.wisc.edu/documentation/projects/dataclasses/faq-common-probs.html#runtimeerror-extension-class-wrapper-for-base-class-i3frameobject-has-not-been-created-yet # noqa
from icecube import icetray, dataio # noqa
from icecube.icetray import I3Module, I3Bool, I3Int
from icecube.dataclasses import I3Double
from I3Tray import I3Tray
# CRUCIAL import this after the ones before
from icecube import antares_common, antares_reader # noqa
from icecube.gulliver import I3LogLikelihoodFitParams # noqa
__author__ = "Moritz Lotze"
__copyright__ = "Copyright 2017, Moritz Lotze and the KM3NeT collaboration."
__credits__ = ["Thomas Heid"]
__license__ = "BSD-3"
__maintainer__ = "Moritz Lotze"
__email__ = "mlotze@km3net.de"
__status__ = "Development"
[docs]class WriteScalars(I3Module):
"""DUmp all Scalars to disk."""
def __init__(self, context):
I3Module.__init__(self, context)
self.AddParameter("Filename", "Name the file to write into.", "foo.h5")
self.AddParameter("Tablename", "Name of the table.", "/data")
self.AddOutBox("OutBox")
[docs] def Physics(self, frame):
out = {}
for k, v in frame.items():
if k in out:
continue
if type(v) not in {I3Double, I3Bool, I3Int}:
continue
out[k] = v.value
self.store.append(out)
self.PushFrame(frame)
[docs] def Finish(self):
arr = pd.DataFrame(self.store).to_records(index=False)
with h5py.File(self.filename, 'w') as h5:
h5.create_dataset(
self.tablename,
data=arr,
compression="gzip",
compression_opts=5,
shuffle=True,
fletcher32=True
)
[docs]class KeepReconstructed(I3Module):
"""Discard all events which don't have the full fit."""
def __init__(self, context):
I3Module.__init__(self, context)
self.AddOutBox("OutBox")
[docs] def Physics(self, frame):
if frame.Has(
"best_DusjOrcaUsingProbabilitiesFinalFit_FitResult_FinalLLHValues"
): # noqa
self.PushFrame(frame)
[docs]class ReadLLHValues(I3Module):
"""Read the LLH values of a final fit."""
def __init__(self, context):
I3Module.__init__(self, context)
self.AddParameter(
"LLHParamContainer", "Name of LLH value container", ""
)
self.AddOutBox("OutBox")
[docs] def Physics(self, frame):
try:
llh_params = frame.Get(self.llh_cont)
for name, param in llh_params.items():
try:
frame.Put(name, I3Double(param))
except RuntimeError:
continue
except KeyError:
pass
self.PushFrame(frame)
[docs]class ReadRecoParticle(I3Module):
"""Read the position, energy, ... of a fit."""
def __init__(self, context):
I3Module.__init__(self, context)
self.AddParameter("ParticleName", "Name of the reco particle", "")
self.AddOutBox("OutBox")
[docs] def Configure(self):
self.particlename = self.GetParameter("ParticleName")
[docs] def Physics(self, frame):
try:
particle = frame.Get(self.particlename)
except Exception:
self.PushFrame(frame)
return
particle_map = self._read_particle(particle)
for key, val in particle_map.items():
name = self.particlename + '_' + key
frame.Put(name, I3Double(val))
self.PushFrame(frame)
def _read_particle(self, particle):
out = {}
out["pos_x"] = particle.GetX()
out["pos_y"] = particle.GetY()
out["pos_z"] = particle.GetZ()
out["time"] = particle.GetTime()
out["theta"] = particle.GetDir().CalcTheta()
out["phi"] = particle.GetDir().CalcPhi()
out["energy"] = particle.GetEnergy()
return out
[docs]class Readrlogl(I3Module):
"""Read the Logl of the final fits."""
def __init__(self, context):
I3Module.__init__(self, context)
self.AddOutBox("OutBox")
]
[docs] def Physics(self, frame):
for fit_param in self.fit_params:
try:
rlogl = self.Readrlogl(frame, fit_param)
except Exception:
continue
if isnan(rlogl):
rlogl = -0.1
frame.Put(fit_param + "__rlogl", I3Double(rlogl))
self.PushFrame(frame)
[docs] def Readrlogl(self, frame, FitParameters):
FitParameters = frame.Get(FitParameters)
rlogl = FitParameters.rlogl
return float(rlogl)
[docs]class Compare(I3Module):
"""Compare prefit + final fit."""
def __init__(self, context):
I3Module.__init__(self, context)
self.AddParameter("particle_1", "Name of first particle", "")
self.AddParameter("particle_2", "Name of second particle", "")
self.AddOutBox("OutBox2")
[docs] def Physics(self, frame):
self.OK = True
try:
self.GetParticles(frame)
except KeyError:
pass
self.Check(frame)
[docs] def Check(self, frame):
for Particle in self.InputParticles:
self.CheckParticle(Particle, frame)
[docs] def CheckParticle(self, Particle, frame):
if isnan(Particle.GetPos().X) or isnan(Particle.GetTime()):
self.OK = False
[docs] def GetParticles(self, frame):
self.InputParticles = []
for InputParticleName in self.InputParticleName:
self.InputParticles.append(frame.Get(InputParticleName))
[docs]class Distance(Compare):
"""Spatial Distance between 2 vertices (prefit + final)."""
def __init__(self, context):
Compare.__init__(self, context)
self.AddOutBox("OutBox")
[docs] def Physics(self, frame):
try:
Compare.Physics(self, frame)
if self.OK:
self.get_vertex()
self.get_distance()
frame.Put("Distance", I3Double(self.Distance))
except IndexError:
pass
self.PushFrame(frame)
[docs] def get_vertex(self):
self.Vertex = []
for Particle in self.InputParticles:
pos = Particle.GetPos()
self.Vertex.append(pos)
[docs] def get_distance(self):
self.Distance = self.Vertex[0].CalcDistance(self.Vertex[1])
[docs]class TimeDistance(Compare):
"""Time Distance between 2 vertices (prefit + final)."""
def __init__(self, context):
Compare.__init__(self, context)
self.AddOutBox("OutBox")
[docs] def Physics(self, frame):
try:
Compare.Physics(self, frame)
if self.OK:
self.GetTime()
self.GetTimeDifference()
frame.Put("TimeDistance", I3Double(self.TimeDifference))
except IndexError:
pass
self.PushFrame(frame)
[docs] def GetTime(self):
self.Time = []
for Particle in self.InputParticles:
self.Time.append(I3Double(Particle.GetTime()).value)
[docs] def GetTimeDifference(self):
self.TimeDifference = I3Double(
fabs(float(self.Time[0]) - float(self.Time[1]))
)
[docs]def main():
"""Entry point when running as script from commandline."""
from docopt import docopt
args = docopt(__doc__)
infile = args['INFILE']
outfile = args['OUTFILE']
i3extract(infile, outfile)
if __name__ == '__main__':
main()