Source code for km3pipe.db

# Filename: db.py
# pylint: disable=locally-disabled
"""
Database utilities.

"""
from __future__ import absolute_import, print_function, division

from datetime import datetime
import numbers
import ssl
import io
import json
import re
import pytz
from collections import defaultdict, OrderedDict, namedtuple
try:
    from inspect import Signature, Parameter
[docs] SKIP_SIGNATURE_HINTS = False
except ImportError: SKIP_SIGNATURE_HINTS = True try: from urllib.parse import urlencode, unquote from urllib.request import ( Request, build_opener, urlopen, HTTPCookieProcessor, HTTPHandler ) from urllib.error import URLError, HTTPError from io import StringIO from http.client import IncompleteRead except ImportError: from urllib import urlencode, unquote from urllib2 import ( Request, build_opener, urlopen, HTTPCookieProcessor, HTTPHandler, URLError, HTTPError ) from StringIO import StringIO from httplib import IncompleteRead
[docs] input = raw_input
from .tools import cprint from .time import Timer from .logger import get_logger __author__ = "Tamas Gal" __copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." __credits__ = [] __license__ = "MIT" __maintainer__ = "Tamas Gal" __email__ = "tgal@km3net.de" __status__ = "Development"
[docs]log = get_logger(__name__) # pylint: disable=C0103
[docs]UTC_TZ = pytz.timezone('UTC')
# Ignore invalid certificate error ssl._create_default_https_context = ssl._create_unverified_context
[docs]BASE_URL = 'https://km3netdbweb.in2p3.fr'
[docs]def we_are_in_lyon(): """Check if we are on a Lyon machine""" import socket try: hostname = socket.gethostname() ip = socket.gethostbyname(hostname) except socket.gaierror: return False return ip.startswith("134.158.")
[docs]def read_csv(text, sep="\t"): """Create a DataFrame from CSV text""" import pandas as pd # no top level load to make a faster import of db return pd.read_csv(StringIO(text), sep="\t")
[docs]def make_empty_dataset(): """Create an empty dataset""" import pandas as pd # no top level load to make a faster import of db return pd.DataFrame()
[docs]class DBManager(object): """A wrapper for the KM3NeT Web DB""" def __init__( self, username=None, password=None, url=None, temporary=False ): "Create database connection" self._cookies = [] self._parameters = None self._doms = None self._detectors = None self._opener = None self._temporary = temporary from .config import Config config = Config() if url is not None: self._db_url = url else: self._db_url = config.db_url or BASE_URL self._login_url = self._db_url + '/home.htm' if not temporary and we_are_in_lyon(): self.restore_session( "sid=_kmcprod_134.158_lyo7783844001343100343mcprod1223user" ) return if username is not None and password is not None: self.login(username, password) elif config.db_session_cookie not in (None, ''): self.restore_session(config.db_session_cookie) else: username, password = config.db_credentials login_ok = self.login(username, password) if login_ok and not self._temporary and \ input("Request permanent session? (y/n)") in 'yY': self.request_permanent_session(username, password)
[docs] def datalog(self, parameter, run, maxrun=None, det_id='D_ARCA001'): "Retrieve datalogs for given parameter, run(s) and detector" parameter = parameter.lower() if maxrun is None: maxrun = run with Timer('Database lookup'): return self._datalog(parameter, run, maxrun, det_id)
def _datalog(self, parameter, run, maxrun, det_id): "Extract data from database" values = { 'parameter_name': parameter, 'minrun': run, 'maxrun': maxrun, 'detid': det_id, } data = urlencode(values) content = self._get_content('streamds/datalognumbers.txt?' + data) if content.startswith('ERROR'): log.error(content) return None try: dataframe = read_csv(content) except ValueError: log.warning( "Empty dataset" ) # ...probably. Waiting for more info return make_empty_dataset() else: add_datetime(dataframe) try: self._add_converted_units(dataframe, parameter) except KeyError: log.warning( "Could not add converted units for {0}".format(parameter) ) return dataframe
[docs] def run_table(self, det_id='D_ARCA001'): url = 'streamds/runs.txt?detid={0}'.format(det_id) content = self._get_content(url) try: df = read_csv(content) except ValueError: log.warning("Empty dataset") return None else: timestamp_column = 'UNIXSTARTTIME' add_datetime(df, timestamp_column) df['RUNDURATION'] = -df[timestamp_column].diff(periods=-1) / 1000 return df
def _add_converted_units(self, dataframe, parameter, key='VALUE'): """Add an additional DATA_VALUE column with converted VALUEs""" convert_unit = self.parameters.get_converter(parameter) try: log.debug("Adding unit converted DATA_VALUE to the data") dataframe[key] = dataframe['DATA_VALUE'].apply(convert_unit) except KeyError: log.warning("Missing 'VALUE': no unit conversion.") else: dataframe.unit = self.parameters.unit(parameter) @property
[docs] def detectors(self): if self._detectors is None: self._detectors = self._get_detectors() return self._detectors
def _get_detectors(self): content = self._get_content('streamds/detectors.txt') try: dataframe = read_csv(content) except ValueError: log.warning("Empty dataset") return make_empty_dataset() else: return dataframe
[docs] def get_det_id(self, det_oid): """Convert detector string representation (OID) to serialnumber""" try: return self.detectors[self.detectors.OID == det_oid ].SERIALNUMBER.iloc[0] except IndexError: log.critical("No det ID found for OID '{}'".format(det_oid)) return None
[docs] def get_det_oid(self, det_id): """Convert detector serialnumber to string representation (OID)""" try: return self.detectors[self.detectors.SERIALNUMBER == det_id ].OID.iloc[0] except IndexError: log.critical("No OID found for det ID '{}'".format(det_id)) return None
[docs] def to_det_id(self, det_id_or_det_oid): """Convert det ID or OID to det ID""" try: int(det_id_or_det_oid) except ValueError: return self.get_det_id(det_id_or_det_oid) else: return det_id_or_det_oid
[docs] def to_det_oid(self, det_id_or_det_oid): """Convert det OID or ID to det OID""" try: int(det_id_or_det_oid) except ValueError: return det_id_or_det_oid else: return self.get_det_oid(det_id_or_det_oid)
@property
[docs] def parameters(self): "Return the parameters container for quick access to their details" if self._parameters is None: self._load_parameters() return self._parameters
def _load_parameters(self): "Retrieve a list of available parameters from the database" parameters = self._get_json('allparam/s') data = {} for parameter in parameters: # There is a case-chaos in the DB data[parameter['Name'].lower()] = parameter self._parameters = ParametersContainer(data)
[docs] def trigger_setup(self, runsetup_oid): "Retrieve the trigger setup for a given runsetup OID" r = self._get_content( "jsonds/rslite/s?rs_oid={}&upifilter=1.1.2.2.3/*". format(runsetup_oid) ) data = json.loads(r)['Data'] if not data: log.error("Empty dataset.") return raw_setup = data[0] det_id = raw_setup['DetID'] name = raw_setup['Name'] description = raw_setup['Desc'] _optical_df = raw_setup['ConfGroups'][0] optical_df = {'Name': _optical_df['Name'], 'Desc': _optical_df['Desc']} for param in _optical_df['Params']: pname = self.parameters.oid2name(param['OID']).replace('DAQ_', '') try: dtype = float if '.' in param['Val'] else int val = dtype(param['Val']) except ValueError: val = param['Val'] optical_df[pname] = val _acoustic_df = raw_setup['ConfGroups'][1] acoustic_df = { 'Name': _acoustic_df['Name'], 'Desc': _acoustic_df['Desc'] } for param in _acoustic_df['Params']: pname = self.parameters.oid2name(param['OID']).replace('DAQ_', '') try: dtype = float if '.' in param['Val'] else int val = dtype(param['Val']) except ValueError: val = param['Val'] acoustic_df[pname] = val return TriggerSetup( runsetup_oid, name, det_id, description, optical_df, acoustic_df
) @property
[docs] def doms(self): if self._doms is None: self._load_doms() return self._doms
def _load_doms(self): "Retrieve DOM information from the database" doms = self._get_json('domclbupiid/s') self._doms = DOMContainer(doms)
[docs] def detx(self, det_id, t0set=None, calibration=None): """Retrieve the detector file for given detector id If t0set is given, append the calibration data. """ url = 'detx/{0}?'.format(det_id) # '?' since it's ignored if no args if t0set is not None: url += '&t0set=' + t0set if calibration is not None: url += '&calibrid=' + calibration detx = self._get_content(url) return detx
[docs] def ahrs(self, run, maxrun=None, clbupi=None, det_id='D_ARCA001'): "Retrieve AHRS values for given run(s) (optionally CLBs) and detector" if maxrun is None: maxrun = run with Timer('Database lookup'): return self._ahrs(run, maxrun, clbupi, det_id)
def _ahrs(self, run, maxrun, clbupi, det_id): values = { 'minrun': run, 'maxrun': maxrun, 'detid': det_id, } if clbupi is not None: values['clbupi'] = clbupi data = urlencode(values) content = self._get_content('streamds/ahrs.txt?' + data) if content.startswith('ERROR'): log.error(content) return None try: dataframe = read_csv(content) except ValueError: log.warning( "Empty dataset" ) # ...probably. Waiting for more info return make_empty_dataset() else: add_datetime(dataframe) return dataframe def _get_json(self, url): "Get JSON-type content" content = self._get_content('jsonds/' + url) try: json_content = json.loads(content.decode()) except AttributeError: json_content = json.loads(content) if json_content['Comment']: log.warning(json_content['Comment']) if json_content['Result'] != 'OK': raise ValueError('Error while retrieving the parameter list.') return json_content['Data'] def _get_content(self, url): "Get HTML content" target_url = self._db_url + '/' + unquote(url) # .encode('utf-8')) log.debug("Opening '{0}'".format(target_url)) try: f = self.opener.open(target_url) except HTTPError as e: log.error("HTTP error, your session may be expired.") log.error(e) if input("Request new permanent session and retry? (y/n)") in 'yY': self.request_permanent_session() return self._get_content(url) else: return None log.debug("Accessing '{0}'".format(target_url)) try: content = f.read() except IncompleteRead as icread: log.critical( "Incomplete data received from the DB, " + "the data could be corrupted." ) content = icread.partial log.debug("Got {0} bytes of data.".format(len(content))) return content.decode('utf-8') @property
[docs] def opener(self): "A reusable connection manager" if self._opener is None: log.debug("Creating connection handler") opener = build_opener() if self._cookies: log.debug("Appending cookies") else: log.debug("No cookies to append") for cookie in self._cookies: cookie_str = cookie.name + '=' + cookie.value opener.addheaders.append(('Cookie', cookie_str)) self._opener = opener else: log.debug("Reusing connection manager") return self._opener
[docs] def restore_session(self, cookie): """Establish databse connection using permanent session cookie""" log.debug("Restoring session from cookie: {}".format(cookie)) opener = build_opener() opener.addheaders.append(('Cookie', cookie)) self._opener = opener
[docs] def request_permanent_session(self, username=None, password=None): log.debug("Requesting permanent session") from .config import Config config = Config() if username is None and password is None: log.debug("Checking configuration file for DB credentials") username, password = config.db_credentials cookie = self.request_sid_cookie(username, password) try: cookie_str = str(cookie, 'utf-8') # Python 3 except TypeError: cookie_str = str(cookie) # Python 2 log.debug("Session cookie: {0}".format(cookie_str)) log.debug("Storing cookie in configuration file") config.set('DB', 'session_cookie', cookie_str) # self._cookies = [cookie] self.restore_session(cookie)
[docs] def login(self, username, password): "Login to the database and store cookies for upcoming requests." log.debug("Logging in to the DB") opener = self._build_opener() values = {'usr': username, 'pwd': password} req = self._make_request(self._login_url, values) try: log.debug("Sending login request") f = opener.open(req) except URLError as e: log.error("Failed to connect to the database -> probably down!") log.error("Error from database server:\n {0}".format(e)) return False html = f.read() failed_auth_message = 'Bad username or password' if failed_auth_message in str(html): log.error(failed_auth_message) return False return True
def _build_opener(self): log.debug("Building opener.") try: from http.cookiejar import CookieJar except ImportError: from cookielib import CookieJar cj = CookieJar() self._cookies = cj opener = build_opener(HTTPCookieProcessor(cj), HTTPHandler()) return opener def _make_request(self, url, values): data = urlencode(values) return Request(url, data.encode('utf-8')) def _post(self, url, data): pass
[docs]def add_datetime(dataframe, timestamp_key='UNIXTIME'): """Add an additional DATETIME column with standar datetime format. This currently manipulates the incoming DataFrame! """ def convert_data(timestamp): return datetime.fromtimestamp(float(timestamp) / 1e3, UTC_TZ) try: log.debug("Adding DATETIME column to the data") converted = dataframe[timestamp_key].apply(convert_data) dataframe['DATETIME'] = converted except KeyError: log.warning("Could not add DATETIME column")
[docs]class StreamDS(object): """Access to the streamds data stored in the KM3NeT database.""" def __init__( self, username=None, password=None, url=None, temporary=False ): self._db = DBManager(username, password, url, temporary) self._stream_df = None self._streams = None self._update_streams() def _update_streams(self): """Update the list of available straems""" content = self._db._get_content("streamds") self._stream_df = read_csv(content).sort_values("STREAM") self._streams = None for stream in self.streams: setattr(self, stream, self.__getattr__(stream)) def __getattr__(self, attr): """Magic getter which optionally populates the function signatures""" if attr in self.streams: stream = attr else: raise AttributeError def func(**kwargs): return self.get(stream, **kwargs) func.__doc__ = self._stream_parameter(stream, "DESCRIPTION") if not SKIP_SIGNATURE_HINTS: sig_dict = OrderedDict() for sel in self.mandatory_selectors(stream): if sel == '-': continue sig_dict[Parameter(sel, Parameter.POSITIONAL_OR_KEYWORD)] = None for sel in self.optional_selectors(stream): if sel == '-': continue sig_dict[Parameter(sel, Parameter.KEYWORD_ONLY)] = None func.__signature__ = Signature(parameters=sig_dict) return func @property
[docs] def streams(self): """A list of available streams""" if self._streams is None: self._streams = list(self._stream_df["STREAM"].values) return self._streams
def _stream_parameter(self, stream, parameter): data = self._stream_df[self._stream_df.STREAM == stream] if 'SELECTORS' in parameter: return list(data[parameter].values[0].split(',')) else: return data[parameter].values[0]
[docs] def mandatory_selectors(self, stream): """A list of mandatory selectors for a given stream""" return self._stream_parameter(stream, 'MANDATORY_SELECTORS')
[docs] def optional_selectors(self, stream): """A list of optional selectors for a given stream""" return self._stream_parameter(stream, 'OPTIONAL_SELECTORS')
[docs] def help(self, stream): """Show the help for a given stream.""" if stream not in self.streams: log.error("Stream '{}' not found in the database.".format(stream)) params = self._stream_df[self._stream_df['STREAM'] == stream].values[0] self._print_stream_parameters(params)
[docs] def print_streams(self): """Print a coloured list of streams and its parameters""" for row in self._stream_df.itertuples(): self._print_stream_parameters(row[1:])
def _print_stream_parameters(self, values): """Print a coloured help for a given tuple of stream parameters.""" cprint("{0}".format(*values), "magenta", attrs=["bold"]) print("{4}".format(*values)) cprint(" available formats: {1}".format(*values), "blue") cprint(" mandatory selectors: {2}".format(*values), "red") cprint(" optional selectors: {3}".format(*values), "green") print()
[docs] def get(self, stream, fmt='txt', **kwargs): """Get the data for a given stream manually""" sel = ''.join(["&{0}={1}".format(k, v) for (k, v) in kwargs.items()]) url = "streamds/{0}.{1}?{2}".format(stream, fmt, sel[1:]) data = self._db._get_content(url) if not data: log.error("No data found at URL '%s'." % url) return if (data.startswith("ERROR")): log.error(data) return if fmt == "txt": return read_csv(data) return data
[docs]class ParametersContainer(object): """Provides easy access to parameters""" def __init__(self, parameters): self._parameters = parameters self._converters = {} self._oid_lookup = defaultdict(lambda: None) @property
[docs] def names(self): "A list of parameter names" return list(self._parameters.keys())
[docs] def get_parameter(self, parameter): "Return a dict for given parameter" parameter = self._get_parameter_name(parameter) return self._parameters[parameter]
[docs] def get_converter(self, parameter): """Generate unit conversion function for given parameter""" if parameter not in self._converters: param = self.get_parameter(parameter) try: scale = float(param['Scale']) except KeyError: scale = 1 def convert(value): # easy_scale = float(param['EasyScale']) # easy_scale_multiplier = float(param['EasyScaleMultiplier']) return value * scale return convert
[docs] def unit(self, parameter): "Get the unit for given parameter" parameter = self._get_parameter_name(parameter).lower() return self._parameters[parameter]['Unit']
[docs] def oid2name(self, oid): "Look up the parameter name for a given OID" if not self._oid_lookup: for name, data in self._parameters.items(): self._oid_lookup[data['OID']] = data['Name'] return self._oid_lookup[oid]
def _get_parameter_name(self, name): if name in self.names: return name aliases = [n for n in self.names if n.endswith(' ' + name)] if len(aliases) == 1: log.info("Alias found for {0}: {1}".format(name, aliases[0])) return aliases[0] log.info( "Parameter '{0}' not found, trying to find alternative.". format(name) ) try: # ahrs_g[0] for example should be looked up as ahrs_g alternative = re.findall(r'(.*)\[[0-9*]\]', name)[0] log.info("Found alternative: '{0}'".format(alternative)) return alternative except IndexError: raise KeyError("Could not find alternative for '{0}'".format(name))
[docs]class DOMContainer(object): """Provides easy access to DOM parameters stored in the DB.""" def __init__(self, doms): self._json = doms self._ids = []
[docs] def ids(self, det_id): """Return a list of DOM IDs for given detector""" return [dom['DOMId'] for dom in self._json if dom['DetOID'] == det_id]
[docs] def clbupi2domid(self, clb_upi, det_id): """Return DOM ID for given CLB UPI and detector""" return self._json_list_lookup('CLBUPI', clb_upi, 'DOMId', det_id)
[docs] def clbupi2floor(self, clb_upi, det_id): """Return Floor ID for given CLB UPI and detector""" return self._json_list_lookup('CLBUPI', clb_upi, 'Floor', det_id)
[docs] def domid2floor(self, dom_id, det_id): """Return Floor ID for given DOM ID and detector""" return self._json_list_lookup('DOMId', dom_id, 'Floor', det_id)
[docs] def via_omkey(self, omkey, det_id): """Return DOM for given OMkey (DU, floor)""" du, floor = omkey try: return DOM.from_json([ d for d in self._json if d["DU"] == du and d["Floor"] == floor and d["DetOID"] == det_id ][0]) except IndexError: log.critical( "No DOM found for OMKey '{0}' and DetOID '{1}'.".format( omkey, det_id
) )
[docs] def via_dom_id(self, dom_id, det_id): """Return DOM for given dom_id""" try: return DOM.from_json([ d for d in self._json if d["DOMId"] == dom_id and d["DetOID"] == det_id ][0]) except IndexError: log.critical("No DOM found for DOM ID '{0}'".format(dom_id))
[docs] def via_clb_upi(self, clb_upi, det_id): """return DOM for given CLB UPI""" try: return DOM.from_json([ d for d in self._json if d["CLBUPI"] == clb_upi and d["DetOID"] == det_id ][0]) except IndexError: log.critical("No DOM found for CLB UPI '{0}'".format(clb_upi))
def _json_list_lookup(self, from_key, value, target_key, det_id): lookup = [ dom[target_key] for dom in self._json if dom[from_key] == value and dom['DetOID'] == det_id ] if len(lookup) > 1: log.warning( "Multiple entries found: {0}".format(lookup) + "\n" + "Returning the first one." ) return lookup[0]
[docs]class DOM(object): """Represents a DOM""" def __init__(self, clb_upi, dom_id, dom_upi, du, det_oid, floor): self.clb_upi = clb_upi self.dom_id = dom_id self.dom_upi = dom_upi self.du = du self.det_oid = det_oid self.floor = floor @classmethod
[docs] def from_json(cls, json): return cls( json["CLBUPI"], json["DOMId"], json["DOMUPI"], json["DU"], json["DetOID"], json["Floor"]
) @property
[docs] def omkey(self): return (self.du, self.floor)
def __str__(self): return "DU{0}-DOM{1}".format(self.du, self.floor) def __repr__(self): return ( "{0} - DOM ID: {1}\n" " DOM UPI: {2}\n" " CLB UPI: {3}\n" " DET OID: {4}\n".format( self.__str__(), self.dom_id, self.dom_upi, self.clb_upi, self.det_oid
) )
[docs]class TriggerSetup(object): def __init__( self, runsetup_oid, name, det_id, description, optical_df, acoustic_df ): self.runsetup_oid = runsetup_oid self.name = name self.det_id = det_id self.description = description self.optical_df = optical_df self.acoustic_df = acoustic_df def __str__(self): text = ( "Runsetup OID: {}\n" "Name: {}\n" "Detector ID: {}\n" "Description:\n {}\n\n".format( self.runsetup_oid, self.name, self.det_id, self.description ) ) for df, parameters in zip(["Optical", "Acoustic"], [self.optical_df, self.acoustic_df]): text += "{} Datafilter:\n".format(df) for parameter, value in parameters.items(): text += " {}: {}\n".format(parameter, value) text += "\n" return text def __repr__(self): return str(self)
[docs]def clbupi2ahrsupi(clb_upi): """Generate AHRS UPI from CLB UPI.""" return re.sub('.*/.*/2.', '3.4.3.4/AHRS/1.', clb_upi)
[docs]def show_ahrs_calibration(clb_upi, version='3'): """Show AHRS calibration data for given `clb_upi`.""" db = DBManager() ahrs_upi = clbupi2ahrsupi(clb_upi) print("AHRS UPI: {}".format(ahrs_upi)) content = db._get_content("show_product_test.htm?upi={0}&" "testtype=AHRS-CALIBRATION-v{1}&n=1&out=xml" .format(ahrs_upi, version)) \ .replace('\n', '') import xml.etree.ElementTree as ET try: root = ET.parse(io.StringIO(content)).getroot() except ET.ParseError: print("No calibration data found") else: for child in root: print("{}: {}".format(child.tag, child.text)) names = [c.text for c in root.findall(".//Name")] values = [[i.text for i in c] for c in root.findall(".//Values")] for name, value in zip(names, values): print("{}: {}".format(name, value))
[docs]class CLBMap(object):
[docs] par_map = {'DETOID': 'det_oid', 'UPI': 'upi', 'DOMID': 'dom_id'}
def __init__(self, det_oid): self.log = get_logger('CLBMap') if isinstance(det_oid, numbers.Integral): self.log.warning( "Det ID ('%s') provided instead of det OID " "(string representation). Trying to get the OID instead..." % det_oid ) db = DBManager() _det_oid = db.get_det_oid(det_oid) if _det_oid is not None: self.log.warning( "This is the det OID for det ID '%s', please use " "that in future: %s" % (det_oid, _det_oid) ) det_oid = _det_oid self.det_oid = det_oid sds = StreamDS() self._data = sds.clbmap(detoid=det_oid) self._by = {} def __len__(self): return len(self._data) @property
[docs] def upi(self): """A dict of CLBs with UPI as key""" parameter = 'UPI' if parameter not in self._by: self._populate(by=parameter) return self._by[parameter]
@property
[docs] def dom_id(self): """A dict of CLBs with DOM ID as key""" parameter = 'DOMID' if parameter not in self._by: self._populate(by=parameter) return self._by[parameter]
[docs] def base(self, du): """Return the base CLB for a given DU""" parameter = 'base' if parameter not in self._by: self._by[parameter] = {} for clb in self.upi.values(): if clb.floor == 0: self._by[parameter][clb.du] = clb return self._by[parameter][du]
def _populate(self, by): data = {} for _, row in self._data.iterrows(): data[row[by]] = CLB( det_oid=row['DETOID'], floor=row['FLOORID'], du=row['DUID'], serial_number=row['SERIALNUMBER'], upi=row['UPI'], dom_id=row['DOMID'] ) self._by[by] = data
[docs]CLB = namedtuple( 'CLB', ['det_oid', 'floor', 'du', 'serial_number', 'upi', 'dom_id']
)