Source code for gwcelery.tasks.gwskynet

"""GWSkyNet annotation with GWSkyNet model"""
import json
import re
from functools import cache

import numpy as np

from .. import app
from ..util.tempfile import NamedTemporaryFile
from . import gracedb, igwn_alert, superevents

# Regular expression matching the log comment that is written when the
# preferred event of a superevent is changed manually. It is derived from the
# configured message template: the template text is escaped so that it is
# matched literally (not only '.', but any regex metacharacter it may
# contain), and every '{}' format placeholder becomes a '.+' wildcard.
manual_pref_event_change_regexp = re.compile(
    re.escape(app.conf['views_manual_preferred_event_log_message'])
    .replace(re.escape('{}'), '.+')
)


@cache
def GWSkyNet_model():
    """Load the GWSkyNet model.

    The function is wrapped in :func:`functools.cache`, so the loaded model
    is memoized and returned again on subsequent calls.
    """
    # FIXME Remove import from function scope once importing GWSkyNet is not a
    # slow operation
    from GWSkyNet import GWSkyNet

    model = GWSkyNet.load_GWSkyNet_model()
    return model
@app.task(queue='skynet', shared=False)
def gwskynet_annotation(input_list, SNRs, superevent_id):
    """Run GWSkyNet on a sky map and return the annotation as a JSON string.

    Parameters
    ----------
    input_list : list
        The output of :func:`_download_and_return_file_name`, in the form
        ``[skymap, skymap_filename]``: the downloaded sky map contents and
        the versioned file name of the sky map.
    SNRs : array_like of floats
        Detector SNRs, with index 0, 1 and 2 referring to H1, L1 and V1
        respectively (e.g. the return value of :func:`get_cbc_event_snr`).
    superevent_id : str
        superevent uid

    Returns
    -------
    str
        JSON-encoded dictionary with the keys ``superevent_id``, ``file``,
        ``GWSkyNet_score``, ``GWSkyNet_FAP`` and ``GWSkyNet_FNP``.

    """
    # FIXME Remove import from function scope once importing GWSkyNet is not a
    # slow operation
    from GWSkyNet import GWSkyNet

    filecontents, skymap_filename = input_list
    with NamedTemporaryFile(content=filecontents) as fitsfile:
        GWSkyNet_input = GWSkyNet.prepare_data(fitsfile.name)

    # One of the inputs from BAYESTAR to GWSkyNet is the list of instruments,
    # i.e., metadata['instruments'], which is converted to a binary array with
    # three elements, i.e. GWSkyNet_input[2], for H1, L1 and V1.
    # GWSkyNet 2.4.0 uses this array to indicate detector with SNR >= 4.5
    #
    # np.asarray makes the element-wise comparison work even if the SNRs
    # reach this task as a plain list rather than a numpy array.
    GWSkyNet_input[2][0] = np.where(
        np.asarray(SNRs) >= app.conf['gwskynet_snr_threshold'], 1, 0)

    gwskynet_score = GWSkyNet.predict(GWSkyNet_model(), GWSkyNet_input)
    FAP, FNP = GWSkyNet.get_rates(gwskynet_score)

    # Only the first element of each result is reported.
    fap = FAP[0]
    fnp = FNP[0]
    gs = gwskynet_score[0]
    gwskynet_output = {'superevent_id': superevent_id,
                       'file': skymap_filename,
                       'GWSkyNet_score': gs,
                       'GWSkyNet_FAP': fap,
                       'GWSkyNet_FNP': fnp}
    return json.dumps(gwskynet_output)
def get_cbc_event_snr(event):
    """Extract the per-detector SNRs of a CBC event.

    Parameters
    ----------
    event : dict
        Event dictionary (e.g., the return value from
        :meth:`gwcelery.tasks.gracedb.get_event`, or
        ``preferred_event_data`` in igwn-alert packet.)

    Returns
    -------
    snr : numpy array of floats
        Array of three detector SNRs ordered H1, L1, V1. A detector without
        a ``SingleInspiral`` entry keeps the value 0.

    """
    # GWSkyNet 2.4.0 uses this SNR array to modify one of the inputs, so the
    # positions 0, 1 and 2 must correspond to H1, L1 and V1 respectively.
    detector_order = ('H1', 'L1', 'V1')
    snr = np.zeros(len(detector_order))
    for trigger in event['extra_attributes']['SingleInspiral']:
        for slot, ifo in enumerate(detector_order):
            if trigger['ifo'] == ifo:
                snr[slot] = trigger['snr']
    return snr
@gracedb.task(shared=False)
def _download_and_return_file_name(filename, graceid):
    """Download a file via ``gracedb.download`` and keep track of its name.

    Returns the two-element list ``[filecontents, filename]`` so that the
    next task in a chain receives the file name together with the contents.
    """
    return [gracedb.download(filename, graceid), filename]
@gracedb.task(shared=False)
def _unpack_gwskynet_annotation_and_upload(gwskynet_output, graceid):
    """Upload a GWSkyNet annotation to GraceDB with a summary log message.

    Parameters
    ----------
    gwskynet_output : str
        JSON string holding the keys ``file``, ``GWSkyNet_score``,
        ``GWSkyNet_FAP`` and ``GWSkyNet_FNP``. It is uploaded unchanged as
        ``gwskynet.json``.
    graceid : str
        ID of the GraceDB entry that receives the upload.

    Returns
    -------
    The return value of ``gracedb.upload``.

    """
    annotation = json.loads(gwskynet_output)
    skymap_filename = annotation['file']

    # The log message quotes the values rounded to three decimal places.
    score = np.round(annotation['GWSkyNet_score'], 3)
    fap = np.round(annotation['GWSkyNet_FAP'], 3)
    fnp = np.round(annotation['GWSkyNet_FNP'], 3)

    # NOTE(review): the link below uses the /api/events/ path, while the
    # visible caller passes a superevent ID as ``graceid`` -- confirm that
    # the URL resolves as intended.
    template = ('GWSkyNet annotation from <a href='
                '"/api/events/{graceid}/files/'
                '{skymap_filename}">'
                '{skymap_filename}</a>.'
                ' GWSkyNet score: {cs},'
                ' GWSkyNet FAP: {GWSkyNet_FAP},'
                ' GWSkyNet FNP: {GWSkyNet_FNP}.')
    log_message = template.format(
        graceid=graceid,
        skymap_filename=skymap_filename,
        cs=score,
        GWSkyNet_FAP=fap,
        GWSkyNet_FNP=fnp,
    )

    return gracedb.upload(gwskynet_output, 'gwskynet.json', graceid,
                          message=log_message,
                          tags=['em_follow', 'public'])
def _should_annotate(preferred_event, new_label, new_log_comment, labels,
                     alert_type):
    """Decide whether GWSkyNet should annotate in response to an alert.

    Parameters
    ----------
    preferred_event : dict
        Preferred event dictionary; its ``search``, ``far`` and
        single-detector SNRs are checked against the GWSkyNet criteria.
    new_label : str
        Label that was just added (relevant for ``label_added`` alerts).
    new_log_comment : str
        Log comment that was just added (relevant for other alerts).
    labels : list
        Labels currently attached to the superevent.
    alert_type : str
        Type of the IGWN alert, e.g. ``'label_added'``.

    Returns
    -------
    bool
        True if the annotation should be produced, False otherwise.

    """
    # First check if the event passes all of GWSkyNet's annotation criteria.
    # The checks run one at a time, in this order, and stop at the first
    # failure.
    SNRs = get_cbc_event_snr(preferred_event)
    if preferred_event['search'].lower() != 'allsky':
        return False
    if not preferred_event['far'] <= app.conf['gwskynet_upper_far_threshold']:
        return False
    # At least two detectors must reach the single-detector SNR threshold.
    if (SNRs >= app.conf['gwskynet_snr_threshold']).sum() < 2:
        return False
    # The quadrature sum of the SNRs must reach the network threshold.
    network_snr = np.sqrt(sum(SNRs**2))
    if not network_snr >= app.conf['gwskynet_network_snr_threshold']:
        return False

    # Check if the GWSkyNet should annotate in response to this IGWN-Alert
    if alert_type == 'label_added':
        not_publishable = superevents.should_publish(
            preferred_event, significant=False) is False
        if not_publishable and new_label == 'SKYMAP_READY':
            # If the superevent has a FAR higher than the preliminary alert
            # threshold, GWSkyNet annotates the superevent directly.
            return True
        # If the FAR is lower than the preliminary alert threshold, GWSkyNet
        # annotates the superevent once the preliminary alert has been sent.
        return new_label in ('GCN_PRELIM_SENT', 'LOW_SIGNIF_PRELIM_SENT')

    if 'GCN_PRELIM_SENT' not in labels and \
            'LOW_SIGNIF_PRELIM_SENT' not in labels:
        # GWSkyNet annotations are not applied until after the initial
        # preliminary alert has been sent when FAR passes alert threshold.
        return False

    if new_log_comment.startswith('Localization copied from '):
        # GWSkyNet also annotates the superevent if the sky map has been
        # changed (i.e. a sky map from a new g-event has been copied).
        return True

    # A different log comment has to be checked for when the preferred event
    # has been changed manually.
    return bool(manual_pref_event_change_regexp.match(new_log_comment))
@igwn_alert.handler('superevent', shared=False)
def handle_cbc_superevent(alert):
    """Annotate the CBC preferred events of superevents using GWSkyNet.

    Only ``label_added`` and ``log`` alerts whose preferred event belongs to
    the ``CBC`` group are considered. When :func:`_should_annotate` agrees,
    a task chain is launched that fetches the latest
    ``bayestar.multiorder.fits`` sky map, runs GWSkyNet on it and uploads
    the resulting annotation to the superevent.
    """
    preferred_event = alert['object']['preferred_event_data']
    if preferred_event['group'] != 'CBC':
        return

    alert_type = alert['alert_type']
    if alert_type not in ('label_added', 'log'):
        return

    superevent_id = alert['uid']
    payload = alert['data']
    new_label = payload.get('name', '')
    new_log_comment = payload.get('comment', '')
    labels = alert['object'].get('labels', [])
    SNRs = get_cbc_event_snr(preferred_event)

    if not _should_annotate(preferred_event, new_label, new_log_comment,
                            labels, alert_type):
        return

    # Each task passes its result on as the first argument of the next one.
    workflow = (
        gracedb.get_latest_file.s(superevent_id, 'bayestar.multiorder.fits')
        |
        _download_and_return_file_name.s(superevent_id)
        |
        gwskynet_annotation.s(SNRs, superevent_id)
        |
        _unpack_gwskynet_annotation_and_upload.s(superevent_id)
    )
    workflow.apply_async()