Source code for snews.decider

from abc import ABC, abstractmethod
import datetime

from . import storage


[docs]class IDecider(ABC): @abstractmethod def deciding(self): pass @abstractmethod def addMessage(self, time, neutrino_time, message): pass @abstractmethod def getAllMessages(self): pass
[docs]class Decider(IDecider): detector_id = { 1: "Super-K", 2: "Hyper-K", 3: "SNO+", 4: "KamLAND", 5: "LVD", 6: "ICE", 7: "Borexino", 8: "HALO-1kT", 9: "HALO", 10: "NOvA", 11: "KM3NeT", 12: "Baksan", 13: "JUNO", 14: "LZ ", 15: "DUNE", 16: "MicroBooNe", 17: "SBND", 18: "DS-20K", 19: "XENONnT", 20: "PandaX-4T", } def __init__(self, coinc_threshold=10, msg_expiration=120, datetime_format="%y/%m/%d %H:%M:%S", mongo_server="mongodb://localhost:27017/", drop_db=True): """ The constructor. :param coinc_threshold: maximum time (s) between messages for them to be considered coincident :param msg_expiration: maximum time (s) that a message will be stored in the database cache before expiring :param datetime_format: date format to convert from a string :param mongo_server: URL string of the mongodb server address :param drop_db: boolean specifying whether to clear previous database storage """ # intialize and use mongo storage self.db = storage.MongoStorage(msg_expiration, datetime_format, mongo_server, drop_db) self.coinc_threshold = coinc_threshold
[docs] def deciding(self): """Implements the SNEWS coincidence requirement protocol to check cached messages and determine whether an alert message will be sent. :return: True or false indicating a coincidence between messages """ if not self.db.cacheEmpty(): cacheMsgs = self.db.getCacheMsgs() prev = datetime.datetime.min prev_location = "FOO LOCATION" for msg in cacheMsgs: neutrinoTime = msg["neutrino_time"] # go through messages to check if any two or more are within the time threshold if neutrinoTime - datetime.timedelta(seconds=self.coinc_threshold) <= prev: # verify the locations are different if msg["location"] != prev_location: return True prev = neutrinoTime prev_location = msg["location"] return False
# return not self.db.cacheEmpty()
[docs] def addMessage(self, message): """ :param message: message from a hopskotch kafka stream :return: """ # insert message into the deque self.db.insert(message.sent_time, message.neutrino_time, message.asdict())
[docs] def getCacheMessages(self): """ Get messages that have not expired. :return: """ return self.db.getCacheMsgs()
[docs] def getAllMessages(self): """ Get all messages in history. :return: """ return self.db.getAllMessages()