#!/usr/bin/env python
import datetime
import logging
import os
import uuid
from dotenv import load_dotenv
import jsonschema
from jsonschema import validate
from hop import Stream
from hop.plugins.snews import SNEWSAlert, SNEWSHeartbeat, SNEWSObservation
from . import decider
from . import msgSchema
logger = logging.getLogger("snews")
def _add_parser_args(parser):
"""Parse arguments for broker, configurations and options
"""
parser.add_argument('-v', '--verbose', action='count', default=0, help="Be verbose.")
parser.add_argument('-f', '--env-file', type=str, help="The path to the .env file.")
parser.add_argument("--no-auth", action="store_true", help="If set, disable authentication.")
[docs]def validateJson(jsonData, jsonSchema):
"""
Function for validate a json data using a json schema.
:param jsonData: the data to validate.
:param jsonSchema: the schema assumed to be correct.
:return: true or false
"""
try:
validate(instance=jsonData, schema=jsonSchema)
except jsonschema.exceptions.ValidationError:
return False
return True
class Model(object):
def __init__(self, args):
"""
The constructor of the model class.
:param args: the command line arguments
"""
# load environment variables
load_dotenv(dotenv_path=args.env_file)
self.args = args
self.gcnFormat = "json"
self.coinc_threshold = int(os.getenv("COINCIDENCE_THRESHOLD"))
self.msg_expiration = int(os.getenv("MSG_EXPIRATION"))
self.db_server = os.getenv("DATABASE_SERVER")
self.drop_db = bool(os.getenv("NEW_DATABASE"))
self.regularMsgSchema = msgSchema.regularMsgSchema
logger.info(f"setting up decider at: {self.db_server}")
self.myDecider = decider.Decider(
self.coinc_threshold,
self.msg_expiration,
os.getenv("TIME_STRING_FORMAT"),
os.getenv("DATABASE_SERVER"),
self.drop_db
)
if self.drop_db:
logger.info("clearing out decider cache")
self.deciderUp = False
# specify topics
self.observation_topic = os.getenv("OBSERVATION_TOPIC")
self.alert_topic = os.getenv("ALERT_TOPIC")
# open up stream connections
self.stream = Stream(auth=(not args.no_auth), persist=True)
self.source = self.stream.open(self.observation_topic, "r")
self.sink = self.stream.open(self.alert_topic, "w")
# message types and processing algorithms
self.mapping = {
SNEWSObservation.__name__: self.processObservationMessage,
SNEWSHeartbeat.__name__: self.processHeartbeatMessage
}
def run(self):
"""
Execute the model.
:return: none
"""
self.deciderUp = True
logger.info("starting decider")
logger.info(f"processing messages from {self.observation_topic}")
for msg, meta in self.source.read(batch_size=1, metadata=True, autocommit=False):
self.processMessage(msg)
self.source.mark_done(meta)
def close(self):
"""
Close stream connections.
"""
logger.info("shutting down")
self.deciderUp = False
self.source.close()
self.sink.close()
def addObservationMsg(self, message):
self.myDecider.addMessage(message)
def processMessage(self, message):
message_type = type(message).__name__
logger.debug(f"processing {message_type}")
if message_type in self.mapping:
self.mapping[message_type](message)
def processObservationMessage(self, message):
self.addObservationMsg(message)
alert = self.myDecider.deciding()
if alert:
# publish alert message to ALERT_TOPIC
logger.info("found coincidence, sending alert")
self.sink.write(self.writeAlertMsg())
def processHeartbeatMessage(self, message):
pass
def writeAlertMsg(self):
return SNEWSAlert(
message_id=str(uuid.uuid4()),
sent_time=datetime.datetime.utcnow().strftime(os.getenv("TIME_STRING_FORMAT")),
machine_time=datetime.datetime.utcnow().strftime(os.getenv("TIME_STRING_FORMAT")),
content="SNEWS Alert: a coincidence between detectors has been observed.",
)
[docs]def main(args):
"""main function
"""
# set up logging
verbosity = [logging.WARNING, logging.INFO, logging.DEBUG]
logging.basicConfig(
level=verbosity[min(args.verbose, 2)],
format="%(asctime)s | model : %(levelname)s : %(message)s",
)
# start up
model = Model(args)
try:
model.run()
except KeyboardInterrupt:
pass
finally:
model.close()