# docker.images/ansible.awx/awx-17.1.0/awx/main/signals.py

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.

# Python
import contextlib
import logging
import threading
import json
import sys

# Django
from django.db import connection
from django.conf import settings
from django.db.models.signals import (
    pre_save,
    post_save,
    pre_delete,
    post_delete,
    m2m_changed,
)
from django.dispatch import receiver
from django.contrib.auth import SESSION_KEY
from django.contrib.sessions.models import Session
from django.utils import timezone

# Django-CRUM
from crum import get_current_request, get_current_user
from crum.signals import current_user_getter

# AWX
from awx.main.models import (
    ActivityStream, Group, Host, InstanceGroup, Inventory, InventorySource,
    Job, JobHostSummary, JobTemplate, OAuth2AccessToken, Organization, Project,
    Role, SystemJob, SystemJobTemplate, UnifiedJob, UnifiedJobTemplate, User,
    UserSessionMembership, WorkflowJobTemplateNode, WorkflowApproval,
    WorkflowApprovalTemplate, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
)
from awx.main.constants import CENSOR_VALUE
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks import update_inventory_computed_fields
from awx.main.fields import (
    is_implicit_parent,
    update_role_parentage_for_instance,
)
from awx.main import consumers
from awx.conf.utils import conf_to_dict

__all__ = []

logger = logging.getLogger('awx.main.signals')
analytics_logger = logging.getLogger('awx.analytics.activity_stream')
# Update has_active_failures for inventory/groups when a Host/Group is deleted,
# when a Host-Group or Group-Group relationship is updated, or when a Job is deleted


def get_activity_stream_class():
    if 'migrate' in sys.argv:
        return get_current_apps().get_model('main', 'ActivityStream')
    else:
        return ActivityStream
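# Note: during `manage.py migrate`, the historical model from the apps registry is
# returned above so that activity stream writes match the schema state of the
# migration being applied, rather than the latest model definition.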
def get_current_user_or_none():
    u = get_current_user()
    if not isinstance(u, User):
        return None
    return u
def emit_update_inventory_on_created_or_deleted(sender, **kwargs):
    if getattr(_inventory_updates, 'is_updating', False):
        return
    instance = kwargs['instance']
    if ('created' in kwargs and kwargs['created']) or \
       kwargs['signal'] == post_delete:
        pass
    else:
        return
    sender_name = str(sender._meta.verbose_name)
    logger.debug("%s created or deleted, updating inventory computed fields: %r %r",
                 sender_name, sender, kwargs)
    try:
        inventory = instance.inventory
    except Inventory.DoesNotExist:
        pass
    else:
        if inventory is not None:
            connection.on_commit(
                lambda: update_inventory_computed_fields.delay(inventory.id)
            )
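# The recalculation is deferred with connection.on_commit() so the task is only
# queued after the surrounding transaction commits. A rough sketch of the flow
# (illustrative only; `inv` is a hypothetical Inventory instance):
#
#     host = Host.objects.create(inventory=inv, name='example-host')
#     # post_save fires -> on_commit() registers the callback
#     # transaction commits -> update_inventory_computed_fields.delay(inv.id) is queued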
def rebuild_role_ancestor_list(reverse, model, instance, pk_set, action, **kwargs):
    'When a role parent is added or removed, update our role hierarchy list'
    if action == 'post_add':
        if reverse:
            model.rebuild_role_ancestor_list(list(pk_set), [])
        else:
            model.rebuild_role_ancestor_list([instance.id], [])

    if action in ['post_remove', 'post_clear']:
        if reverse:
            model.rebuild_role_ancestor_list([], list(pk_set))
        else:
            model.rebuild_role_ancestor_list([], [instance.id])
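# `reverse` tells us which side of the Role.parents m2m triggered the change.
# A hedged sketch, assuming two existing Role objects and that the reverse
# accessor of `parents` is named `children`:
#
#     child_role.parents.add(parent_role)    # forward: rebuilds ancestors for [child_role.id]
#     parent_role.children.add(child_role)   # reverse: rebuilds ancestors for the pk_set members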
def sync_superuser_status_to_rbac(instance, **kwargs):
    'When the is_superuser flag is changed on a user, reflect that in the membership of the System Administrator role'
    update_fields = kwargs.get('update_fields', None)
    if update_fields and 'is_superuser' not in update_fields:
        return
    if instance.is_superuser:
        Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.add(instance)
    else:
        Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance)
def sync_rbac_to_superuser_status(instance, sender, **kwargs):
    'When a user is added to or removed from the System Administrator role, sync the change back to the is_superuser flag'
    if kwargs['action'] in ['post_add', 'post_remove', 'post_clear']:
        new_status_value = bool(kwargs['action'] == 'post_add')
        if hasattr(instance, 'singleton_name'):  # duck typing, role.members.add() vs user.roles.add()
            role = instance
            if role.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR:
                if kwargs['pk_set']:
                    kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).update(is_superuser=new_status_value)
                elif kwargs['action'] == 'post_clear':
                    kwargs['model'].objects.all().update(is_superuser=False)
        else:
            user = instance
            if kwargs['action'] == 'post_clear':
                user.is_superuser = False
                user.save(update_fields=['is_superuser'])
            elif kwargs['model'].objects.filter(pk__in=kwargs['pk_set'], singleton_name=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).exists():
                user.is_superuser = new_status_value
                user.save(update_fields=['is_superuser'])
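# The duck typing above covers both directions of the same m2m. A hedged sketch,
# assuming an existing `admin_role` singleton Role and a `user` object:
#
#     admin_role.members.add(user)   # instance is the Role -> bulk queryset update branch
#     user.roles.add(admin_role)     # instance is the User -> per-user save() branch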
def rbac_activity_stream(instance, sender, **kwargs):
    # Only if we are associating/disassociating
    if kwargs['action'] in ['pre_add', 'pre_remove']:
        if hasattr(instance, 'content_type'):  # Duck typing, migration-independent isinstance(instance, Role)
            if instance.content_type_id is None and instance.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR:
                # Skip entries for the system admin role because the user serializer covers it.
                # The system auditor role is shown in the serializer, but its relationship is
                # managed separately, its value is incorrect, and a correction entry is needed.
                return
            # This juggles which role to use, because it could be an A->B or B->A association
            if sender.__name__ == 'Role_parents':
                role = kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).first()
                # don't record implicit creation / parents in activity stream
                if role is not None and is_implicit_parent(parent_role=role, child_role=instance):
                    return
            else:
                role = instance
            # If a singleton role is the instance, the singleton role is acted on;
            # otherwise the related object is considered to be acted on
            if instance.content_object:
                instance = instance.content_object
        else:
            # Association with actor, like role->user
            role = kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).first()

        activity_stream_associate(sender, instance, role=role, **kwargs)
def cleanup_detached_labels_on_deleted_parent(sender, instance, **kwargs):
    for label in instance.labels.all():
        if label.is_candidate_for_detach():
            label.delete()
def save_related_job_templates(sender, instance, **kwargs):
    '''save_related_job_templates loops through all of the
    job templates that use an Inventory whose Organization has been
    updated. This triggers the rebuilding of the RBAC hierarchy
    and ensures the proper access restrictions.
    '''
    if sender is not Inventory:
        raise ValueError('This signal callback is only intended for use with Inventory')

    update_fields = kwargs.get('update_fields', None)
    if ((update_fields and not ('organization' in update_fields or 'organization_id' in update_fields)) or
            kwargs.get('created', False)):
        return

    if instance._prior_values_store.get('organization_id') != instance.organization_id:
        jtq = JobTemplate.objects.filter(**{sender.__name__.lower(): instance})
        for jt in jtq:
            parents_added, parents_removed = update_role_parentage_for_instance(jt)
            if parents_added or parents_removed:
                logger.info('Permissions on JT {} changed due to inventory {} organization change from {} to {}.'.format(
                    jt.pk, instance.pk, instance._prior_values_store.get('organization_id'), instance.organization_id
                ))
def connect_computed_field_signals():
    post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
    post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
    post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Group)
    post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Group)
    post_save.connect(emit_update_inventory_on_created_or_deleted, sender=InventorySource)
    post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=InventorySource)
    post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Job)
    post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Job)
connect_computed_field_signals()
post_save.connect(save_related_job_templates, sender=Inventory)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through)
post_save.connect(sync_superuser_status_to_rbac, sender=User)
m2m_changed.connect(sync_rbac_to_superuser_status, Role.members.through)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate)
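# These module-level connect() calls register the handlers at import time. The
# disable_computed_fields() context manager defined below temporarily disconnects
# the computed-field handlers and restores them via connect_computed_field_signals().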
# Migrate hosts, groups to parent group(s) whenever a group is deleted
@receiver(pre_delete, sender=Group)
def save_related_pks_before_group_delete(sender, **kwargs):
    if getattr(_inventory_updates, 'is_removing', False):
        return
    instance = kwargs['instance']
    instance._saved_inventory_pk = instance.inventory.pk
    instance._saved_parents_pks = set(instance.parents.values_list('pk', flat=True))
    instance._saved_hosts_pks = set(instance.hosts.values_list('pk', flat=True))
    instance._saved_children_pks = set(instance.children.values_list('pk', flat=True))
@receiver(post_delete, sender=Group)
def migrate_children_from_deleted_group_to_parent_groups(sender, **kwargs):
    if getattr(_inventory_updates, 'is_removing', False):
        return
    instance = kwargs['instance']
    parents_pks = getattr(instance, '_saved_parents_pks', [])
    hosts_pks = getattr(instance, '_saved_hosts_pks', [])
    children_pks = getattr(instance, '_saved_children_pks', [])
    is_updating = getattr(_inventory_updates, 'is_updating', False)

    with ignore_inventory_group_removal():
        with ignore_inventory_computed_fields():
            if parents_pks:
                for parent_group in Group.objects.filter(pk__in=parents_pks):
                    for child_host in Host.objects.filter(pk__in=hosts_pks):
                        logger.debug('adding host %s to parent %s after group deletion',
                                     child_host, parent_group)
                        parent_group.hosts.add(child_host)
                    for child_group in Group.objects.filter(pk__in=children_pks):
                        logger.debug('adding group %s to parent %s after group deletion',
                                     child_group, parent_group)
                        parent_group.children.add(child_group)
            inventory_pk = getattr(instance, '_saved_inventory_pk', None)
            if inventory_pk and not is_updating:
                try:
                    inventory = Inventory.objects.get(pk=inventory_pk)
                    inventory.update_computed_fields()
                except (Inventory.DoesNotExist, Project.DoesNotExist):
                    pass
# Update host pointers to last_job and last_job_host_summary when a job is deleted
def _update_host_last_jhs(host):
    jhs_qs = JobHostSummary.objects.filter(host__pk=host.pk)
    try:
        jhs = jhs_qs.order_by('-job__pk')[0]
    except IndexError:
        jhs = None
    update_fields = []
    try:
        last_job = jhs.job if jhs else None
    except Job.DoesNotExist:
        # The job (and its summaries) have already been/are currently being
        # deleted, so there's no need to update the host w/ a reference to it
        return
    if host.last_job != last_job:
        host.last_job = last_job
        update_fields.append('last_job')
    if host.last_job_host_summary != jhs:
        host.last_job_host_summary = jhs
        update_fields.append('last_job_host_summary')
    if update_fields:
        host.save(update_fields=update_fields)
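# Helper for the Job post_delete handler below: it repoints a host's last_job /
# last_job_host_summary at the most recent remaining JobHostSummary (highest job pk),
# or clears them when none remain.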
@receiver(pre_delete, sender=Job)
def save_host_pks_before_job_delete(sender, **kwargs):
    instance = kwargs['instance']
    hosts_qs = Host.objects.filter(last_job__pk=instance.pk)
    instance._saved_hosts_pks = set(hosts_qs.values_list('pk', flat=True))
@receiver(post_delete, sender=Job)
def update_host_last_job_after_job_deleted(sender, **kwargs):
    instance = kwargs['instance']
    hosts_pks = getattr(instance, '_saved_hosts_pks', [])
    for host in Host.objects.filter(pk__in=hosts_pks):
        _update_host_last_jhs(host)
# Set via ActivityStreamRegistrar to record activity stream events
class ActivityStreamEnabled(threading.local):

    def __init__(self):
        self.enabled = True

    def __bool__(self):
        return bool(self.enabled and getattr(settings, 'ACTIVITY_STREAM_ENABLED', True))
activity_stream_enabled = ActivityStreamEnabled()
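# activity_stream_enabled is a thread-local toggle combined with the
# ACTIVITY_STREAM_ENABLED setting; __bool__ lets the handlers below simply test
# `if not activity_stream_enabled: return`.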
@contextlib.contextmanager
def disable_activity_stream():
    '''
    Context manager to disable capturing activity stream changes.
    '''
    try:
        previous_value = activity_stream_enabled.enabled
        activity_stream_enabled.enabled = False
        yield
    finally:
        activity_stream_enabled.enabled = previous_value
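# Minimal usage sketch (illustrative; `some_host` is a hypothetical model instance):
#
#     with disable_activity_stream():
#         some_host.save()   # no activity stream entry is recorded for this block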
@contextlib.contextmanager
def disable_computed_fields():
    post_save.disconnect(emit_update_inventory_on_created_or_deleted, sender=Host)
    post_delete.disconnect(emit_update_inventory_on_created_or_deleted, sender=Host)
    post_save.disconnect(emit_update_inventory_on_created_or_deleted, sender=Group)
    post_delete.disconnect(emit_update_inventory_on_created_or_deleted, sender=Group)
    post_save.disconnect(emit_update_inventory_on_created_or_deleted, sender=InventorySource)
    post_delete.disconnect(emit_update_inventory_on_created_or_deleted, sender=InventorySource)
    post_save.disconnect(emit_update_inventory_on_created_or_deleted, sender=Job)
    post_delete.disconnect(emit_update_inventory_on_created_or_deleted, sender=Job)
    yield
    connect_computed_field_signals()
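# Usage sketch (illustrative; `inv` and `many_host_names` are hypothetical): bulk
# imports can skip per-object computed-field updates and recalculate once afterwards.
#
#     with disable_computed_fields():
#         for name in many_host_names:
#             inv.hosts.create(name=name)
#     update_inventory_computed_fields.delay(inv.id)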
def model_serializer_mapping():
    from awx.api import serializers
    from awx.main import models
    from awx.conf.models import Setting
    from awx.conf.serializers import SettingSerializer

    return {
        Setting: SettingSerializer,
        models.User: serializers.UserActivityStreamSerializer,
        models.Organization: serializers.OrganizationSerializer,
        models.Inventory: serializers.InventorySerializer,
        models.Host: serializers.HostSerializer,
        models.Group: serializers.GroupSerializer,
        models.InstanceGroup: serializers.InstanceGroupSerializer,
        models.InventorySource: serializers.InventorySourceSerializer,
        models.CustomInventoryScript: serializers.CustomInventoryScriptSerializer,
        models.Credential: serializers.CredentialSerializer,
        models.Team: serializers.TeamSerializer,
        models.Project: serializers.ProjectSerializer,
        models.JobTemplate: serializers.JobTemplateWithSpecSerializer,
        models.Job: serializers.JobSerializer,
        models.AdHocCommand: serializers.AdHocCommandSerializer,
        models.NotificationTemplate: serializers.NotificationTemplateSerializer,
        models.Notification: serializers.NotificationSerializer,
        models.CredentialType: serializers.CredentialTypeSerializer,
        models.Schedule: serializers.ScheduleSerializer,
        models.Label: serializers.LabelSerializer,
        models.WorkflowJobTemplate: serializers.WorkflowJobTemplateWithSpecSerializer,
        models.WorkflowJobTemplateNode: serializers.WorkflowJobTemplateNodeSerializer,
        models.WorkflowApproval: serializers.WorkflowApprovalActivityStreamSerializer,
        models.WorkflowApprovalTemplate: serializers.WorkflowApprovalTemplateSerializer,
        models.WorkflowJob: serializers.WorkflowJobSerializer,
        models.OAuth2AccessToken: serializers.OAuth2TokenSerializer,
        models.OAuth2Application: serializers.OAuth2ApplicationSerializer,
    }
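# The serializer and model imports are kept inside the function rather than at
# module level, presumably to avoid circular imports between awx.main.signals and
# awx.api at startup; the mapping is rebuilt on each call.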
def emit_activity_stream_change(instance):
    if 'migrate' in sys.argv:
        # don't emit activity stream external logs during migrations, it
        # could be really noisy
        return
    from awx.api.serializers import ActivityStreamSerializer
    actor = None
    if instance.actor:
        actor = instance.actor.username
    summary_fields = ActivityStreamSerializer(instance).get_summary_fields(instance)
    analytics_logger.info('Activity Stream update entry for %s' % str(instance.object1),
                          extra=dict(changes=instance.changes, relationship=instance.object_relationship_type,
                                     actor=actor, operation=instance.operation,
                                     object1=instance.object1, object2=instance.object2, summary_fields=summary_fields))
def activity_stream_create(sender, instance, created, **kwargs):
    if created and activity_stream_enabled:
        _type = type(instance)
        if getattr(_type, '_deferred', False):
            return
        object1 = camelcase_to_underscore(instance.__class__.__name__)
        changes = model_to_dict(instance, model_serializer_mapping())
        # Special case where Job survey password variables need to be hidden
        if type(instance) == Job:
            changes['credentials'] = [
                '{} ({})'.format(c.name, c.id)
                for c in instance.credentials.iterator()
            ]
            changes['labels'] = [label.name for label in instance.labels.iterator()]
            if 'extra_vars' in changes:
                changes['extra_vars'] = instance.display_extra_vars()
        if type(instance) == OAuth2AccessToken:
            changes['token'] = CENSOR_VALUE
        activity_entry = get_activity_stream_class()(
            operation='create',
            object1=object1,
            changes=json.dumps(changes),
            actor=get_current_user_or_none())
        # TODO: Weird situation where cascade SETNULL doesn't work
        #   it might actually be a good idea to remove all of these FK references since
        #   we don't really use them anyway.
        if instance._meta.model_name != 'setting':  # Is not conf.Setting instance
            activity_entry.save()
            getattr(activity_entry, object1).add(instance.pk)
        else:
            activity_entry.setting = conf_to_dict(instance)
            activity_entry.save()
        connection.on_commit(
            lambda: emit_activity_stream_change(activity_entry)
        )
def activity_stream_update(sender, instance, **kwargs):
    if instance.id is None:
        return
    if not activity_stream_enabled:
        return
    try:
        old = sender.objects.get(id=instance.id)
    except sender.DoesNotExist:
        return
    new = instance
    changes = model_instance_diff(old, new, model_serializer_mapping())
    if changes is None:
        return
    _type = type(instance)
    if getattr(_type, '_deferred', False):
        return
    object1 = camelcase_to_underscore(instance.__class__.__name__)
    activity_entry = get_activity_stream_class()(
        operation='update',
        object1=object1,
        changes=json.dumps(changes),
        actor=get_current_user_or_none())
    if instance._meta.model_name != 'setting':  # Is not conf.Setting instance
        activity_entry.save()
        getattr(activity_entry, object1).add(instance.pk)
    else:
        activity_entry.setting = conf_to_dict(instance)
        activity_entry.save()
    connection.on_commit(
        lambda: emit_activity_stream_change(activity_entry)
    )
def activity_stream_delete(sender, instance, **kwargs):
    if not activity_stream_enabled:
        return
    # Inventory delete happens in the task system rather than the request-response cycle.
    # If we trigger this handler there we may fall into db-integrity-related race conditions.
    # So we add flag verification to prevent normal signal handling. This function will be
    # explicitly called with the flag on in Inventory.schedule_deletion.
    changes = {}
    if isinstance(instance, Inventory):
        if not kwargs.get('inventory_delete_flag', False):
            return
        # Add additional data about child hosts / groups that will be deleted
        changes['coalesced_data'] = {
            'hosts_deleted': instance.hosts.count(),
            'groups_deleted': instance.groups.count()
        }
    elif isinstance(instance, (Host, Group)) and instance.inventory.pending_deletion:
        return  # accounted for by inventory entry, above
    _type = type(instance)
    if getattr(_type, '_deferred', False):
        return
    changes.update(model_to_dict(instance, model_serializer_mapping()))
    object1 = camelcase_to_underscore(instance.__class__.__name__)
    if type(instance) == OAuth2AccessToken:
        changes['token'] = CENSOR_VALUE
    activity_entry = get_activity_stream_class()(
        operation='delete',
        changes=json.dumps(changes),
        object1=object1,
        actor=get_current_user_or_none())
    activity_entry.save()
    connection.on_commit(
        lambda: emit_activity_stream_change(activity_entry)
    )
def activity_stream_associate(sender, instance, **kwargs):
    if not activity_stream_enabled:
        return
    if kwargs['action'] in ['pre_add', 'pre_remove']:
        if kwargs['action'] == 'pre_add':
            action = 'associate'
        elif kwargs['action'] == 'pre_remove':
            action = 'disassociate'
        else:
            return
        obj1 = instance
        _type = type(instance)
        if getattr(_type, '_deferred', False):
            return
        object1 = camelcase_to_underscore(obj1.__class__.__name__)
        obj_rel = sender.__module__ + "." + sender.__name__

        for entity_acted in kwargs['pk_set']:
            obj2 = kwargs['model']
            obj2_id = entity_acted
            obj2_actual = obj2.objects.filter(id=obj2_id)
            if not obj2_actual.exists():
                continue
            obj2_actual = obj2_actual[0]
            _type = type(obj2_actual)
            if getattr(_type, '_deferred', False):
                return
            if isinstance(obj2_actual, Role) and obj2_actual.content_object is not None:
                obj2_actual = obj2_actual.content_object
                object2 = camelcase_to_underscore(obj2_actual.__class__.__name__)
            else:
                object2 = camelcase_to_underscore(obj2.__name__)
            # Skip recording any inventory source, or system job template changes here.
            if isinstance(obj1, InventorySource) or isinstance(obj2_actual, InventorySource):
                continue
            if isinstance(obj1, SystemJobTemplate) or isinstance(obj2_actual, SystemJobTemplate):
                continue
            if isinstance(obj1, SystemJob) or isinstance(obj2_actual, SystemJob):
                continue
            activity_entry = get_activity_stream_class()(
                changes=json.dumps(dict(object1=object1,
                                        object1_pk=obj1.pk,
                                        object2=object2,
                                        object2_pk=obj2_id,
                                        action=action,
                                        relationship=obj_rel)),
                operation=action,
                object1=object1,
                object2=object2,
                object_relationship_type=obj_rel,
                actor=get_current_user_or_none())
            activity_entry.save()
            getattr(activity_entry, object1).add(obj1.pk)
            getattr(activity_entry, object2).add(obj2_actual.pk)

            # Record the role for RBAC changes
            if 'role' in kwargs:
                role = kwargs['role']
                if role.content_object is not None:
                    obj_rel = '.'.join([role.content_object.__module__,
                                        role.content_object.__class__.__name__,
                                        role.role_field])
                # If the m2m is from the User side we need to
                # set the content_object of the Role for our entry.
                if type(instance) == User and role.content_object is not None:
                    getattr(activity_entry, role.content_type.name.replace(' ', '_')).add(role.content_object)
                activity_entry.role.add(role)
                activity_entry.object_relationship_type = obj_rel
                activity_entry.save()
            connection.on_commit(
                lambda: emit_activity_stream_change(activity_entry)
            )
@receiver(current_user_getter)
def get_current_user_from_drf_request(sender, **kwargs):
    '''
    Provide a signal handler to return the current user from the current
    request when using Django REST Framework. Requires that the APIView set
    drf_request on the underlying Django Request object.
    '''
    request = get_current_request()
    drf_request_user = getattr(request, 'drf_request_user', False)
    return (drf_request_user, 0)
@receiver(pre_delete, sender=Organization)
def delete_inventory_for_org(sender, instance, **kwargs):
    inventories = Inventory.objects.filter(organization__pk=instance.pk)
    user = get_current_user_or_none()
    for inventory in inventories:
        try:
            inventory.schedule_deletion(user_id=getattr(user, 'id', None))
        except RuntimeError as e:
            logger.debug(e)
@receiver(pre_delete, sender=WorkflowJobTemplateNode)
def delete_approval_templates(sender, instance, **kwargs):
    if type(instance.unified_job_template) is WorkflowApprovalTemplate:
        instance.unified_job_template.delete()
@receiver(pre_save, sender=WorkflowJobTemplateNode)
def delete_approval_node_type_change(sender, instance, **kwargs):
    try:
        old = WorkflowJobTemplateNode.objects.get(id=instance.id)
    except sender.DoesNotExist:
        return
    if old.unified_job_template == instance.unified_job_template:
        return
    if type(old.unified_job_template) is WorkflowApprovalTemplate:
        old.unified_job_template.delete()
@receiver(pre_delete, sender=WorkflowApprovalTemplate)
def deny_orphaned_approvals(sender, instance, **kwargs):
    for approval in WorkflowApproval.objects.filter(workflow_approval_template=instance, status='pending'):
        approval.deny()
@receiver(post_save, sender=Session)
def save_user_session_membership(sender, **kwargs):
    session = kwargs.get('instance', None)
    if not session:
        return
    user_id = session.get_decoded().get(SESSION_KEY, None)
    if not user_id:
        return
    if UserSessionMembership.objects.filter(user=user_id, session=session).exists():
        return
    # check if user_id from session has an id match in User before saving
    if User.objects.filter(id=int(user_id)).exists():
        UserSessionMembership(user_id=user_id, session=session, created=timezone.now()).save()
        expired = UserSessionMembership.get_memberships_over_limit(user_id)
        for membership in expired:
            Session.objects.filter(session_key__in=[membership.session_id]).delete()
            membership.delete()
        if len(expired):
            consumers.emit_channel_notification(
                'control-limit_reached_{}'.format(user_id),
                dict(group_name='control', reason='limit_reached')
            )
@receiver(post_save, sender=OAuth2AccessToken)
def create_access_token_user_if_missing(sender, **kwargs):
    obj = kwargs['instance']
    if obj.application and obj.application.user:
        obj.user = obj.application.user
        post_save.disconnect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
        obj.save()
        post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
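# The handler disconnects itself around obj.save() to avoid recursing into this
# same post_save receiver, then reconnects immediately afterwards.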
# Connect the Instance Group to Activity Stream receivers.
post_save.connect(activity_stream_create, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_create")
pre_save.connect(activity_stream_update, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_update")
pre_delete.connect(activity_stream_delete, sender=InstanceGroup, dispatch_uid=str(InstanceGroup) + "_delete")