# Filename: tools.py
# pylint: disable=C0103
"""
Some unsorted, frequently used logic.
"""
from __future__ import absolute_import, print_function, division
import base64
import collections
from datetime import datetime, timedelta
import functools
import os
import re
import socket
import subprocess
import sys
import numpy as np
__author__ = "Tamas Gal and Moritz Lotze"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = ["Konstantin Lepa <konstantin.lepa@gmail.com> for termcolor"]
__license__ = "MIT"
__maintainer__ = "Tamas Gal and Moritz Lotze"
__email__ = "tgal@km3net.de"
__status__ = "Development"
[docs]def ifiles(irods_path):
"""Return a list of filenames for given iRODS path (recursively)"""
raw_output = subprocess.check_output(
"ils -r --bundle {0}"
" | grep 'Bundle file:'"
" | awk '{{print $3}}'".format(irods_path),
shell=True
)
filenames = raw_output.decode('ascii').strip().split("\n")
return filenames
[docs]def iexists(irods_path):
"""Returns True of iRODS path exists, otherwise False"""
try:
subprocess.check_output(
'ils {}'.format(irods_path),
shell=True,
stderr=subprocess.PIPE,
)
return True
except subprocess.CalledProcessError:
return False
[docs]def token_urlsafe(nbytes=32):
"""Return a random URL-safe text string, in Base64 encoding.
This is taken and slightly modified from the Python 3.6 stdlib.
The string has *nbytes* random bytes. If *nbytes* is ``None``
or not supplied, a reasonable default is used.
>>> token_urlsafe(16) #doctest:+SKIP
'Drmhze6EPcv0fN_81Bj-nA'
"""
tok = os.urandom(nbytes)
return base64.urlsafe_b64encode(tok).rstrip(b'=').decode('ascii')
[docs]def insert_prefix_to_dtype(arr, prefix):
new_cols = [prefix + '_' + col for col in arr.dtype.names]
arr.dtype.names = new_cols
return arr
[docs]def prettyln(text, fill='-', align='^', prefix='[ ', suffix=' ]', length=69):
"""Wrap `text` in a pretty line with maximum length."""
text = '{prefix}{0}{suffix}'.format(text, prefix=prefix, suffix=suffix)
print(
"{0:{fill}{align}{length}}".format(
text, fill=fill, align=align, length=length
)
)
[docs]def irods_filepath(det_id, run_id):
"""Generate the iRODS filepath for given detector (O)ID and run ID"""
data_path = "/in2p3/km3net/data/raw/sea"
from km3pipe.db import DBManager
if not isinstance(det_id, int):
dts = DBManager().detectors
det_id = int(dts[dts.OID == det_id].SERIALNUMBER.values[0])
return data_path + "/KM3NeT_{0:08}/{2}/KM3NeT_{0:08}_{1:08}.root" \
.format(det_id, run_id, run_id//1000)
[docs]def unpack_nfirst(seq, nfirst):
"""Unpack the nfrist items from the list and return the rest.
>>> a, b, c, rest = unpack_nfirst((1, 2, 3, 4, 5), 3)
>>> a, b, c
(1, 2, 3)
>>> rest
(4, 5)
"""
iterator = iter(seq)
for _ in range(nfirst):
yield next(iterator, None)
yield tuple(iterator)
[docs]def split(string, callback=None, sep=None):
"""Split the string and execute the callback function on each part.
>>> string = "1 2 3 4"
>>> parts = split(string, int)
>>> parts
[1, 2, 3, 4]
"""
if callback is not None:
return [callback(i) for i in string.split(sep)]
else:
return string.split(sep)
[docs]def namedtuple_with_defaults(typename, field_names, default_values=[]):
"""Create a namedtuple with default values
>>> Node = namedtuple_with_defaults('Node', 'val left right')
>>> Node()
Node(val=None, left=None, right=None)
>>> Node = namedtuple_with_defaults('Node', 'val left right', [1, 2, 3])
>>> Node()
Node(val=1, left=2, right=3)
>>> Node = namedtuple_with_defaults('Node', 'val left right', {'right':7})
>>> Node()
Node(val=None, left=None, right=7)
>>> Node(4)
Node(val=4, left=None, right=7)
"""
the_tuple = collections.namedtuple(typename, field_names)
the_tuple.__new__.__defaults__ = (None, ) * len(the_tuple._fields)
if isinstance(default_values, collections.Mapping):
prototype = the_tuple(**default_values)
else:
prototype = the_tuple(*default_values)
the_tuple.__new__.__defaults__ = tuple(prototype)
return the_tuple
[docs]def remain_file_pointer(function):
"""Remain the file pointer position after calling the decorated function
This decorator assumes that the last argument is the file handler.
"""
def wrapper(*args, **kwargs):
"""Wrap the function and remain its parameters and return values"""
file_obj = args[-1]
old_position = file_obj.tell()
return_value = function(*args, **kwargs)
file_obj.seek(old_position, 0)
return return_value
return wrapper
[docs]def itervalues(d):
return iter(d.values())
[docs]def iteritems(d):
return iter(d.items())
[docs]def decamelise(text):
"""Convert CamelCase to lower_and_underscore."""
s = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', text)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s).lower()
[docs]def camelise(text, capital_first=True):
"""Convert lower_underscore to CamelCase."""
def camelcase():
if not capital_first:
yield str.lower
while True:
yield str.capitalize
if istype(text, 'unicode'):
text = text.encode('utf8')
c = camelcase()
return "".join(next(c)(x) if x else '_' for x in text.split("_"))
[docs]ATTRIBUTES = dict(
list(
zip([
'bold', 'dark', '', 'underline', 'blink', '', 'reverse',
'concealed'
], list(range(1, 9)))
)
)
del ATTRIBUTES['']
[docs]ATTRIBUTES_RE = r'\033\[(?:%s)m' % '|' \
.join(['%d' % v for v in ATTRIBUTES.values()])
[docs]HIGHLIGHTS = dict(
list(
zip([
'on_grey', 'on_red', 'on_green', 'on_yellow', 'on_blue',
'on_magenta', 'on_cyan', 'on_white'
], list(range(40, 48)))
)
)
[docs]HIGHLIGHTS_RE = r'\033\[(?:%s)m' % '|' \
.join(['%d' % v for v in HIGHLIGHTS.values()])
[docs]COLORS = dict(
list(
zip([
'grey',
'red',
'green',
'yellow',
'blue',
'magenta',
'cyan',
'white',
], list(range(30, 38)))
)
)
[docs]COLORS_RE = r'\033\[(?:%s)m' % '|'.join(['%d' % v for v in COLORS.values()])
[docs]def colored(text, color=None, on_color=None, attrs=None, ansi_code=None):
"""Colorize text, while stripping nested ANSI color sequences.
Author: Konstantin Lepa <konstantin.lepa@gmail.com> / termcolor
Available text colors:
red, green, yellow, blue, magenta, cyan, white.
Available text highlights:
on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan, on_white.
Available attributes:
bold, dark, underline, blink, reverse, concealed.
Example:
colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink'])
colored('Hello, World!', 'green')
"""
if os.getenv('ANSI_COLORS_DISABLED') is None:
if ansi_code is not None:
return "\033[38;5;{}m{}\033[0m".format(ansi_code, text)
fmt_str = '\033[%dm%s'
if color is not None:
text = re.sub(COLORS_RE + '(.*?)' + RESET_RE, r'\1', text)
text = fmt_str % (COLORS[color], text)
if on_color is not None:
text = re.sub(HIGHLIGHTS_RE + '(.*?)' + RESET_RE, r'\1', text)
text = fmt_str % (HIGHLIGHTS[on_color], text)
if attrs is not None:
text = re.sub(ATTRIBUTES_RE + '(.*?)' + RESET_RE, r'\1', text)
for attr in attrs:
text = fmt_str % (ATTRIBUTES[attr], text)
return text + RESET
else:
return text
[docs]def cprint(text, color=None, on_color=None, attrs=None):
"""Print colorize text.
Author: Konstantin Lepa <konstantin.lepa@gmail.com> / termcolor
It accepts arguments of print function.
"""
print((colored(text, color, on_color, attrs)))
[docs]def issorted(arr):
"""Check if array is sorted."""
return np.all(np.diff(arr) >= 0)
[docs]def lstrip(text):
"""Remove leading whitespace from each line of a multiline string."""
return '\n'.join(l.lstrip() for l in text.lstrip().split('\n'))
[docs]def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
[docs]def is_coherent(seq):
"""Find out if list of subsequent integers is complete.
Adapted from https://stackoverflow.com/questions/18131741/python-find-out-whether-a-list-of-integers-is-coherent
```
is_coherent([1, 2, 3, 4, 5]) -> True
is_coherent([1, 3, 4, 5]) -> False
```
"""
return np.array_equal(seq, range(seq[0], int(seq[-1] + 1)))
[docs]class AnyBar():
"""A lightweight interface to the AnyBar macOS app."""
def __init__(self, port=1738, address='localhost'):
self.port = port
self.address = address
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
[docs] def change(self, color):
self.sock.sendto(color.encode('utf-8'), (self.address, self.port))
[docs]def zero_pad(m, n=1):
"""Pad a matrix with zeros, on all sides."""
return np.pad(m, (n, n), mode='constant', constant_values=[0])
[docs]def istype(obj, typename):
"""Drop-in replacement for `isinstance` to avoid imports"""
return type(obj).__name__ == typename
[docs]def isnotebook():
"""Check if running within a Jupyter notebook"""
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
return True # Jupyter notebook or qtconsole
elif shell == 'TerminalInteractiveShell':
return False # Terminal running IPython
else:
return False # Other type (?)
except NameError:
return False
[docs]def supports_color():
"""Checks if the terminal supports color."""
if isnotebook():
return True
supported_platform = sys.platform != 'win32' or 'ANSICON' in os.environ
is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
if not supported_platform or not is_a_tty:
return False
return True
[docs]def get_jpp_revision(via_command='JPrint'):
"""Retrieves the Jpp revision number"""
try:
output = subprocess.check_output([via_command, '-v'],
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
if e.returncode == 1:
output = e.output
else:
return None
except OSError:
return None
revision = output.decode().split('\n')[0].split()[1].strip()
return revision
[docs]def timed_cache(**timed_cache_kwargs):
"""LRU cache decorator with timeout.
Parameters
----------
days: int
seconds: int
microseconds: int
milliseconds: int
minutes: int
hours: int
weeks: int
maxsise: int [default: 128]
typed: bool [default: False]
"""
def _wrapper(f):
maxsize = timed_cache_kwargs.pop('maxsize', 128)
typed = timed_cache_kwargs.pop('typed', False)
update_delta = timedelta(**timed_cache_kwargs)
# nonlocal workaround to support Python 2
# https://technotroph.wordpress.com/2012/10/01/python-closures-and-the-python-2-7-nonlocal-solution/
d = {'next_update': datetime.utcnow() - update_delta}
try:
f = functools.lru_cache(maxsize=maxsize, typed=typed)(f)
except AttributeError:
print(
"LRU caching is not available in Pyton 2.7, "
"this will have no effect!"
)
pass
@functools.wraps(f)
def _wrapped(*args, **kwargs):
now = datetime.utcnow()
if now >= d['next_update']:
try:
f.cache_clear()
except AttributeError:
pass
d['next_update'] = now + update_delta
return f(*args, **kwargs)
return _wrapped
return _wrapper