#!/usr/bin/env python
# Filename: jpp.py
# pylint: disable=
"""
Pump for data received from a ControlHost (JLigier) server.
"""
from __future__ import absolute_import, print_function, division
from km3pipe.core import Pump, Blob
from km3pipe.controlhost import Client
from km3pipe.time import Cuckoo
from km3pipe.logger import get_logger
import threading
import socket
import time
import numpy as np
from collections import deque
# The queue module is called ``Queue`` on Python 2 and ``queue`` on Python 3;
# try the old name first and fall back to the new one.
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty
__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__status__ = "Development"

# Module-level logger used by everything defined in this file.
log = get_logger(__name__)    # pylint: disable=C0103
class CHPump(Pump):
    """A pump for ControlHost data.

    ``_run`` is meant to be executed in a background daemon thread (see
    ``_start_thread``).  It reads ``(prefix, data)`` messages from the
    ControlHost client and puts them on ``self.queue``.  ``process`` takes
    one message off that queue per call and stores it in a blob.

    The methods below read a number of instance attributes that are not
    assigned anywhere in this listing and therefore have to be set up
    before the methods are used: ``host``, ``port``, ``tags``,
    ``subscription_mode``, ``queue``, ``max_queue``, ``timeout``,
    ``loop_cycle``, ``cuckoo_warn``, ``key_for_prefix``, ``key_for_data``
    and, for the timing helpers, ``process_dt``/``process_timer`` and
    ``packet_dt``/``packet_timer``.
    """

    def _start_thread(self):
        """Start ``_run`` in a daemon thread stored in ``self.thread``."""
        log.debug("Starting and demonising thread.")
        self.thread = threading.Thread(target=self._run, args=())
        # Daemon threads do not keep the interpreter alive on exit.
        self.thread.daemon = True
        self.thread.start()

    def _init_controlhost(self):
        """Set up the controlhost connection.

        Creates a new ``Client`` for ``self.host``/``self.port``, connects
        it and subscribes to every tag in the comma separated string
        ``self.tags`` using ``self.subscription_mode``.
        """
        log.debug("Connecting to JLigier")
        self.client = Client(self.host, self.port)
        self.client._connect()
        log.debug("Subscribing to tags: {0}".format(self.tags))
        for tag in self.tags.split(','):
            self.client.subscribe(tag.strip(), mode=self.subscription_mode)
        log.debug("Controlhost initialisation done.")

    def _run(self):
        """Receive messages from the client and queue them, forever.

        The loop only ends on a ``BufferError`` from the client.  An
        ``EOFError`` or an empty payload triggers a 30 second pause (and,
        for an empty payload, a reconnection attempt) before the next
        cycle.  Messages are dropped, with a throttled warning through
        ``self.cuckoo_warn``, while the queue holds more than
        ``self.max_queue`` items.
        """
        log.debug("Entering the main loop.")
        while True:
            # Sampled before the (blocking) network read below.
            current_qsize = self.queue.qsize()
            log.info("----- New loop cycle #{0}".format(self.loop_cycle))
            log.info("Current queue size: {0}".format(current_qsize))
            self.loop_cycle += 1
            try:
                log.debug("Waiting for data from network...")
                # self._add_packet_dt()
                prefix, data = self.client.get_message()
                # self.performance_warn()
            except EOFError:
                log.warning("EOF from Ligier, trying again in 30 seconds...")
                time.sleep(30)
                continue
            except BufferError:
                log.error("Buffer error in Ligier stream, aborting...")
                break

            # ``data`` may be falsy (handled right below); do not call
            # len() on something that could be None.
            n_bytes = len(data) if data else 0
            log.debug("{0} bytes received from network.".format(n_bytes))

            if not data:
                log.critical(
                    "No data received, connection died.\n" +
                    "Trying to reconnect in 30 seconds."
                )
                time.sleep(30)
                try:
                    log.debug("Reinitialising new CH connection.")
                    self._init_controlhost()
                except socket.error:
                    log.error("Failed to connect to host.")
                # Nothing to enqueue in this cycle, whether or not the
                # reconnection worked.
                continue

            if current_qsize > self.max_queue:
                self.cuckoo_warn(
                    "Maximum queue size ({0}) reached, "
                    "dropping data.".format(self.max_queue)
                )
            else:
                log.debug("Filling data into queue.")
                self.queue.put((prefix, data))
        log.debug("Quitting the main loop.")

    def process(self, blob):
        """Wait for the next packet and put it in the blob.

        Blocks for at most ``self.timeout`` seconds on ``self.queue``.

        Parameters
        ----------
        blob : mapping
            Receives the prefix under ``self.key_for_prefix`` and the
            payload under ``self.key_for_data``.

        Returns
        -------
        The same ``blob`` object.

        Raises
        ------
        StopIteration
            If no item arrived on the queue within the timeout.
        """
        # self._add_process_dt()
        log.debug("Waiting for queue items.")
        try:
            prefix, data = self.queue.get(timeout=self.timeout)
        except Empty:
            log.warning(
                "ControlHost timeout ({0}s) reached".format(self.timeout)
            )
            raise StopIteration("ControlHost timeout reached.")
        log.debug("Got {0} bytes from queue.".format(len(data)))
        blob[self.key_for_prefix] = prefix
        blob[self.key_for_data] = data
        return blob

    # NOTE(review): the original listing had two unmatched ")" lines at this
    # point.  They look like the tail of a definition that is not visible
    # here and were dropped because they are not valid Python on their own.

    def _add_process_dt(self):
        """Record the time since the previous call in ``process_dt``."""
        now = time.time()
        self.process_dt.append(now - self.process_timer)
        self.process_timer = now

    def _add_packet_dt(self):
        """Record the time since the previous call in ``packet_dt``."""
        now = time.time()
        self.packet_dt.append(now - self.packet_timer)
        self.packet_timer = now

    def finish(self):
        """Clean up the JLigier controlhost connection."""
        log.debug("Disconnecting from JLigier.")
        try:
            self.client.socket.shutdown(socket.SHUT_RDWR)
        finally:
            # shutdown() raises on a socket that is no longer connected;
            # the client must be disconnected in that case as well.
            self.client._disconnect()

    def __iter__(self):
        """Return the pump itself; it is its own iterator."""
        return self

    def __next__(self):
        """Return a fresh ``Blob`` filled by ``process``."""
        return self.process(Blob())

    def next(self):
        """Python 2 spelling of ``__next__``."""
        return self.__next__()
def CHTagger(blob):
    """Flag the blob with the tag of its ControlHost prefix.

    Reads ``blob['CHPrefix'].tag``, converts it to ``str`` and stores
    ``True`` in the blob under that string.  The key ``'CHPrefix'`` is
    hard-coded here.

    Parameters
    ----------
    blob : mapping
        Must contain a ``'CHPrefix'`` entry that has a ``tag`` attribute.

    Returns
    -------
    The same ``blob`` object, with the additional tag entry.
    """
    tag = str(blob['CHPrefix'].tag)
    blob[tag] = True
    return blob