# Filename: ahrs.py
# -*- coding: utf-8 -*-
# pylint: disable=locally-disabled
"""
AHRS calibration.
"""
from __future__ import absolute_import, print_function, division
import io
from collections import defaultdict
import time
from km3pipe.tools import timed_cache
import xml.etree.ElementTree as ET
import numpy as np
from numpy import cos, sin, arctan2
import km3pipe as kp
from km3pipe.io.daq import TMCHData
__author__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__status__ = "Development"
[docs]log = kp.logger.get_logger(__name__) # pylint: disable=C0103
# log.setLevel("DEBUG")
[docs]class AHRSCalibrator(kp.Module):
"""Calculates AHRS yaw, pitch and roll from median A and H of an interval.
Required Parameters
-------------------
det_id: int (detector ID, e.g. 29)
Optional Parameters
-------------------
interval: int (accumulation interval in [sec], default: 10s)
Output Keys
-----------
'AHRSCalibration':
dict: key=dom_id, value=tuple: (timestamp, du, floor, yaw, pitch, roll)
"""
[docs] def process(self, blob):
tmch_data = TMCHData(io.BytesIO(blob['CHData']))
dom_id = tmch_data.dom_id
try:
du, floor, _ = self.detector.doms[dom_id]
except KeyError: # base CLB
return blob
self.A[dom_id].append(tmch_data.A)
self.H[dom_id].append(tmch_data.H)
if time.time() - self.timestamp > self.interval:
self.timestamp = time.time()
calib = self.calibrate()
blob['AHRSCalibration'] = calib
return blob
[docs] def calibrate(self):
"""Calculate yaw, pitch and roll from the median of A and H.
After successful calibration, the `self.A` and `self.H` are reset.
DOMs with missing AHRS pre-calibration data are skipped.
Returns
-------
dict: key=dom_id, value=tuple: (timestamp, du, floor, yaw, pitch, roll)
"""
now = time.time()
dom_ids = self.A.keys()
print(
"Calibrating AHRS from median A and H for {} DOMs.".format(
len(dom_ids)
)
)
calibrations = {}
for dom_id in dom_ids:
print("Calibrating DOM ID {}".format(dom_id))
clb_upi = self.db.doms.via_dom_id(dom_id).clb_upi
ahrs_calib = get_latest_ahrs_calibration(clb_upi)
if ahrs_calib is None:
log.warning("AHRS calibration missing for '{}'".format(dom_id))
continue
du, floor, _ = self.detector.doms[dom_id]
A = np.median(self.A[dom_id], axis=0)
H = np.median(self.H[dom_id], axis=0)
cyaw, cpitch, croll = fit_ahrs(A, H, *ahrs_calib)
calibrations[dom_id] = (now, du, floor, cyaw, cpitch, croll)
self.A = defaultdict(list)
self.H = defaultdict(list)
return calibrations
[docs]def fit_ahrs(A, H, Aoff, Arot, Hoff, Hrot):
"""Calculate yaw, pitch and roll for given A/H and calibration set.
Author: Vladimir Kulikovsky
Parameters
----------
A: list, tuple or numpy.array of shape (3,)
H: list, tuple or numpy.array of shape (3,)
Aoff: numpy.array of shape(3,)
Arot: numpy.array of shape(3, 3)
Hoff: numpy.array of shape(3,)
Hrot: numpy.array of shape(3, 3)
Returns
-------
yaw, pitch, roll
"""
Acal = np.dot(A - Aoff, Arot)
Hcal = np.dot(H - Hoff, Hrot)
# invert axis for DOM upside down
for i in (1, 2):
Acal[i] = -Acal[i]
Hcal[i] = -Hcal[i]
roll = arctan2(-Acal[1], -Acal[2])
pitch = arctan2(Acal[0], np.sqrt(Acal[1] * Acal[1] + Acal[2] * Acal[2]))
yaw = arctan2(
Hcal[2] * sin(roll) - Hcal[1] * cos(roll),
sum((
Hcal[0] * cos(pitch), Hcal[1] * sin(pitch) * sin(roll),
Hcal[2] * sin(pitch) * cos(roll)
))
)
yaw = np.degrees(yaw)
while yaw < 0:
yaw += 360
# yaw = (yaw + magnetic_declination + 360 ) % 360
roll = np.degrees(roll)
pitch = np.degrees(pitch)
return yaw, pitch, roll
@timed_cache(hours=1, maxsize=None, typed=False)
[docs]def get_latest_ahrs_calibration(clb_upi, max_version=3, db=None):
"""Retrieve the latest AHRS calibration data for a given CLB
Parameters
----------
clb_upi: str
max_version: int, maximum version to check, optional
db: DBManager(), optional
Returns
-------
Aoff: numpy.array with shape(3,)
Arot: numpy.array with shape(3,3)
Hoff: numpy.array with shape(3,)
Hrot: numpy.array with shape(3,3)
or None if no calibration found.
"""
ahrs_upi = kp.db.clbupi2ahrsupi(clb_upi)
if db is None:
db = kp.db.DBManager()
for version in range(max_version, 0, -1):
raw_data = ""
for n in range(1, 100):
log.debug("Iteration #{} to get the calib data".format(n))
url = "show_product_test.htm?upi={0}&" \
"testtype=AHRS-CALIBRATION-v{1}&n={2}&out=xml" \
.format(ahrs_upi, version, n)
log.debug("AHRS calib DB URL: {}".format(url))
_raw_data = db._get_content(url).replace('\n', '')
log.debug("What I got back as AHRS calib: {}".format(_raw_data))
if len(_raw_data) == 0:
break
else:
raw_data = _raw_data
try:
xroot = ET.parse(io.StringIO(raw_data)).getroot()
except ET.ParseError:
continue
else:
return _extract_calibration(xroot)
return None
def _extract_calibration(xroot):
"""Extract AHRS calibration information from XML root.
Parameters
----------
xroot: XML root
Returns
-------
Aoff: numpy.array with shape(3,)
Arot: numpy.array with shape(3,3)
Hoff: numpy.array with shape(3,)
Hrot: numpy.array with shape(3,3)
"""
names = [c.text for c in xroot.findall(".//Name")]
val = [[i.text for i in c] for c in xroot.findall(".//Values")]
# The fields has to be reindeced, these are the index mappings
col_ic = [int(v) for v in val[names.index("AHRS_Matrix_Column(-)")]]
try:
row_ic = [int(v) for v in val[names.index("AHRS_Matrix_Row(-)")]]
except ValueError:
row_ic = [2, 2, 2, 1, 1, 1, 0, 0, 0]
try:
vec_ic = [int(v) for v in val[names.index("AHRS_Vector_Index(-)")]]
except ValueError:
vec_ic = [2, 1, 0]
Aoff_ix = names.index("AHRS_Acceleration_Offset(g/ms^2-)")
Arot_ix = names.index("AHRS_Acceleration_Rotation(-)")
Hrot_ix = names.index("AHRS_Magnetic_Rotation(-)")
Aoff = np.array(val[Aoff_ix])[vec_ic].astype(float)
Arot = np.array(val[Arot_ix]).reshape(3, 3)[col_ic, row_ic] \
.reshape(3, 3).astype(float)
Hrot = np.array(val[Hrot_ix]).reshape(3, 3)[col_ic, row_ic] \
.reshape(3, 3).astype(float)
Hoff = []
for q in 'XYZ':
values = []
for t in ('Min', 'Max'):
ix = names.index("AHRS_Magnetic_{}{}(G-)".format(q, t))
values.append(float(val[ix][0]))
Hoff.append(sum(values) / 2.)
Hoff = np.array(Hoff)
return Aoff, Arot, Hoff, Hrot