from collections import namedtuple
import datetime
import logging
import os
import random
import time
import uuid
from dotenv import load_dotenv
from hop import Stream
from hop.plugins.snews import SNEWSHeartbeat, SNEWSObservation
logger = logging.getLogger("snews")
Detector = namedtuple("Detector", "detector_id location")
[docs]def generate_message(time_string_format, detectors, alert_probability=0.1):
"""Generate fake SNEWS alerts/heartbeats.
"""
detector = detectors[random.randint(0, len(detectors) - 1)]
if random.random() > alert_probability:
logging.debug(f"generating heartbeat from {detector.location} at {detector.detector_id}")
return SNEWSHeartbeat(
message_id=str(uuid.uuid4()),
detector_id=detector.detector_id,
sent_time=datetime.datetime.utcnow().strftime(time_string_format),
machine_time=datetime.datetime.utcnow().strftime(time_string_format),
location=detector.location,
status="On",
content="For testing",
)
else:
logging.debug(f"generating alert from {detector.location} at {detector.detector_id}")
return SNEWSObservation(
message_id=str(uuid.uuid4()),
detector_id=detector.detector_id,
sent_time=datetime.datetime.utcnow().strftime(time_string_format),
neutrino_time=datetime.datetime.utcnow().strftime(time_string_format),
machine_time=datetime.datetime.utcnow().strftime(time_string_format),
location=detector.location,
p_value=0.5,
status="On",
content="For testing",
)
def _add_parser_args(parser):
"""Parse arguments for broker, configurations and options
"""
parser.add_argument('-v', '--verbose', action='count', default=0, help="Be verbose.")
parser.add_argument('-f', '--env-file', type=str, help="The path to the .env file.")
parser.add_argument("--no-auth", action="store_true", help="If set, disable authentication.")
parser.add_argument('-d', '--detector', type=str,
help=("Set a specific detector:location pair to simulate messages from. "
"If not set, generates messages from multiple random locations."))
parser.add_argument('--rate', type=float, default=0.5,
help="Rate to send alerts, default=0.5s")
parser.add_argument('--alert-probability', type=float, default=0.1,
help="Probability of generating an alert. Default = 0.1.")
parser.add_argument('-p',
'--persist',
action="store_true",
help="If set, persist and send messages indefinitely. "
"Otherwise send a single message.")
[docs]def main(args):
"""generate synthetic observation/heartbeat messages
"""
# set up logging
verbosity = [logging.WARNING, logging.INFO, logging.DEBUG]
logging.basicConfig(
level=verbosity[min(args.verbose, 2)],
format="%(asctime)s | model : %(levelname)s : %(message)s",
)
# load environment variables
load_dotenv(dotenv_path=args.env_file)
# choose set of detector/location pairs
if args.detector:
detectors = [Detector(*args.detector.split(":"))]
else:
detectors = [
Detector("DETECTOR 1", "Houston"),
Detector("DETECTOR 2", "Seattle"),
Detector("DETECTOR 3", "Los Angeles"),
]
# configure and open observation stream
logger.info("starting up stream")
stream = Stream(auth=(not args.no_auth))
source = stream.open(os.getenv("OBSERVATION_TOPIC"), "w")
# generate messages
logger.info(f"publishing messages to {os.getenv('OBSERVATION_TOPIC')}")
try:
# send one message, then persist if specified
message = generate_message(
os.getenv("TIME_STRING_FORMAT"),
detectors,
alert_probability=args.alert_probability,
)
source.write(message)
time.sleep(args.rate)
while args.persist:
message = generate_message(
os.getenv("TIME_STRING_FORMAT"),
detectors,
alert_probability=args.alert_probability,
)
source.write(message)
time.sleep(args.rate)
except KeyboardInterrupt:
pass
finally:
logger.info("shutting down")
source.close()