docker.images/ansible.awx/awx-17.1.0/awx/main/dispatch/publish.py

119 lines
3.4 KiB
Python

import inspect
import logging
import sys
import json
from uuid import uuid4
from django.conf import settings
from . import pg_bus_conn
logger = logging.getLogger('awx.main.dispatch')
def serialize_task(f):
return '.'.join([f.__module__, f.__name__])
class task:
"""
Used to decorate a function or class so that it can be run asynchronously
via the task dispatcher. Tasks can be simple functions:
@task()
def add(a, b):
return a + b
...or classes that define a `run` method:
@task()
class Adder:
def run(self, a, b):
return a + b
# Tasks can be run synchronously...
assert add(1, 1) == 2
assert Adder().run(1, 1) == 2
# ...or published to a queue:
add.apply_async([1, 1])
Adder.apply_async([1, 1])
# Tasks can also define a specific target queue or use the special fan-out queue tower_broadcast:
@task(queue='slow-tasks')
def snooze():
time.sleep(10)
@task(queue='tower_broadcast')
def announce():
print("Run this everywhere!")
"""
def __init__(self, queue=None):
self.queue = queue
def __call__(self, fn=None):
queue = self.queue
class PublisherMixin(object):
queue = None
@classmethod
def delay(cls, *args, **kwargs):
return cls.apply_async(args, kwargs)
@classmethod
def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
task_id = uuid or str(uuid4())
args = args or []
kwargs = kwargs or {}
queue = (
queue or
getattr(cls.queue, 'im_func', cls.queue)
)
if not queue:
msg = f'{cls.name}: Queue value required and may not be None'
logger.error(msg)
raise ValueError(msg)
obj = {
'uuid': task_id,
'args': args,
'kwargs': kwargs,
'task': cls.name
}
obj.update(**kw)
if callable(queue):
queue = queue()
if not settings.IS_TESTING(sys.argv):
with pg_bus_conn() as conn:
conn.notify(queue, json.dumps(obj))
return (obj, queue)
# If the object we're wrapping *is* a class (e.g., RunJob), return
# a *new* class that inherits from the wrapped class *and* BaseTask
# In this way, the new class returned by our decorator is the class
# being decorated *plus* PublisherMixin so cls.apply_async() and
# cls.delay() work
bases = []
ns = {'name': serialize_task(fn), 'queue': queue}
if inspect.isclass(fn):
bases = list(fn.__bases__)
ns.update(fn.__dict__)
cls = type(
fn.__name__,
tuple(bases + [PublisherMixin]),
ns
)
if inspect.isclass(fn):
return cls
# if the object being decorated is *not* a class (it's a Python
# function), make fn.apply_async and fn.delay proxy through to the
# PublisherMixin we dynamically created above
setattr(fn, 'name', cls.name)
setattr(fn, 'apply_async', cls.apply_async)
setattr(fn, 'delay', cls.delay)
return fn