Source code for gwcelery.tasks.core

"""Base classes for other Celery tasks."""
from celery import group
from celery.utils.log import get_logger

from .. import app

# Module-level logger, named after this module via ``__name__``.
log = get_logger(__name__)


@app.task(shared=False)
def identity(arg=None):
    """Return the input unchanged.

    Parameters
    ----------
    arg
        Any value. Defaults to ``None`` when the task is called without
        arguments.

    Returns
    -------
    arg
        The very same object that was passed in.
    """
    return arg
@app.task(shared=False)
def get_first(args):
    """Return the first element of a group result, or the input if scalar.

    Parameters
    ----------
    args
        An iterable of results, or a single non-iterable value.

    Returns
    -------
    first
        The first item of `args` if it can be unpacked; otherwise `args`
        itself.

    Raises
    ------
    ValueError
        If `args` is iterable but empty (there is nothing to unpack).

    Notes
    -----
    "Scalar" here means "not iterable". A :class:`str` is iterable, so a
    string input yields its first character rather than the string itself.
    """
    try:
        head, *_rest = args
    except TypeError:
        # Unpacking a non-iterable raises TypeError: pass it through as is.
        return args
    return head
@app.task(shared=False)
def get_last(args):
    """Return the last element of a group result, or the input if scalar.

    Parameters
    ----------
    args
        An iterable of results, or a single non-iterable value.

    Returns
    -------
    last
        The last item of `args` if it can be unpacked; otherwise `args`
        itself.

    Raises
    ------
    ValueError
        If `args` is iterable but empty (there is nothing to unpack).

    Notes
    -----
    "Scalar" here means "not iterable". A :class:`str` is iterable, so a
    string input yields its last character rather than the string itself.
    """
    try:
        *_rest, tail = args
    except TypeError:
        # Unpacking a non-iterable raises TypeError: pass it through as is.
        return args
    return tail
class DispatchHandler(dict):
    """Dictionary that maps keys to lists of registered handler tasks.

    Calling an instance with one or more keys returns a decorator that turns
    the decorated function into a task and appends it to the handler list of
    each key. :meth:`dispatch` looks up the handlers for a key and launches
    them together as a group.
    """

    def process_args(self, *args, **kwargs):
        r"""Split the dispatch arguments into a key and callback arguments.

        This default implementation uses the first positional argument as
        the key and forwards everything else untouched.

        Parameters
        ----------
        \*args
            Positional arguments received by :meth:`dispatch`.
        \*\*kwargs
            Keyword arguments received by :meth:`dispatch`.

        Returns
        -------
        key
            The key that selects which callbacks to invoke.
        args : list
            The remaining positional arguments for the callbacks.
        kwargs : dict
            The keyword arguments for the callbacks.

        Raises
        ------
        ValueError
            If no positional argument is given, so there is no key.
        """
        # Star-unpacking (rather than indexing) keeps the failure mode a
        # ValueError, which is what dispatch() catches.
        key, *remaining = args
        return key, remaining, kwargs

    def __call__(self, *keys, **kwargs):
        r"""Return a decorator that registers a new task for the given keys.

        Parameters
        ----------
        \*keys : list
            Keys that the decorated function should handle.
        \*\*kwargs
            Additional keyword arguments for `celery.Celery.task`.

        Returns
        -------
        callable
            A decorator. It wraps the function as a task created with
            ``ignore_result=True``, appends that task to the handler list of
            every key, and returns the task.
        """
        make_task = app.task(ignore_result=True, **kwargs)

        def wrap(f):
            task = make_task(f)
            for key in keys:
                # Create the handler list on first registration for a key.
                self.setdefault(key, []).append(task)
            return task

        return wrap

    def dispatch(self, *args, **kwargs):
        r"""Invoke every handler registered for the key found in the arguments.

        The key and the callback arguments are obtained from
        :meth:`process_args`. If that fails with :class:`TypeError` or
        :class:`ValueError`, the error is logged and nothing is called. If no
        handler is registered for the key, a warning is logged and nothing is
        called. Otherwise the signatures of all matching handlers are
        combined in a group and applied asynchronously.

        Parameters
        ----------
        \*args
            Positional arguments, passed to :meth:`process_args`.
        \*\*kwargs
            Keyword arguments, passed to :meth:`process_args`.
        """
        log.debug('considering dispatch: args=%r, kwargs=%r', args, kwargs)

        try:
            unpacked = self.process_args(*args, **kwargs)
            key, call_args, call_kwargs = unpacked
        except (TypeError, ValueError):
            log.exception('error unpacking key')
            return

        log.debug('unpacked: key=%r, args=%r, kwargs=%r',
                  key, call_args, call_kwargs)

        try:
            handlers = self[key]
        except KeyError:
            log.warning('ignoring unrecognized key: %r', key)
            return

        log.info('calling handlers %r for key %r', handlers, key)
        signatures = [handler.s() for handler in handlers]
        group(signatures).apply_async(call_args, call_kwargs)