Source code for gwcelery.igwn_alert.bootsteps

import json
import warnings
from threading import Thread

from adc.errors import KafkaException
from celery import bootsteps
from celery.utils.log import get_logger
from hop.models import JSONBlob
from igwn_alert import client

from .signals import igwn_alert_received

__all__ = ('Receiver',)

log = get_logger(__name__)


# Implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py  # noqa: E501
# with minor differences
class IGWNAlertClient(client):
    """IGWN alert client whose listen loop is controlled by a flag.

    ``running`` starts out False, is set to True by :meth:`listen`, and is
    re-checked before every (re)connection, so clearing it from outside
    keeps the loop from reconnecting.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Loop-control flag for listen(); False until listening begins.
        self.running = False

    def listen(self, callback, topics):
        """
        Read alerts from the given topics and hand each one to a callback.

        The stream is reopened after every non-fatal Kafka error for as long
        as ``self.running`` stays true.

        Parameters
        ----------
        callback : callable
            Called with the keyword arguments ``topic`` and ``payload`` for
            every message received. When falsy (e.g. :obj:`None`), the
            message is printed instead.
        topics : :obj:`list` of :obj:`str`
            Topic or list of topics to listen to.

        Raises
        ------
        KafkaException
            Re-raised when the error is flagged as fatal, after ``running``
            has been cleared and the stream closed.
        """
        self.running = True
        while self.running:
            url = self._construct_topic_url(topics)
            # Kept on the instance so the open stream is reachable from
            # outside this method.
            self.stream_obj = self.open(url, "r")
            try:
                with self.stream_obj as stream:
                    messages = stream.read(
                        metadata=True,
                        batch_size=self.batch_size,
                        batch_timeout=self.batch_timeout)
                    for message, meta in messages:
                        # Messages in the new format arrive wrapped in a
                        # JSONBlob; anything else is parsed as JSON text.
                        if not isinstance(message, JSONBlob):
                            try:
                                message = json.loads(message)
                            except (json.JSONDecodeError, TypeError) as exc:
                                # The undecoded payload is still delivered
                                # below.
                                warnings.warn("Payload is not valid "
                                              "json: {}".format(exc))
                        else:
                            message = message.content

                        if callback:
                            # Only the second dot-separated component of the
                            # full topic name is passed on.
                            callback(topic=meta.topic.split('.')[1],
                                     payload=message)
                        else:
                            print("New message from topic {topic}: {msg}"
                                  .format(topic=meta.topic, msg=message))
            # FIXME: revisit when https://git.ligo.org/computing/igwn-alert/client/-/issues/19  # noqa: E501
            # is addressed
            except KafkaException as exc:
                if not exc.fatal:
                    log.warning(
                        "non-fatal error from kafka: {}".format(exc.name))
                    # Loop around and reconnect.
                    continue
                # stop running and close before raising error
                self.running = False
                self.stream_obj.close()
                raise


class IGWNAlertBootStep(bootsteps.ConsumerStep):
    """Base consumer boot step restricted to IGWN alert workers.

    The step is meant to be included only in workers that are started
    with the ``--igwn-alerts`` command line option.
    """

    def __init__(self, consumer, igwn_alert=False, **kwargs):
        # Coerce the igwn_alert argument to a strict boolean.
        self.enabled = True if igwn_alert else False

    def start(self, consumer):
        """Log that this step is starting."""
        step_name = self.name
        log.info('Starting %s', step_name)

    def stop(self, consumer):
        """Log that this step is stopping."""
        step_name = self.name
        log.info('Stopping %s', step_name)


def _send_igwn_alert(topic, payload):
    """Forward a received alert as an ``igwn_alert_received`` signal.

    The signal is sent with ``None`` as its first positional argument and
    the topic and payload as keyword arguments.
    """
    signal_kwargs = {'topic': topic, 'payload': payload}
    igwn_alert_received.send(None, **signal_kwargs)


class Receiver(IGWNAlertBootStep):
    """Run the global IGWN alert receiver in background thread."""

    name = 'IGWN Alert client'

    def start(self, consumer):
        """Create the alert client and start its listener thread.

        The server, authentication mode, group, and topics are read from
        the ``igwn_alert_*`` keys of ``consumer.app.conf``.
        """
        super().start(consumer)

        self._client = IGWNAlertClient(
            server=consumer.app.conf['igwn_alert_server'],
            noauth=consumer.app.conf['igwn_alert_noauth'],
            group=consumer.app.conf['igwn_alert_group'])
        # listen() loops until told to stop, so it runs in its own thread;
        # every received alert is forwarded through _send_igwn_alert.
        self.thread = Thread(
            target=self._client.listen,
            args=(_send_igwn_alert, consumer.app.conf['igwn_alert_topics']),
            name='IGWNReceiverThread')
        self.thread.start()

    def stop(self, consumer):
        """Shut down the listener thread if the client is still running."""
        super().stop(consumer)
        if self._client.running:
            # Clear the flag first so the listen loop does not reconnect,
            # then stop the stream's underlying consumer (presumably to
            # interrupt the pending read -- confirm against hop/adc).
            self._client.running = False
            self._client.stream_obj._consumer.stop()
            self.thread.join()

    def info(self, consumer):
        """Return the configured topics that the client also reports.

        The result is a dict with the single key ``'igwn-alert-topics'``,
        holding the intersection of the configured ``igwn_alert_topics``
        with ``self._client.get_topics()``.
        """
        return {'igwn-alert-topics': consumer.app.conf[
            'igwn_alert_topics'].intersection(self._client.get_topics())}