Source code for gwcelery.tools.condor

"""Shortcuts for HTCondor commands to manage deployment of GWCelery on LIGO
Data Grid clusters.

These commands apply to the GWCelery instance that is
running in the current working directory.
"""
import json
import os
import shlex
import subprocess
import sys
import time
from importlib import resources

import click
import lxml.etree

from .. import data

SUBMIT_FILE = str(resources.files(data).joinpath('gwcelery.sub'))


@click.group(help=__doc__)
def condor():
    # Just register the condor command group;
    # no need to do anything here.
    pass


[docs]def get_constraints(): return '-constraint', 'JobBatchName=={} && Iwd=={}'.format( json.dumps('gwcelery'), # JSON string literal escape sequences json.dumps(os.getcwd()) # are a close match to HTCondor ClassAds. )
[docs]def run_exec(*args): print(' '.join(shlex.quote(arg) for arg in args)) os.execvp(args[0], args)
[docs]def running(): """Determine if GWCelery is already running under HTCondor.""" status = subprocess.check_output(('condor_q', '-xml', *get_constraints())) classads = lxml.etree.fromstring(status) return classads.find('.//c') is not None
@condor.command() @click.pass_context def submit(ctx): """Submit all GWCelery jobs to HTCondor (if not already running).""" if running(): print('error: GWCelery jobs are already running in this directory.\n' 'First remove existing jobs with "gwcelery condor rm".\n' 'To see the status of those jobs, run "gwcelery condor q".', file=sys.stderr) sys.exit(1) else: accounting_group = ctx.obj.app.conf['condor_accounting_group'] run_exec('condor_submit', 'accounting_group={}'.format(accounting_group), SUBMIT_FILE) @condor.command() @click.pass_context def resubmit(ctx): """Remove any running GWCelery jobs and resubmit to HTCondor.""" if running(): subprocess.check_call(('condor_rm', *get_constraints())) timeout = 120 start = time.monotonic() while time.monotonic() - start < timeout: if not running(): break time.sleep(1) else: print('error: Could not stop all GWCelery jobs', file=sys.stderr) sys.exit(1) accounting_group = ctx.obj.app.conf['condor_accounting_group'] run_exec('condor_submit', 'accounting_group={}'.format(accounting_group), SUBMIT_FILE) @condor.command() def rm(): """Remove all GWCelery jobs.""" run_exec('condor_rm', *get_constraints()) @condor.command() def hold(): """Put all GWCelery jobs on hold.""" run_exec('condor_hold', *get_constraints()) @condor.command() def release(): """Release all GWCelery jobs from hold status.""" run_exec('condor_release', *get_constraints()) @condor.command() def q(): """Show status of all GWCelery jobs.""" run_exec('condor_q', '-nobatch', *get_constraints())