# Source code for gwcelery.tools.nagios
"""A Nagios plugin for monitoring GWCelery.
See https://nagios-plugins.org/doc/guidelines.html.
"""
import glob
from enum import IntEnum
from pathlib import Path
from sys import exit
from traceback import format_exc, format_exception
import click
import kombu.exceptions
import lal
import numpy as np
from gwpy.time import tconvert
# Make sure that all tasks are registered
from .. import tasks # noqa: F401
class NagiosPluginStatus(IntEnum):
    """Nagios plugin status codes.

    These are the process exit codes defined by the Nagios plugin
    development guidelines (see module docstring).
    """

    OK = 0
    WARNING = 1
    CRITICAL = 2
    UNKNOWN = 3
class NagiosCriticalError(Exception):
    """An exception that maps to a Nagios status of `CRITICAL`.

    The first positional argument is the one-line status output; the
    chained ``__cause__`` exception provides the detailed text.
    """
def get_active_queues(inspector):
    """Return the set of queue names currently consumed by any worker."""
    active = set()
    for worker_queues in (inspector.active_queues() or {}).values():
        for queue in worker_queues:
            active.add(queue['name'])
    return active
def get_active_igwn_alert_topics(inspector):
    """Return the IGWN alert topics that any worker reports as subscribed."""
    topics = set()
    for stat in inspector.stats().values():
        topics.update(stat.get('igwn-alert-topics', ()))
    return topics
def get_expected_queues(app):
    """Return the queue names that the registered tasks should consume.

    Tasks that do not declare an explicit ``queue`` attribute run on the
    default ``celery`` queue, which is therefore always included.
    """
    queues = {getattr(task, 'queue', None) for task in app.tasks.values()}
    # Drop the placeholder for tasks with no explicit queue, then make
    # sure the default queue is present.
    queues.discard(None)
    queues.add('celery')
    return queues
def get_expected_igwn_alert_topics(app):
    """Return the IGWN alert topics configured for subscription."""
    return app.conf['igwn_alert_topics']
def get_active_voevent_peers(inspector):
    """Return ``(broker_peers, receiver_peers)``: the sets of active
    VOEvent broker and receiver connections reported by the workers."""
    broker_peers = set()
    receiver_peers = set()
    for stat in inspector.stats().values():
        broker_peers.update(stat.get('voevent-broker-peers', ()))
        receiver_peers.update(stat.get('voevent-receiver-peers', ()))
    return broker_peers, receiver_peers
def get_expected_kafka_bootstep_urls(inspector):
    """Return every Kafka URL that appears in any worker's
    ``kafka_topic_up`` mapping, regardless of its up/down flag."""
    expected = set()
    for stat in inspector.stats().values():
        # Iterating the mapping yields its keys (the URLs).
        expected.update(stat.get('kafka_topic_up', {}))
    return expected
def get_active_kafka_bootstep_urls(inspector):
    """Return the Kafka URLs whose ``kafka_topic_up`` flag is truthy."""
    active = set()
    for stat in inspector.stats().values():
        for url, is_up in stat.get('kafka_topic_up', {}).items():
            if is_up:
                active.add(url)
    return active
def get_undelivered_message_urls(inspector):
    """Return the Kafka URLs for which any worker flags delivery failures."""
    failed_urls = set()
    for stat in inspector.stats().values():
        for url, failed in stat.get('kafka_delivery_failures', {}).items():
            if failed:
                failed_urls.add(url)
    return failed_urls
def get_active_kafka_consumer_bootstep_names(inspector):
    """Return the names of Kafka consumers active on any worker."""
    names = set()
    for stat in inspector.stats().values():
        names.update(stat.get('active_kafka_consumers', ()))
    return names
def get_expected_kafka_consumer_bootstep_names(app):
    """Return the Kafka consumer names declared in the app configuration."""
    # set(mapping) iterates the mapping's keys directly; the previous
    # ``{name for name in ....keys()}`` comprehension was redundant.
    return set(app.conf['kafka_consumer_config'])
def get_celery_queue_length(app):
    """Return the number of tasks pending on the default ``celery`` queue."""
    backend_client = app.backend.client
    return backend_client.llen("celery")
def get_recent_mdc_superevents():
    """Query GraceDB for MDC superevents created in the last six hours.

    Returns the superevent list together with the lower and upper GPS
    bounds of the query window.
    """
    t_upper = lal.GPSTimeNow()
    t_lower = t_upper - 6 * 3600
    superevents = tasks.gracedb.get_superevents(
        f'{t_lower} .. {t_upper} MDC')
    return superevents, t_lower, t_upper
def get_distr_delay_latest_llhoft(app):
    """Get the GPS time of the latest llhoft data distributed to the node.

    Returns a dict mapping each detector prefix to the age (in seconds)
    of its newest low-latency frame file, or a huge sentinel value when
    no file matches the configured glob pattern.
    """
    now = int(lal.GPSTimeNow())
    delays = {}
    for detector in ['H1', 'L1']:
        pattern = app.conf['llhoft_glob'].format(detector=detector)
        frame_files = sorted(glob.glob(pattern))
        try:
            # Frame filenames are assumed to end in
            # ``...-<gps_start>-<duration>.<ext>`` — TODO confirm.
            newest_gps = int(frame_files[-1].split('-')[-2])
            delays[detector] = now - newest_gps
        except IndexError:
            # No matching files: report an absurdly large delay.
            delays[detector] = 999999999
    return delays
def check_status(app):
    """Run all GWCelery health checks against the given Celery app.

    Parameters
    ----------
    app : celery.Celery
        The GWCelery application instance.

    Raises
    ------
    NagiosCriticalError
        If any check fails. The first argument is the one-line Nagios
        status output; the chained ``__cause__`` exception carries the
        detailed diagnostic text.
    """
    # Check if '/dev/shm/kafka/' exists, otherwise skip the llhoft check.
    if Path(app.conf['llhoft_glob']).parents[1].exists():
        max_llhoft_delays = get_distr_delay_latest_llhoft(app)
        max_delay = 10 * 60  # 10 minutes of no llhoft is worrying
        if any(np.array(list(max_llhoft_delays.values())) > max_delay):
            raise NagiosCriticalError(
                'Low-latency hoft is not being streamed') \
                from AssertionError(
                    f"Newest llhoft is this many seconds old: "
                    f"{str(max_llhoft_delays)}")
    # Broker connectivity.
    connection = app.connection()
    try:
        connection.ensure_connection(max_retries=1)
    except kombu.exceptions.OperationalError as e:
        raise NagiosCriticalError('No connection to broker') from e
    inspector = app.control.inspect()
    # Every expected queue must have an active consumer.
    active = get_active_queues(inspector)
    expected = get_expected_queues(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all expected queues are active') from \
            AssertionError('Missing queues: ' + ', '.join(missing))
    # IGWN alert topic subscriptions must match the configuration exactly.
    active = get_active_igwn_alert_topics(inspector)
    expected = get_expected_igwn_alert_topics(app)
    missing = expected - active
    extra = active - expected
    if missing:
        raise NagiosCriticalError('Not all IGWN alert topics are subscribed') \
            from AssertionError('Missing topics: ' + ', '.join(missing))
    if extra:
        raise NagiosCriticalError(
            'Too many IGWN alert topics are subscribed') from AssertionError(
                'Extra topics: ' + ', '.join(extra))
    # VOEvent broker/receiver connections, only where configured.
    broker_peers, receiver_peers = get_active_voevent_peers(inspector)
    if app.conf['voevent_broadcaster_whitelist'] and not broker_peers:
        raise NagiosCriticalError(
            'The VOEvent broker has no active connections') \
            from AssertionError('voevent_broadcaster_whitelist: {}'.format(
                app.conf['voevent_broadcaster_whitelist']))
    if app.conf['voevent_receiver_address'] and not receiver_peers:
        raise NagiosCriticalError(
            'The VOEvent receiver has no active connections') \
            from AssertionError('voevent_receiver_address: {}'.format(
                app.conf['voevent_receiver_address']))
    # Kafka producer bootsteps.
    active = get_active_kafka_bootstep_urls(inspector)
    expected = get_expected_kafka_bootstep_urls(inspector)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all Kafka bootstep URLs are active') \
            from AssertionError('Missing urls: ' + ', '.join(missing))
    undelivered_messages = get_undelivered_message_urls(inspector)
    if undelivered_messages:
        # FIX: this previously joined the stale ``missing`` set from the
        # bootstep check above instead of the undelivered-message URLs.
        raise NagiosCriticalError(
            'Not all Kafka messages have been successfully delivered'
        ) from AssertionError(
            'URLs with undelivered messages: ' + ', '.join(
                undelivered_messages)
        )
    # Task backlog.
    celery_queue_length = get_celery_queue_length(app)
    if celery_queue_length > 50:
        raise NagiosCriticalError(
            'Tasks are piled up in Celery queue') from AssertionError(
                'Length of celery queue is {}'.format(celery_queue_length))
    # End-to-end liveness: MDC superevents must keep arriving.
    recent_mdc_superevents, t_lower, t_now = get_recent_mdc_superevents()
    no_superevents = len(recent_mdc_superevents) == 0

    def to_utc(t):
        """Convert a GPS time to an ISO-8601 UTC string."""
        return tconvert(t).isoformat()

    if no_superevents:
        raise NagiosCriticalError(
            'No MDC superevents found in past six hours') \
            from AssertionError(
                f'Last entry earlier than GPSTime {t_lower} = '
                f'{to_utc(t_lower)} UTC')
    last_superevent = recent_mdc_superevents[0]
    # Check presence in the last hour, with a 10-minute tolerance.
    none_in_last_hour = (
        t_now - tconvert(last_superevent['created'])
    ) > (3600 + 600)
    if none_in_last_hour:
        # FIX: added separators; the adjacent f-strings previously ran the
        # superevent ID, 'GPSTime' and the timestamp together with no spaces.
        raise NagiosCriticalError(
            'No MDC superevents found in last one hour') \
            from AssertionError(
                f"Last entry is for {last_superevent['superevent_id']}, "
                f"GPSTime {tconvert(last_superevent['created'])} = "
                f"{last_superevent['created']}")
    # Kafka consumer bootsteps.
    active = get_active_kafka_consumer_bootstep_names(inspector)
    expected = get_expected_kafka_consumer_bootstep_names(app)
    missing = expected - active
    if missing:
        raise NagiosCriticalError('Not all Kafka consumer bootstep topics are '
                                  'active') \
            from AssertionError('Missing urls: ' + ', '.join(missing))
@click.command(help=__doc__)
@click.pass_context
def nagios(ctx):
    """Entry point for the ``gwcelery nagios`` subcommand.

    Runs :func:`check_status`, maps the outcome to a Nagios status
    code, prints the one-line output (plus detail, if any), and exits
    with the corresponding status code.
    """
    try:
        check_status(ctx.obj.app)
    except NagiosCriticalError as e:
        status = NagiosPluginStatus.CRITICAL
        output, = e.args
        cause = e.__cause__
        detail = ''.join(
            format_exception(type(cause), cause, cause.__traceback__))
    except:  # noqa: E722
        # Any other failure maps to UNKNOWN, per the Nagios guidelines.
        status = NagiosPluginStatus.UNKNOWN
        output = 'Unexpected error'
        detail = format_exc()
    else:
        status = NagiosPluginStatus.OK
        output = 'Running normally'
        detail = None
    print(f'{status.name}: {output}')
    if detail:
        print(detail)
    exit(status)