Source code for gwcelery.tasks.external_triggers

from pathlib import Path
from urllib.parse import urlparse

from astropy.time import Time
from celery import group
from celery.utils.log import get_logger
from lxml import etree

from .. import app
from . import (alerts, detchar, external_skymaps, gcn, gracedb, igwn_alert,
               raven)

log = get_logger(__name__)


REQUIRED_LABELS_BY_TASK = {
    # EM_READY implies preferred event sky map is available
    'compare': {'EM_READY', 'EXT_SKYMAP_READY', 'EM_COINC'},
    'SoG': {'SKYMAP_READY', 'RAVEN_ALERT', 'ADVOK'}
}
"""These labels should be present on an external event to consider it to
be ready for sky map comparison or for post-alert analysis, such as a
measurment of the speed of gravity (SoG).
"""

FERMI_GRB_CLASS_VALUE = 4
"""This is the index that denote GRBs within Fermi's Flight Position
classification."""

FERMI_GRB_CLASS_THRESH = 50
"""This values denotes the threshold of the most likely Fermi source
classification, above which we will consider a Fermi Flight Position
notice."""


@gcn.handler(gcn.NoticeType.SNEWS,
             queue='exttrig',
             shared=False)
def handle_snews_gcn(payload):
    """Handles the GCN notice payload from SNEWS alerts.

    Prepares the alert to be sent to GraceDB as an external event, updating
    the info if it already exists.

    Parameters
    ----------
    payload : str
        XML GCN notice alert packet in string format

    """
    root = etree.fromstring(payload)

    # Get TrigID and Test Event Boolean
    trig_id = root.find("./What/Param[@name='TrigID']").attrib['value']
    ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'

    event_observatory = 'SNEWS'
    if 'mdc-test_event' in root.attrib['ivorn'].lower():
        search = 'MDC'
    else:
        search = 'Supernova'
    query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
        event_observatory, trig_id)

    (
        gracedb.get_events.si(query=query)
        |
        _create_replace_external_event_and_skymap.s(
            payload, search, event_observatory, ext_group=ext_group
        )
    ).delay()


@gcn.handler(gcn.NoticeType.FERMI_GBM_ALERT,
             gcn.NoticeType.FERMI_GBM_FLT_POS,
             gcn.NoticeType.FERMI_GBM_GND_POS,
             gcn.NoticeType.FERMI_GBM_FIN_POS,
             gcn.NoticeType.SWIFT_BAT_GRB_POS_ACK,
             gcn.NoticeType.FERMI_GBM_SUBTHRESH,
             gcn.NoticeType.INTEGRAL_WAKEUP,
             gcn.NoticeType.INTEGRAL_REFINED,
             gcn.NoticeType.INTEGRAL_OFFLINE,
             queue='exttrig',
             shared=False)
def handle_grb_gcn(payload):
    """Handles the payload from Fermi, Swift, and INTEGRAL GCN notices.

    Filters out candidates likely to be noise. Creates an external event from
    the notice if the notice is new, otherwise updates the existing event.
    Then creates and/or grabs an external sky map to be uploaded to the
    external event.

    More info for these notices can be found at:
    Fermi-GBM: https://gcn.gsfc.nasa.gov/fermi_grbs.html
    Fermi-GBM sub: https://gcn.gsfc.nasa.gov/fermi_gbm_subthresh_archive.html
    Swift: https://gcn.gsfc.nasa.gov/swift.html
    INTEGRAL: https://gcn.gsfc.nasa.gov/integral.html

    Parameters
    ----------
    payload : str
        XML GCN notice alert packet in string format

    """
    root = etree.fromstring(payload)
    u = urlparse(root.attrib['ivorn'])
    stream_path = u.path

    stream_obsv_dict = {'/SWIFT': 'Swift',
                        '/Fermi': 'Fermi',
                        '/INTEGRAL': 'INTEGRAL'}
    event_observatory = stream_obsv_dict[stream_path]

    ext_group = 'Test' if root.attrib['role'] == 'test' else 'External'

    # Block Test INTEGRAL events on the production server to prevent
    # unneeded queries of old GW data during detchar check
    if event_observatory == 'INTEGRAL' and ext_group == 'Test' and \
            app.conf['gracedb_host'] == 'gracedb.ligo.org':
        return
    # Get TrigID
    elif event_observatory == 'INTEGRAL' and \
            not any([x in u.fragment for x in ['O3-replay', 'MDC-test']]):
        # FIXME: revert all this if INTEGRAL fixes their GCN notices
        # If INTEGRAL, get trigger ID from ivorn rather than the TrigID field
        # unless O3 replay or MDC event
        trig_id = u.fragment.split('_')[-1].split('-')[0]
        # Modify the TrigID field so GraceDB has the correct value
        root.find("./What/Param[@name='TrigID']").attrib['value'] = \
            str(trig_id).encode()
        # Apply changes to payload delivered to GraceDB
        payload = etree.tostring(root, xml_declaration=True,
                                 encoding="UTF-8")
    else:
        try:
            trig_id = \
                root.find("./What/Param[@name='TrigID']").attrib['value']
        except AttributeError:
            trig_id = \
                root.find("./What/Param[@name='Trans_Num']").attrib['value']

    notice_type = \
        int(root.find("./What/Param[@name='Packet_Type']").attrib['value'])

    reliability = root.find("./What/Param[@name='Reliability']")
    if reliability is not None and int(reliability.attrib['value']) <= 4:
        return

    # Check if Fermi trigger is likely noise by checking classification
    # Most_Likely_Index of 4 is an astrophysical GRB
    # If not at least 50% chance of GRB we will not consider it for RAVEN
    likely_source = root.find("./What/Param[@name='Most_Likely_Index']")
    likely_prob = root.find("./What/Param[@name='Most_Likely_Prob']")
    not_likely_grb = likely_source is not None and \
        (int(likely_source.attrib['value']) != FERMI_GRB_CLASS_VALUE
         or float(likely_prob.attrib['value']) < FERMI_GRB_CLASS_THRESH)

    # Check if initial Fermi alert. These are generally unreliable and should
    # never trigger a RAVEN alert, but will give us earlier warning of a
    # possible coincidence. Later notices could change this.
    initial_gbm_alert = notice_type == gcn.NoticeType.FERMI_GBM_ALERT

    # Check if Swift has lost lock. If so then veto
    lost_lock = \
        root.find("./What/Group[@name='Solution_Status']"
                  "/Param[@name='StarTrack_Lost_Lock']")
    swift_veto = lost_lock is not None and lost_lock.attrib['value'] == 'true'

    # Only send alerts if likely a GRB, is not a low-confidence early Fermi
    # alert, and if not a Swift veto
    if not_likely_grb or initial_gbm_alert or swift_veto:
        label = 'NOT_GRB'
    else:
        label = None

    ivorn = root.attrib['ivorn']
    if 'subthresh' in ivorn.lower():
        search = 'SubGRB'
    elif 'mdc-test_event' in ivorn.lower():
        search = 'MDC'
    else:
        search = 'GRB'

    if search == 'SubGRB' and event_observatory == 'Fermi':
        skymap_link = \
            root.find("./What/Param[@name='HealPix_URL']").attrib['value']
    else:
        skymap_link = None

    query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
        event_observatory, trig_id)

    (
        gracedb.get_events.si(query=query)
        |
        _create_replace_external_event_and_skymap.s(
            payload, search, event_observatory,
            ext_group=ext_group, label=label,
            notice_date=root.find("./Who/Date").text,
            notice_type=notice_type,
            skymap_link=skymap_link,
            use_radec=search in {'GRB', 'MDC'}
        )
    ).delay()


@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    'external_fermi',
                    'external_swift',
                    'external_integral',
                    shared=False)
def handle_grb_igwn_alert(alert):
    """Parse an IGWN alert message related to superevents/GRB external
    triggers and dispatch it to other tasks.

    Notes
    -----
    This IGWN alert message handler is triggered by the creation of a new
    superevent or GRB external trigger event, by a label associated with the
    completeness of sky maps or a change in state, or by the upload of a sky
    map file:

    * New event/superevent triggers a coincidence search with
      :meth:`gwcelery.tasks.raven.coincidence_search`.
    * When both a GW and GRB sky map are available during a coincidence,
      indicated by the labels ``EM_READY`` and ``EXT_SKYMAP_READY``
      respectively on the external event, this triggers the spacetime coinc
      FAR to be calculated and a combined GW-GRB sky map is created using
      :meth:`gwcelery.tasks.external_skymaps.create_combined_skymap`.
    * Re-run sky map comparison if complete, and either the GW or GRB sky map
      has been updated or if the preferred event changed.
    * Re-check RAVEN publishing conditions if the GRB was previously
      considered non-astrophysical but now should be considered.

    Parameters
    ----------
    alert : dict
        IGWN alert packet

    """
    # Determine GraceDB ID
    graceid = alert['uid']

    # launch searches
    if alert['alert_type'] == 'new':
        if alert['object'].get('group') == 'External':
            # launch search with MDC events and exit
            if alert['object']['search'] == 'MDC':
                raven.coincidence_search(graceid, alert['object'],
                                         group='CBC', se_searches=['MDC'])
                raven.coincidence_search(graceid, alert['object'],
                                         group='Burst', se_searches=['MDC'])
                return

            if alert['object']['search'] in ['SubGRB', 'SubGRBTargeted']:
                # if sub-threshold GRB, launch search with that pipeline
                raven.coincidence_search(
                    graceid, alert['object'],
                    searches=['SubGRB', 'SubGRBTargeted'],
                    se_searches=['AllSky'],
                    pipelines=[alert['object']['pipeline']])
            else:
                # launch standard Burst-GRB search
                raven.coincidence_search(graceid, alert['object'],
                                         group='Burst',
                                         se_searches=['AllSky'])
                # launch standard CBC-GRB search
                raven.coincidence_search(graceid, alert['object'],
                                         group='CBC', searches=['GRB'])
        elif 'S' in graceid:
            # launch standard GRB search based on group
            gw_group = alert['object']['preferred_event_data']['group']
            search = alert['object']['preferred_event_data']['search']

            # launch search with MDC events and exit
            if alert['object']['preferred_event_data']['search'] == 'MDC':
                raven.coincidence_search(graceid, alert['object'],
                                         group=gw_group, searches=['MDC'])
                return
            # Don't run search for BBH or IMBH Burst search
            elif gw_group == 'Burst' and search.lower() != 'allsky':
                return

            se_searches = [] if gw_group == 'CBC' else ['AllSky']
            searches = (['SubGRB', 'SubGRBTargeted'] if gw_group == 'CBC'
                        else ['SubGRBTargeted'])
            # launch standard GRB search
            raven.coincidence_search(graceid, alert['object'],
                                     group=gw_group, searches=['GRB'],
                                     se_searches=se_searches)
            # launch subthreshold search for Fermi and Swift separately to
            # use different time windows, for both CBC and Burst
            for pipeline in ['Fermi', 'Swift']:
                raven.coincidence_search(
                    graceid, alert['object'], group=gw_group,
                    searches=searches, pipelines=[pipeline])
    # re-run raven pipeline and create combined sky map (if not a Swift
    # event) when sky maps are available
    elif alert['alert_type'] == 'label_added' and \
            alert['object'].get('group') == 'External':
        if _skymaps_are_ready(alert['object'], alert['data']['name'],
                              'compare'):
            # if both sky maps present and a coincidence, re-run RAVEN
            # pipeline and create combined sky maps
            ext_event = alert['object']
            superevent_id, ext_id = _get_superevent_ext_ids(
                graceid, ext_event)
            superevent = gracedb.get_superevent(superevent_id)
            _relaunch_raven_pipeline_with_skymaps(
                superevent, ext_event, graceid)
        elif 'EM_COINC' in alert['object']['labels']:
            # if not complete, check if GW sky map; apply label to external
            # event if GW sky map
            se_labels = gracedb.get_labels(alert['object']['superevent'])
            if 'SKYMAP_READY' in se_labels:
                gracedb.create_label.si('SKYMAP_READY', graceid).delay()
            if 'EM_READY' in se_labels:
                gracedb.create_label.si('EM_READY', graceid).delay()
    # apply labels from superevent to external event to update state
    # and trigger functionality requiring sky maps, etc.
    elif alert['alert_type'] == 'label_added' and 'S' in graceid:
        if 'SKYMAP_READY' in alert['object']['labels']:
            # if sky map in superevent, apply label to all external events
            # at the time
            group(
                gracedb.create_label.si('SKYMAP_READY', ext_id)
                for ext_id in alert['object']['em_events']
            ).delay()
        if 'EM_READY' in alert['object']['labels']:
            # if sky map not in superevent but in preferred event, apply
            # label to all external events at the time
            group(
                gracedb.create_label.si('EM_READY', ext_id)
                for ext_id in alert['object']['em_events']
            ).delay()
        if _skymaps_are_ready(alert['object'], alert['data']['name'],
                              'SoG') \
                and alert['object']['space_coinc_far'] is not None:
            # if a superevent is vetted by ADVOK and a spatial joint FAR is
            # available, check if SoG publishing conditions are met
            (
                gracedb.get_event.si(alert['object']['em_type'])
                |
                raven.sog_paper_pipeline.s(alert['object'])
            ).delay()
    # if a new GW or external sky map arrives after the first is available,
    # try to remake the combined sky map and rerun the raven pipeline
    elif alert['alert_type'] == 'log' and \
            'EM_COINC' in alert['object']['labels'] and \
            'fit' in alert['data']['filename'] and \
            'flat' not in alert['data']['comment'].lower() and \
            (alert['data']['filename'] !=
             external_skymaps.COMBINED_SKYMAP_FILENAME_MULTIORDER):
        superevent_id, external_id = _get_superevent_ext_ids(
            graceid, alert['object'])
        if 'S' in graceid:
            superevent = alert['object']
        else:
            superevent = gracedb.get_superevent(
                alert['object']['superevent'])
            external_event = alert['object']
        # check if combined sky map already made, with the exception of Swift
        # which will fail
        if 'S' in graceid:
            # Rerun for all eligible external events
            for ext_id in superevent['em_events']:
                external_event = gracedb.get_event(ext_id)
                if REQUIRED_LABELS_BY_TASK['compare'].issubset(
                        set(external_event['labels'])):
                    _relaunch_raven_pipeline_with_skymaps(
                        superevent, external_event, graceid,
                        use_superevent_skymap=True)
        else:
            if REQUIRED_LABELS_BY_TASK['compare'].issubset(
                    set(external_event['labels'])):
                _relaunch_raven_pipeline_with_skymaps(
                    superevent, external_event, graceid)
    # Rerun the coincidence FAR calculation if possible with combined sky map
    # if the preferred event changes
    # We don't want to run this logic if PE results are present
    elif alert['alert_type'] == 'log' and \
            'PE_READY' not in alert['object']['labels'] and \
            'EM_COINC' in alert['object']['labels']:
        new_log_comment = alert['data'].get('comment', '')
        if 'S' in graceid and \
                new_log_comment.startswith('Updated superevent parameters: '
                                           'preferred_event: '):
            superevent = alert['object']
            # Rerun for all eligible external events
            for ext_id in superevent['em_events']:
                external_event = gracedb.get_event(ext_id)
                if REQUIRED_LABELS_BY_TASK['compare'].issubset(
                        set(external_event['labels'])):
                    _relaunch_raven_pipeline_with_skymaps(
                        superevent, external_event, graceid,
                        use_superevent_skymap=False)
    elif alert['alert_type'] == 'label_removed' and \
            alert['object'].get('group') == 'External':
        if alert['data']['name'] == 'NOT_GRB' and \
                'EM_COINC' in alert['object']['labels']:
            # if NOT_GRB is removed, re-check publishing conditions
            superevent_id = alert['object']['superevent']
            superevent = gracedb.get_superevent(superevent_id)
            gw_group = superevent['preferred_event_data']['group']
            coinc_far_dict = {
                'temporal_coinc_far': superevent['time_coinc_far'],
                'spatiotemporal_coinc_far': superevent['space_coinc_far']
            }
            raven.trigger_raven_alert(coinc_far_dict, superevent, graceid,
                                      alert['object'], gw_group)
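
# Illustrative sketch (hypothetical packet, trimmed to the fields this
# handler reads) of a 'label_added' IGWN alert for an external event; all
# values below are made up for illustration only:
#
#     >>> example_alert = {
#     ...     'uid': 'E123456',
#     ...     'alert_type': 'label_added',
#     ...     'data': {'name': 'EM_COINC'},
#     ...     'object': {'group': 'External',
#     ...                'superevent': 'S240101a',
#     ...                'labels': ['EM_COINC', 'EM_READY']},
#     ... }
#     >>> handle_grb_igwn_alert(example_alert)  # doctest: +SKIP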


@igwn_alert.handler('superevent',
                    'mdc_superevent',
                    'external_snews',
                    shared=False)
def handle_snews_igwn_alert(alert):
    """Parse an IGWN alert message related to superevents/Supernovae external
    triggers and dispatch it to other tasks.

    Notes
    -----
    This igwn_alert message handler is triggered whenever a new superevent
    or Supernovae external event is created:

    * New event triggers a coincidence search with
      :meth:`gwcelery.tasks.raven.coincidence_search`.

    Parameters
    ----------
    alert : dict
        IGWN alert packet

    """
    # Determine GraceDB ID
    graceid = alert['uid']

    if alert['alert_type'] == 'new':
        if alert['object'].get('superevent_id'):
            group = alert['object']['preferred_event_data']['group']
            search = alert['object']['preferred_event_data']['search']
            searches = ['MDC'] if search == 'MDC' else ['Supernova']
            se_searches = ['MDC'] if search == 'MDC' else ['AllSky']
            # Run only on Test and Burst superevents
            if group in {'Burst', 'Test'} and search in {'MDC', 'AllSky'}:
                raven.coincidence_search(graceid, alert['object'],
                                         group='Burst', searches=searches,
                                         se_searches=se_searches,
                                         pipelines=['SNEWS'])
        else:
            # Run on SNEWS event, either real or test
            search = alert['object']['search']
            if search == 'MDC':
                raven.coincidence_search(graceid, alert['object'],
                                         group='Burst', searches=['MDC'],
                                         se_searches=['MDC'],
                                         pipelines=['SNEWS'])
            elif search == 'Supernova':
                raven.coincidence_search(graceid, alert['object'],
                                         group='Burst',
                                         searches=['Supernova'],
                                         se_searches=['AllSky'],
                                         pipelines=['SNEWS'])


@alerts.handler('fermi', 'swift')
def handle_targeted_kafka_alert(alert):
    """Parse an alert sent via Kafka from a MOU partner in our joint
    subthreshold targeted search.

    Parameters
    ----------
    alert : dict
        Kafka alert packet

    """
    # Convert alert to VOEvent format
    # FIXME: This is required until native ingesting of kafka events in
    # GraceDB
    payload, pipeline, time, trig_id = _kafka_to_voevent(alert)

    # Veto events that don't pass GRB FAR threshold
    far_grb = alert['far']
    veto_event = \
        app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] < far_grb
    label = ('NOT_GRB' if alert['alert_type'] == "retraction" or veto_event
             else None)

    # Look whether a previous event with the same ID exists
    query = 'group: External pipeline: {} grbevent.trigger_id = "{}"'.format(
        pipeline, trig_id)

    (
        gracedb.get_events.si(query=query)
        |
        _create_replace_external_event_and_skymap.s(
            payload, 'SubGRBTargeted', pipeline, label=label,
            notice_date=time,
            skymap=alert.get('healpix_file'),
            use_radec=('ra' in alert and 'dec' in alert)
        )
    ).delay()


def _skymaps_are_ready(event, label, task):
    """Determine whether labels are complete to launch a certain task.

    Parameters
    ----------
    event : dict
        Either superevent or external event dictionary
    label : str
        Label that was just applied, triggering this check
    task : str
        Determines which label schema to check for completeness

    Returns
    -------
    labels_pass : bool
        True if all the required labels are present and the given label is
        part of that set

    """
    label_set = set(event['labels'])
    required_labels = REQUIRED_LABELS_BY_TASK[task]
    return required_labels.issubset(label_set) and label in required_labels


def _get_superevent_ext_ids(graceid, event):
    """Grab superevent and external event IDs from a given event.

    Parameters
    ----------
    graceid : str
        GraceDB ID
    event : dict
        Either superevent or external event dictionary

    Returns
    -------
    se_id, ext_id : tuple
        Tuple of superevent and external event GraceDB IDs

    """
    if 'S' in graceid:
        se_id = event['superevent_id']
        ext_id = event['em_type']
    else:
        se_id = event['superevent']
        ext_id = event['graceid']
    return se_id, ext_id


@app.task(shared=False)
def _launch_external_detchar(event):
    """Launch detchar tasks for an external event.

    Parameters
    ----------
    event : dict
        External event dictionary

    Returns
    -------
    event : dict
        External event dictionary

    """
    start = event['gpstime']
    if event['pipeline'] == 'SNEWS':
        start, end = event['gpstime'], event['gpstime']
    else:
        integration_time = \
            event['extra_attributes']['GRB']['trigger_duration'] or 4.0
        end = start + integration_time
    detchar.check_vectors.si(event, event['graceid'], start, end).delay()

    return event


def _relaunch_raven_pipeline_with_skymaps(superevent, ext_event, graceid,
                                          use_superevent_skymap=None):
    """Relaunch the RAVEN sky map comparison workflow, including
    recalculating the joint FAR with updated sky map info and creating a new
    combined sky map.

    Parameters
    ----------
    superevent : dict
        Superevent dictionary
    ext_event : dict
        External event dictionary
    graceid : str
        GraceDB ID of event
    use_superevent_skymap : bool
        If True/False, use/don't use sky map info from the superevent.
        Else if None, check the SKYMAP_READY label in the external event.

    """
    gw_group = superevent['preferred_event_data']['group']
    tl, th = raven._time_window(graceid, gw_group,
                                [ext_event['pipeline']],
                                [ext_event['search']])
    # FIXME: both overlap integral and combined sky map could be
    # done by the same function since they are so similar
    use_superevent = (use_superevent_skymap
                      if use_superevent_skymap is not None else
                      'SKYMAP_READY' in ext_event['labels'])
    canvas = raven.raven_pipeline.si(
        [ext_event] if 'S' in graceid else [superevent],
        graceid,
        superevent if 'S' in graceid else ext_event,
        tl, th, gw_group, use_superevent_skymap=use_superevent)
    # Create new updated combined sky map
    canvas |= external_skymaps.create_combined_skymap.si(
        superevent['superevent_id'], ext_event['graceid'],
        preferred_event=(
            None if use_superevent else superevent['preferred_event']))
    canvas.delay()


@app.task(shared=False)
def _create_replace_external_event_and_skymap(
        events, payload, search, pipeline,
        label=None, ext_group='External', notice_date=None, notice_type=None,
        skymap=None, skymap_link=None, use_radec=False):
    """Either create a new external event or replace an old one if
    applicable. Then either upload a given sky map, try to download one
    given a link, or create one from given coordinates.

    Parameters
    ----------
    events : list
        List of external events sharing the same trigger ID
    payload : str
        VOEvent of event being considered
    search : str
        Search of external event
    pipeline : str
        Pipeline of external event
    label : list
        Label to be uploaded along with external event. If None, removes
        'NOT_GRB' label from event
    ext_group : str
        Group of external event, 'External' or 'Test'
    notice_date : str
        External event trigger time in ISO format
    notice_type : int
        GCN notice type integer
    skymap : str
        Base64 encoded sky map
    skymap_link : str
        Link to external sky map to be downloaded
    use_radec : bool
        If True, try to create sky map using given coordinates

    """
    skymap_detchar_canvas = ()
    upload_new_skymap = True
    # If previous event, try to append
    if events and ext_group == 'External':
        assert len(events) == 1, 'Found more than one matching GraceDB entry'
        event, = events
        graceid = event['graceid']
        # If previous Fermi sky map, check if from official analysis
        if 'EXT_SKYMAP_READY' in event['labels'] and \
                event['pipeline'] == 'Fermi':
            # If True, block further sky maps from being uploaded
            # automatically
            # Note that sky maps can also be uploaded via the dashboard
            upload_new_skymap = \
                (external_skymaps.FERMI_OFFICIAL_SKYMAP_FILENAME not in
                 external_skymaps.get_skymap_filename(graceid, is_gw=False))
        if label:
            create_replace_canvas = gracedb.create_label.si(label, graceid)
        else:
            create_replace_canvas = gracedb.remove_label.si('NOT_GRB',
                                                            graceid)
        # Prevent SubGRBs from appending GRBs, also append if same search
        if search == 'GRB' or search == event['search']:
            # Replace event and pass already existing event dictionary
            create_replace_canvas |= gracedb.replace_event.si(graceid,
                                                              payload)
            create_replace_canvas |= gracedb.get_event.si(graceid)
        else:
            # If not appending just exit
            return
    # If new event, create new entry in GraceDB and launch detchar
    else:
        create_replace_canvas = gracedb.create_event.si(
            filecontents=payload,
            search=search,
            group=ext_group,
            pipeline=pipeline,
            labels=[label] if label else None)
        skymap_detchar_canvas += _launch_external_detchar.s(),

    # Use sky map if provided
    if skymap:
        skymap_detchar_canvas += \
            external_skymaps.read_upload_skymap_from_base64.s(skymap),
    # Otherwise if no official Fermi sky map, upload one based on given info
    elif upload_new_skymap:
        # Otherwise grab sky map from provided link
        if skymap_link:
            skymap_detchar_canvas += \
                external_skymaps.get_upload_external_skymap.s(skymap_link),
        # Otherwise if threshold Fermi try to grab sky map
        elif pipeline == 'Fermi' and search == 'GRB':
            skymap_detchar_canvas += \
                external_skymaps.get_upload_external_skymap.s(None),
        # Otherwise create sky map from given coordinates
        if use_radec:
            skymap_detchar_canvas += \
                external_skymaps.create_upload_external_skymap.s(
                    notice_type, notice_date),

    (
        create_replace_canvas
        |
        group(skymap_detchar_canvas)
    ).delay()


def _kafka_to_voevent(alert):
    """Parse an alert sent via Kafka from a MOU partner in our joint
    subthreshold targeted search and convert to an equivalent XML string
    GCN VOEvent.

    Parameters
    ----------
    alert : dict
        Kafka alert packet

    Returns
    -------
    payload : str
        XML GCN notice alert packet in string format

    """
    # Define basic values
    pipeline = alert['mission']
    start_time = alert['trigger_time']
    alert_time = alert['alert_datetime']
    far = alert['far']
    duration = alert['rate_duration']
    id = '_'.join(str(x) for x in alert['id'])
    # Use central time since starting time is not well measured
    central_time = \
        Time(start_time, format='isot', scale='utc').to_value('gps') + \
        .5 * duration
    trigger_time = \
        str(Time(central_time, format='gps', scale='utc').isot) + 'Z'

    # sky localization may not be available
    ra = alert.get('ra')
    dec = alert.get('dec')
    # Try to get dec first then ra, None if both missing
    error = alert.get('dec_uncertainty')
    if error is None:
        error = alert.get('ra_uncertainty')
    # Argument should be list if not None
    if isinstance(error, list):
        error = error[0]
    # if any missing sky map info, set to zeros so will be ignored later
    if ra is None or dec is None or error is None:
        ra, dec, error = 0., 0., 0.

    # Load template
    fname = str(Path(__file__).parent /
                '../tests/data/{}_subgrbtargeted_template.xml'.format(
                    pipeline.lower()))
    root = etree.parse(fname)

    # Update template values
    # Change ivorn to indicate this is a subthreshold targeted event
    root.xpath('.')[0].attrib['ivorn'] = \
        'ivo://lvk.internal/{0}#targeted_subthreshold-{1}'.format(
            pipeline.lower(), trigger_time).encode()

    # Update ID
    root.find("./What/Param[@name='TrigID']").attrib['value'] = \
        id.encode()

    # Change times to chosen time
    root.find("./Who/Date").text = str(alert_time).encode()
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Time/TimeInstant/"
               "ISOTime")).text = str(trigger_time).encode()

    root.find("./What/Param[@name='FAR']").attrib['value'] = \
        str(far).encode()
    root.find("./What/Param[@name='Integ_Time']").attrib['value'] = \
        str(duration).encode()

    # Sky position
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Position2D/Value2/"
               "C1")).text = str(ra).encode()
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Position2D/Value2/"
               "C2")).text = str(dec).encode()
    root.find(("./WhereWhen/ObsDataLocation/"
               "ObservationLocation/AstroCoords/Position2D/"
               "Error2Radius")).text = str(error).encode()

    return (etree.tostring(root, xml_declaration=True, encoding="UTF-8",
                           pretty_print=True),
            pipeline, trigger_time.replace('Z', ''), id)
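
# Minimal usage sketch (hypothetical alert contents, restricted to the keys
# read above; values are illustrative only, and the corresponding VOEvent
# template file must exist under gwcelery/tests/data/):
#
#     >>> example_alert = {
#     ...     'mission': 'Fermi',
#     ...     'trigger_time': '2024-01-01T00:00:00.000',
#     ...     'alert_datetime': '2024-01-01T00:00:30.000',
#     ...     'far': 1e-6,
#     ...     'rate_duration': 1.024,
#     ...     'id': [240101, 1],
#     ...     'ra': 120., 'dec': -45., 'ra_uncertainty': [5.0],
#     ... }
#     >>> payload, pipeline, time, trig_id = \
#     ...     _kafka_to_voevent(example_alert)  # doctest: +SKIP
#     >>> pipeline, trig_id  # doctest: +SKIP
#     ('Fermi', '240101_1')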