# -*- coding: utf-8 -*-

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.

# Python
from collections import OrderedDict, namedtuple
import errno
import functools
import importlib
import json
import logging
import os
import shutil
import stat
import tempfile
import time
import traceback
from distutils.dir_util import copy_tree
from distutils.version import LooseVersion as Version
import yaml
import fcntl
from pathlib import Path
from uuid import uuid4
import urllib.parse as urlparse

# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _, gettext_noop
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist

# Kubernetes
from kubernetes.client.rest import ApiException

# Django-CRUM
from crum import impersonate

# GitPython
import git
from gitdb.exc import BadName as BadGitName

# Runner
import ansible_runner

# AWX
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.access import access_registry
from awx.main.analytics import all_collectors, expensive_collectors
from awx.main.redact import UriCleaner
from awx.main.models import (
    Schedule, TowerScheduleState, Instance, InstanceGroup,
    UnifiedJob, Notification,
    Inventory, InventorySource, SmartInventoryMembership,
    Job, AdHocCommand, ProjectUpdate, InventoryUpdate, SystemJob,
    JobEvent, ProjectUpdateEvent, InventoryUpdateEvent, AdHocCommandEvent, SystemJobEvent,
    build_safe_env
)
from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError, PostRunError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.isolated import manager as isolated_manager
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils import (update_scm_url,
                            ignore_inventory_computed_fields,
                            ignore_inventory_group_removal, extract_ansible_vars, schedule_task_manager,
                            get_awx_version)
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.common import get_custom_venv_choices
from awx.main.utils.external_logging import reconfigure_rsyslog
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
from awx.conf.license import get_license

from rest_framework.exceptions import PermissionDenied
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
|
|
'RunAdHocCommand', 'handle_work_error', 'handle_work_success', 'apply_cluster_membership_policies',
|
|
'update_inventory_computed_fields', 'update_host_smart_inventory_memberships',
|
|
'send_notifications', 'purge_old_stdout_files']
|
|
|
|
HIDDEN_PASSWORD = '**********'
|
|
|
|
OPENSSH_KEY_ERROR = u'''\
|
|
It looks like you're trying to use a private key in OpenSSH format, which \
|
|
isn't supported by the installed version of OpenSSH on this instance. \
|
|
Try upgrading OpenSSH or providing your private key in an different format. \
|
|
'''
|
|
|
|
logger = logging.getLogger('awx.main.tasks')
|
|
|
|
|
|


class InvalidVirtualenvError(Exception):

    def __init__(self, message):
        self.message = message


def dispatch_startup():
    startup_logger = logging.getLogger('awx.main.tasks')
    startup_logger.debug("Syncing Schedules")
    for sch in Schedule.objects.all():
        try:
            sch.update_computed_fields()
        except Exception:
            logger.exception("Failed to rebuild schedule {}.".format(sch))

    #
    # When the dispatcher starts, if the instance cannot be found in the database,
    # automatically register it. This is mostly useful for openshift-based
    # deployments where:
    #
    # 2 Instances come online
    # Instance B encounters a network blip, Instance A notices, and
    # deprovisions it
    # Instance B's connectivity is restored, the dispatcher starts, and it
    # re-registers itself
    #
    # In traditional container-less deployments, instances don't get
    # deprovisioned when they miss their heartbeat, so this code is mostly a
    # no-op.
    #
    apply_cluster_membership_policies()
    cluster_node_heartbeat()
    if Instance.objects.me().is_controller():
        awx_isolated_heartbeat()

    # Update Tower's rsyslog.conf file based on logging settings in the db
    reconfigure_rsyslog()


def inform_cluster_of_shutdown():
    try:
        this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
        this_inst.capacity = 0  # No thank you to new jobs while shut down
        this_inst.save(update_fields=['capacity', 'modified'])
        try:
            reaper.reap(this_inst)
        except Exception:
            logger.exception('failed to reap jobs for {}'.format(this_inst.hostname))
        logger.warning('Normal shutdown signal for instance {}, '
                       'removed self from capacity pool.'.format(this_inst.hostname))
    except Exception:
        logger.exception('Encountered problem with normal shutdown signal.')


@task(queue=get_local_queuename)
def apply_cluster_membership_policies():
    started_waiting = time.time()
    with advisory_lock('cluster_policy_lock', wait=True):
        lock_time = time.time() - started_waiting
        if lock_time > 1.0:
            to_log = logger.info
        else:
            to_log = logger.debug
        to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))
        started_compute = time.time()
        all_instances = list(Instance.objects.order_by('id'))
        all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
        iso_hostnames = set([])
        for ig in all_groups:
            if ig.controller_id is not None:
                iso_hostnames.update(ig.policy_instance_list)

        considered_instances = [inst for inst in all_instances if inst.hostname not in iso_hostnames]
        total_instances = len(considered_instances)
        actual_groups = []
        actual_instances = []
        Group = namedtuple('Group', ['obj', 'instances', 'prior_instances'])
        Node = namedtuple('Instance', ['obj', 'groups'])

        # Process policy instance list first, these will represent manually managed memberships
        instance_hostnames_map = {inst.hostname: inst for inst in all_instances}
        for ig in all_groups:
            group_actual = Group(obj=ig, instances=[], prior_instances=[
                instance.pk for instance in ig.instances.all()  # obtained in prefetch
            ])
            for hostname in ig.policy_instance_list:
                if hostname not in instance_hostnames_map:
                    logger.info("Unknown instance {} in {} policy list".format(hostname, ig.name))
                    continue
                inst = instance_hostnames_map[hostname]
                group_actual.instances.append(inst.id)
                # NOTE: arguable behavior: policy-list-group is not added to
                # instance's group count for consideration in minimum-policy rules
            if group_actual.instances:
                logger.debug("Policy List, adding Instances {} to Group {}".format(group_actual.instances, ig.name))

            if ig.controller_id is None:
                actual_groups.append(group_actual)
            else:
                # For isolated groups, _only_ apply the policy_instance_list
                # do not add to in-memory list, so minimum rules not applied
                logger.debug('Committing instances to isolated group {}'.format(ig.name))
                ig.instances.set(group_actual.instances)

        # Process Instance minimum policies next, since it represents a concrete lower bound to the
        # number of instances to make available to instance groups
        actual_instances = [Node(obj=i, groups=[]) for i in considered_instances if i.managed_by_policy]
        logger.debug("Total non-isolated instances:{} available for policy: {}".format(
            total_instances, len(actual_instances)))
        for g in sorted(actual_groups, key=lambda x: len(x.instances)):
            policy_min_added = []
            for i in sorted(actual_instances, key=lambda x: len(x.groups)):
                if len(g.instances) >= g.obj.policy_instance_minimum:
                    break
                if i.obj.id in g.instances:
                    # If the instance is already _in_ the group, it was
                    # applied earlier via the policy list
                    continue
                g.instances.append(i.obj.id)
                i.groups.append(g.obj.id)
                policy_min_added.append(i.obj.id)
            if policy_min_added:
                logger.debug("Policy minimum, adding Instances {} to Group {}".format(policy_min_added, g.obj.name))

        # Finally, process instance policy percentages
        for g in sorted(actual_groups, key=lambda x: len(x.instances)):
            policy_per_added = []
            for i in sorted(actual_instances, key=lambda x: len(x.groups)):
                if i.obj.id in g.instances:
                    # If the instance is already _in_ the group, it was
                    # applied earlier via a minimum policy or policy list
                    continue
                if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
                    break
                g.instances.append(i.obj.id)
                i.groups.append(g.obj.id)
                policy_per_added.append(i.obj.id)
            if policy_per_added:
                logger.debug("Policy percentage, adding Instances {} to Group {}".format(policy_per_added, g.obj.name))

        # Determine if any changes need to be made
        needs_change = False
        for g in actual_groups:
            if set(g.instances) != set(g.prior_instances):
                needs_change = True
                break
        if not needs_change:
            logger.debug('Cluster policy no-op finished in {} seconds'.format(time.time() - started_compute))
            return

        # On a differential basis, apply instances to non-isolated groups
        with transaction.atomic():
            for g in actual_groups:
                if g.obj.is_containerized:
                    logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
                    continue
                instances_to_add = set(g.instances) - set(g.prior_instances)
                instances_to_remove = set(g.prior_instances) - set(g.instances)
                if instances_to_add:
                    logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
                    g.obj.instances.add(*instances_to_add)
                if instances_to_remove:
                    logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
                    g.obj.instances.remove(*instances_to_remove)
        logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
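

# Illustrative sketch (not part of the original module): the three policy passes
# above -- explicit policy_instance_list, then policy_instance_minimum, then
# policy_instance_percentage -- shown against plain dicts instead of ORM objects.
# The group and instance data below is hypothetical.
def _example_policy_passes():
    instances = ['tower-1', 'tower-2', 'tower-3', 'tower-4']
    groups = {
        'pinned': {'policy_instance_list': ['tower-1'], 'minimum': 0, 'percentage': 0, 'members': []},
        'small': {'policy_instance_list': [], 'minimum': 1, 'percentage': 0, 'members': []},
        'half': {'policy_instance_list': [], 'minimum': 0, 'percentage': 50, 'members': []},
    }
    # Pass 1: explicit policy lists represent manually managed memberships.
    for g in groups.values():
        g['members'] = list(g['policy_instance_list'])
    # Pass 2: top each group up to its configured minimum instance count.
    for g in groups.values():
        for inst in instances:
            if len(g['members']) >= g['minimum']:
                break
            if inst not in g['members']:
                g['members'].append(inst)
    # Pass 3: keep adding until the group holds its configured percentage of instances.
    for g in groups.values():
        for inst in instances:
            if 100 * len(g['members']) / len(instances) >= g['percentage']:
                break
            if inst not in g['members']:
                g['members'].append(inst)
    return groups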


@task(queue='tower_broadcast_all')
def handle_setting_changes(setting_keys):
    orig_len = len(setting_keys)
    for i in range(orig_len):
        for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]):
            setting_keys.append(dependent_key)
    cache_keys = set(setting_keys)
    logger.debug('cache delete_many(%r)', cache_keys)
    cache.delete_many(cache_keys)

    if any([
        setting.startswith('LOG_AGGREGATOR')
        for setting in setting_keys
    ]):
        reconfigure_rsyslog()
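

# Illustrative sketch (not part of the original module): handle_setting_changes()
# extends the changed-key list with each key's registered dependents before the
# cache purge, so dependent settings are invalidated together with the settings
# that actually changed. The dependency map here is hypothetical.
def _example_expand_dependent_keys():
    dependents = {'LOG_AGGREGATOR_HOST': ['LOG_AGGREGATOR_ENABLED']}  # hypothetical mapping
    setting_keys = ['LOG_AGGREGATOR_HOST']
    for i in range(len(setting_keys)):
        for dependent_key in dependents.get(setting_keys[i], []):
            setting_keys.append(dependent_key)
    return set(setting_keys)  # the keys handed to cache.delete_many()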


@task(queue='tower_broadcast_all')
def delete_project_files(project_path):
    # TODO: possibly implement some retry logic
    lock_file = project_path + '.lock'
    if os.path.exists(project_path):
        try:
            shutil.rmtree(project_path)
            logger.debug('Success removing project files {}'.format(project_path))
        except Exception:
            logger.exception('Could not remove project directory {}'.format(project_path))
    if os.path.exists(lock_file):
        try:
            os.remove(lock_file)
            logger.debug('Success removing {}'.format(lock_file))
        except Exception:
            logger.exception('Could not remove lock file {}'.format(lock_file))


@task(queue='tower_broadcast_all')
def profile_sql(threshold=1, minutes=1):
    if threshold <= 0:
        cache.delete('awx-profile-sql-threshold')
        logger.error('SQL PROFILING DISABLED')
    else:
        cache.set(
            'awx-profile-sql-threshold',
            threshold,
            timeout=minutes * 60
        )
        logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))


@task(queue=get_local_queuename)
def send_notifications(notification_list, job_id=None):
    if not isinstance(notification_list, list):
        raise TypeError("notification_list should be of type list")
    if job_id is not None:
        job_actual = UnifiedJob.objects.get(id=job_id)

    notifications = Notification.objects.filter(id__in=notification_list)
    if job_id is not None:
        job_actual.notifications.add(*notifications)

    for notification in notifications:
        update_fields = ['status', 'notifications_sent']
        try:
            sent = notification.notification_template.send(notification.subject, notification.body)
            notification.status = "successful"
            notification.notifications_sent = sent
        except Exception as e:
            logger.exception("Send Notification Failed {}".format(e))
            notification.status = "failed"
            notification.error = smart_str(e)
            update_fields.append('error')
        finally:
            try:
                notification.save(update_fields=update_fields)
            except Exception:
                logger.exception('Error saving notification {} result.'.format(notification.id))


@task(queue=get_local_queuename)
def gather_analytics():
    def _gather_and_ship(subset, since, until):
        tgzfiles = []
        try:
            tgzfiles = analytics.gather(subset=subset, since=since, until=until)
            # empty analytics without raising an exception is not an error
            if not tgzfiles:
                return True
            logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
            for tgz in tgzfiles:
                analytics.ship(tgz)
        except Exception:
            logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
            return False
        finally:
            if tgzfiles:
                for tgz in tgzfiles:
                    if os.path.exists(tgz):
                        os.remove(tgz)
        return True

    from awx.conf.models import Setting
    from rest_framework.fields import DateTimeField
    from awx.main.signals import disable_activity_stream
    if not settings.INSIGHTS_TRACKING_STATE:
        return
    if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
        logger.debug('Not gathering analytics, configuration is invalid')
        return
    last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first()
    if last_gather:
        last_time = DateTimeField().to_internal_value(last_gather.value)
    else:
        last_time = None
    gather_time = now()
    if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
        with advisory_lock('gather_analytics_lock', wait=False) as acquired:
            if acquired is False:
                logger.debug('Not gathering analytics, another task holds lock')
                return
            subset = list(all_collectors().keys())
            incremental_collectors = []
            for collector in expensive_collectors():
                if collector in subset:
                    subset.remove(collector)
                    incremental_collectors.append(collector)

            # Cap gathering at 4 weeks of data if there has been no data gathering
            since = last_time or (gather_time - timedelta(weeks=4))

            if incremental_collectors:
                start = since
                until = None
                while start < gather_time:
                    until = start + timedelta(hours=4)
                    if until > gather_time:
                        until = gather_time
                    if not _gather_and_ship(incremental_collectors, since=start, until=until):
                        break
                    start = until
                    with disable_activity_stream():
                        settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
            if subset:
                _gather_and_ship(subset, since=since, until=gather_time)
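

# Illustrative sketch (not part of the original module): how the incremental
# collectors above walk the [since, gather_time] span in four-hour windows,
# clamping the final window to the gather time.
def _example_four_hour_windows(since, gather_time):
    windows = []
    start = since
    while start < gather_time:
        until = min(start + timedelta(hours=4), gather_time)
        windows.append((start, until))
        start = until
    return windows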


@task(queue=get_local_queuename)
def purge_old_stdout_files():
    nowtime = time.time()
    for f in os.listdir(settings.JOBOUTPUT_ROOT):
        if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT, f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
            os.unlink(os.path.join(settings.JOBOUTPUT_ROOT, f))
            logger.debug("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT, f)))


@task(queue=get_local_queuename)
def cluster_node_heartbeat():
    logger.debug("Cluster node heartbeat task.")
    nowtime = now()
    instance_list = list(Instance.objects.all_non_isolated())
    this_inst = None
    lost_instances = []

    (changed, instance) = Instance.objects.get_or_register()
    if changed:
        logger.info("Registered tower node '{}'".format(instance.hostname))

    for inst in list(instance_list):
        if inst.hostname == settings.CLUSTER_HOST_ID:
            this_inst = inst
            instance_list.remove(inst)
        elif inst.is_lost(ref_time=nowtime):
            lost_instances.append(inst)
            instance_list.remove(inst)
    if this_inst:
        startup_event = this_inst.is_lost(ref_time=nowtime)
        this_inst.refresh_capacity()
        if startup_event:
            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
            return
    else:
        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
    # IFF any node has a greater version than we do, then we'll shutdown services
    for other_inst in instance_list:
        if other_inst.version == "":
            continue
        if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
            logger.error("Host {} reports version {}, but this node {} is at {}, shutting down".format(
                other_inst.hostname,
                other_inst.version,
                this_inst.hostname,
                this_inst.version
            ))
            # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
            # The heartbeat task will reset the capacity to the system capacity after upgrade.
            stop_local_services(communicate=False)
            raise RuntimeError("Shutting down.")
    for other_inst in lost_instances:
        try:
            reaper.reap(other_inst)
        except Exception:
            logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
        try:
            # Capacity could already be 0 because:
            #  * It's a new node and it never had a heartbeat
            #  * It was set to 0 by another tower node running this method
            #  * It was set to 0 by this node, but auto deprovisioning is off
            #
            # If auto deprovisioning is on, don't bother setting the capacity to 0
            # since we will delete the node anyway.
            if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
                other_inst.capacity = 0
                other_inst.save(update_fields=['capacity'])
                logger.error("Host {} last checked in at {}, marked as lost.".format(
                    other_inst.hostname, other_inst.modified))
            elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
                deprovision_hostname = other_inst.hostname
                other_inst.delete()
                logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
        except DatabaseError as e:
            if 'did not affect any rows' in str(e):
                logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
            else:
                logger.exception('Error marking {} as lost'.format(other_inst.hostname))
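

# Illustrative sketch (not part of the original module): the version gate above
# strips any '-suffix' from the reported versions before comparing them with
# LooseVersion, so only a genuinely newer release on another node triggers the
# local shutdown.
def _example_other_node_is_newer(other_version, local_version):
    return Version(other_version.split('-', 1)[0]) > Version(local_version.split('-', 1)[0])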


@task(queue=get_local_queuename)
def awx_k8s_reaper():
    from awx.main.scheduler.kubernetes import PodManager  # prevent circular import
    for group in InstanceGroup.objects.filter(credential__isnull=False).iterator():
        if group.is_containerized:
            logger.debug("Checking for orphaned k8s pods for {}.".format(group))
            for job in UnifiedJob.objects.filter(
                pk__in=list(PodManager.list_active_jobs(group))
            ).exclude(status__in=ACTIVE_STATES):
                logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
                try:
                    PodManager(job).delete()
                except Exception:
                    logger.exception("Failed to delete orphaned pod {} from {}".format(
                        job.log_format, group
                    ))


@task(queue=get_local_queuename)
def awx_isolated_heartbeat():
    local_hostname = settings.CLUSTER_HOST_ID
    logger.debug("Controlling node checking for any isolated management tasks.")
    poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
    # Get isolated instances not checked since poll interval - some buffer
    nowtime = now()
    accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
    isolated_instance_qs = Instance.objects.filter(
        rampart_groups__controller__instances__hostname=local_hostname,
    )
    isolated_instance_qs = isolated_instance_qs.filter(
        last_isolated_check__lt=accept_before
    ) | isolated_instance_qs.filter(
        last_isolated_check=None
    )
    # Fast pass of isolated instances, claiming the nodes to update
    with transaction.atomic():
        for isolated_instance in isolated_instance_qs:
            isolated_instance.last_isolated_check = nowtime
            # Prevent modified time from being changed, as in normal heartbeat
            isolated_instance.save(update_fields=['last_isolated_check'])
    # Slow pass looping over isolated IGs and their isolated instances
    if len(isolated_instance_qs) > 0:
        logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
        isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs)


@task(queue=get_local_queuename)
def awx_periodic_scheduler():
    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
        if acquired is False:
            logger.debug("Not running periodic scheduler, another task holds lock")
            return
        logger.debug("Starting periodic scheduler")

        run_now = now()
        state = TowerScheduleState.get_solo()
        last_run = state.schedule_last_run
        logger.debug("Last scheduler run was: %s", last_run)
        state.schedule_last_run = run_now
        state.save()

        old_schedules = Schedule.objects.enabled().before(last_run)
        for schedule in old_schedules:
            schedule.update_computed_fields()
        schedules = Schedule.objects.enabled().between(last_run, run_now)

        invalid_license = False
        try:
            access_registry[Job](None).check_license(quiet=True)
        except PermissionDenied as e:
            invalid_license = e

        for schedule in schedules:
            template = schedule.unified_job_template
            schedule.update_computed_fields()  # To update next_run timestamp.
            if template.cache_timeout_blocked:
                logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
                continue
            try:
                job_kwargs = schedule.get_job_kwargs()
                new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
                logger.debug('Spawned {} from schedule {}-{}.'.format(
                    new_unified_job.log_format, schedule.name, schedule.pk))

                if invalid_license:
                    new_unified_job.status = 'failed'
                    new_unified_job.job_explanation = str(invalid_license)
                    new_unified_job.save(update_fields=['status', 'job_explanation'])
                    new_unified_job.websocket_emit_status("failed")
                    raise invalid_license
                can_start = new_unified_job.signal_start()
            except Exception:
                logger.exception('Error spawning scheduled job.')
                continue
            if not can_start:
                new_unified_job.status = 'failed'
                new_unified_job.job_explanation = gettext_noop("Scheduled job could not start because it \
                    was not in the right state or required manual credentials")
                new_unified_job.save(update_fields=['status', 'job_explanation'])
                new_unified_job.websocket_emit_status("failed")
            emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
        state.save()


@task(queue=get_local_queuename)
def handle_work_success(task_actual):
    try:
        instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
    except ObjectDoesNotExist:
        logger.warning('Missing {} `{}` in success callback.'.format(task_actual['type'], task_actual['id']))
        return
    if not instance:
        return

    schedule_task_manager()


@task(queue=get_local_queuename)
def handle_work_error(task_id, *args, **kwargs):
    subtasks = kwargs.get('subtasks', None)
    logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
    first_instance = None
    first_instance_type = ''
    if subtasks is not None:
        for each_task in subtasks:
            try:
                instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id'])
                if not instance:
                    # Unknown task type
                    logger.warn("Unknown task type: {}".format(each_task['type']))
                    continue
            except ObjectDoesNotExist:
                logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id']))
                continue

            if first_instance is None:
                first_instance = instance
                first_instance_type = each_task['type']

            if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status == 'successful':
                instance.status = 'failed'
                instance.failed = True
                if not instance.job_explanation:
                    instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % \
                                               (first_instance_type, first_instance.name, first_instance.id)
                instance.save()
                instance.websocket_emit_status("failed")

    # We only send 1 job complete message since all the job completion message
    # handling does is trigger the scheduler. If we extend the functionality of
    # what the job complete message handler does then we may want to send a
    # completion event for each job here.
    if first_instance:
        schedule_task_manager()


@task(queue=get_local_queuename)
def handle_success_and_failure_notifications(job_id):
    uj = UnifiedJob.objects.get(pk=job_id)
    retries = 0
    while retries < 5:
        if uj.finished:
            uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
            return
        else:
            # wait a few seconds to avoid a race where the
            # events are persisted _before_ the UJ.status
            # changes from running -> successful
            retries += 1
            time.sleep(1)
            uj = UnifiedJob.objects.get(pk=job_id)

    logger.warn(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.")


@task(queue=get_local_queuename)
def update_inventory_computed_fields(inventory_id):
    '''
    Signal handler and wrapper around inventory.update_computed_fields to
    prevent unnecessary recursive calls.
    '''
    i = Inventory.objects.filter(id=inventory_id)
    if not i.exists():
        logger.error("Update Inventory Computed Fields failed due to missing inventory: " + str(inventory_id))
        return
    i = i[0]
    try:
        i.update_computed_fields()
    except DatabaseError as e:
        if 'did not affect any rows' in str(e):
            logger.debug('Exiting duplicate update_inventory_computed_fields task.')
            return
        raise


def update_smart_memberships_for_inventory(smart_inventory):
    current = set(SmartInventoryMembership.objects.filter(inventory=smart_inventory).values_list('host_id', flat=True))
    new = set(smart_inventory.hosts.values_list('id', flat=True))
    additions = new - current
    removals = current - new
    if additions or removals:
        with transaction.atomic():
            if removals:
                SmartInventoryMembership.objects.filter(inventory=smart_inventory, host_id__in=removals).delete()
            if additions:
                add_for_inventory = [
                    SmartInventoryMembership(inventory_id=smart_inventory.id, host_id=host_id)
                    for host_id in additions
                ]
                SmartInventoryMembership.objects.bulk_create(add_for_inventory, ignore_conflicts=True)
        logger.debug('Smart host membership cached for {}, {} additions, {} removals, {} total count.'.format(
            smart_inventory.pk, len(additions), len(removals), len(new)
        ))
        return True  # changed
    return False


@task(queue=get_local_queuename)
def update_host_smart_inventory_memberships():
    smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
    changed_inventories = set([])
    for smart_inventory in smart_inventories:
        try:
            changed = update_smart_memberships_for_inventory(smart_inventory)
            if changed:
                changed_inventories.add(smart_inventory)
        except IntegrityError:
            logger.exception('Failed to update smart inventory memberships for {}'.format(smart_inventory.pk))
    # Update computed fields for changed inventories outside atomic action
    for smart_inventory in changed_inventories:
        smart_inventory.update_computed_fields()
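

# Illustrative sketch (not part of the original module): the refresh performed by
# update_smart_memberships_for_inventory() is a set reconciliation -- compute the
# additions and removals by set difference, then apply only that delta instead of
# rewriting every membership row.
def _example_reconcile_memberships(current_ids, desired_ids):
    current, desired = set(current_ids), set(desired_ids)
    additions = desired - current
    removals = current - desired
    return additions, removals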


@task(queue=get_local_queuename)
def migrate_legacy_event_data(tblname):
    #
    # NOTE: this function is not actually in use anymore,
    # but has been intentionally kept for historical purposes,
    # and to serve as an illustration if we ever need to perform
    # bulk modification/migration of event data in the future.
    #
    if 'event' not in tblname:
        return
    with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
        if acquired is False:
            return
        chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE

        def _remaining():
            try:
                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname};')
                return cursor.fetchone()[0]
            except ProgrammingError:
                # the table is gone (migration is unnecessary)
                return None

        with connection.cursor() as cursor:
            total_rows = _remaining()
            while total_rows:
                with transaction.atomic():
                    cursor.execute(
                        f'INSERT INTO {tblname} SELECT * FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;'
                    )
                    last_insert_pk = cursor.fetchone()
                    if last_insert_pk is None:
                        # this means that the SELECT from the old table was
                        # empty, and there was nothing to insert (so we're done)
                        break
                    last_insert_pk = last_insert_pk[0]
                    cursor.execute(
                        f'DELETE FROM _old_{tblname} WHERE id IN (SELECT id FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk});'
                    )
                logger.warn(
                    f'migrated int -> bigint rows to {tblname} from _old_{tblname}; # ({last_insert_pk} rows remaining)'
                )

            if _remaining() is None:
                cursor.execute(f'DROP TABLE IF EXISTS _old_{tblname}')
                logger.warn(f'{tblname} primary key migration to bigint has finished')
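

# Illustrative sketch (not part of the original module): the same chunked
# copy-and-delete pattern as migrate_legacy_event_data(), demonstrated against an
# in-memory SQLite database. SQLite is used only because it ships with the standard
# library; the production code above relies on PostgreSQL features such as RETURNING.
def _example_chunked_table_migration(chunk=1000):
    import sqlite3
    conn = sqlite3.connect(':memory:')
    cur = conn.cursor()
    cur.execute('CREATE TABLE _old_events (id INTEGER PRIMARY KEY, body TEXT)')
    cur.execute('CREATE TABLE events (id INTEGER PRIMARY KEY, body TEXT)')
    cur.executemany('INSERT INTO _old_events (body) VALUES (?)', [('x',)] * 2500)
    conn.commit()
    while True:
        # Work from the highest ids down, one chunk per transaction.
        cur.execute('SELECT id FROM _old_events ORDER BY id DESC LIMIT ?', (chunk,))
        ids = [row[0] for row in cur.fetchall()]
        if not ids:
            break
        placeholders = ','.join('?' * len(ids))
        cur.execute('INSERT INTO events SELECT * FROM _old_events WHERE id IN (%s)' % placeholders, ids)
        cur.execute('DELETE FROM _old_events WHERE id IN (%s)' % placeholders, ids)
        conn.commit()
    cur.execute('DROP TABLE _old_events')
    conn.commit()
    return conn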


@task(queue=get_local_queuename)
def delete_inventory(inventory_id, user_id, retries=5):
    # Delete inventory as user
    if user_id is None:
        user = None
    else:
        try:
            user = User.objects.get(id=user_id)
        except Exception:
            user = None
    with ignore_inventory_computed_fields(), ignore_inventory_group_removal(), impersonate(user):
        try:
            i = Inventory.objects.get(id=inventory_id)
            for host in i.hosts.iterator():
                host.job_events_as_primary_host.update(host=None)
            i.delete()
            emit_channel_notification(
                'inventories-status_changed',
                {'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'}
            )
            logger.debug('Deleted inventory {} as user {}.'.format(inventory_id, user_id))
        except Inventory.DoesNotExist:
            logger.exception("Delete Inventory failed due to missing inventory: " + str(inventory_id))
            return
        except DatabaseError:
            logger.exception('Database error deleting inventory {}, but will retry.'.format(inventory_id))
            if retries > 0:
                time.sleep(10)
                delete_inventory(inventory_id, user_id, retries=retries - 1)


def with_path_cleanup(f):
    @functools.wraps(f)
    def _wrapped(self, *args, **kwargs):
        try:
            return f(self, *args, **kwargs)
        finally:
            for p in self.cleanup_paths:
                try:
                    if os.path.isdir(p):
                        shutil.rmtree(p, ignore_errors=True)
                    elif os.path.exists(p):
                        os.remove(p)
                except OSError:
                    logger.exception("Failed to remove tmp file: {}".format(p))
            self.cleanup_paths = []
    return _wrapped
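

# Illustrative sketch (not part of the original module): with_path_cleanup expects
# the decorated method's class to keep a ``cleanup_paths`` list; everything
# registered there is removed once the wrapped call returns or raises. The class
# below is a hypothetical minimal consumer.
class _ExamplePathCleanupConsumer(object):

    def __init__(self):
        self.cleanup_paths = []

    @with_path_cleanup
    def run(self):
        scratch = tempfile.mkdtemp(prefix='example_cleanup_')
        self.cleanup_paths.append(scratch)  # removed automatically after run() exits
        return scratch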


class BaseTask(object):
    model = None
    event_model = None
    abstract = True
    proot_show_paths = []

    def __init__(self):
        self.cleanup_paths = []
        self.parent_workflow_job_id = None
        self.host_map = {}

    def update_model(self, pk, _attempt=0, **updates):
        """Reload the model instance from the database and update the
        given fields.
        """
        try:
            with transaction.atomic():
                # Retrieve the model instance.
                instance = self.model.objects.get(pk=pk)

                # Update the appropriate fields and save the model
                # instance, then return the new instance.
                if updates:
                    update_fields = ['modified']
                    for field, value in updates.items():
                        setattr(instance, field, value)
                        update_fields.append(field)
                        if field == 'status':
                            update_fields.append('failed')
                    instance.save(update_fields=update_fields)
                return instance
        except DatabaseError as e:
            # Log out the error to the debug logger.
            logger.debug('Database error updating %s, retrying in 5 '
                         'seconds (retry #%d): %s',
                         self.model._meta.object_name, _attempt + 1, e)

            # Attempt to retry the update, assuming we haven't already
            # tried too many times.
            if _attempt < 5:
                time.sleep(5)
                return self.update_model(
                    pk,
                    _attempt=_attempt + 1,
                    **updates
                )
            else:
                logger.error('Failed to update %s after %d retries.',
                             self.model._meta.object_name, _attempt)

    def get_path_to(self, *args):
        '''
        Return absolute path relative to this file.
        '''
        return os.path.abspath(os.path.join(os.path.dirname(__file__), *args))

    def build_private_data(self, instance, private_data_dir):
        '''
        Return SSH private key data (only if stored in DB as ssh_key_data).
        Return structure is a dict of the form:
        '''

    def build_private_data_dir(self, instance):
        '''
        Create a temporary directory for job-related files.
        '''
        bwrap_path = tempfile.mkdtemp(
            prefix=f'bwrap_{instance.pk}_',
            dir=settings.AWX_PROOT_BASE_PATH
        )
        os.chmod(bwrap_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if settings.AWX_CLEANUP_PATHS:
            self.cleanup_paths.append(bwrap_path)

        path = tempfile.mkdtemp(
            prefix='awx_%s_' % instance.pk,
            dir=bwrap_path,
        )
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        runner_project_folder = os.path.join(path, 'project')
        if not os.path.exists(runner_project_folder):
            # Ansible Runner requires that this directory exists.
            # Specifically, when using process isolation
            os.mkdir(runner_project_folder)
        return path

    def build_private_data_files(self, instance, private_data_dir):
        '''
        Creates temporary files containing the private data.
        Returns a dictionary i.e.,

        {
            'credentials': {
                <awx.main.models.Credential>: '/path/to/decrypted/data',
                <awx.main.models.Credential>: '/path/to/decrypted/data',
                ...
            },
            'certificates': {
                <awx.main.models.Credential>: /path/to/signed/ssh/certificate,
                <awx.main.models.Credential>: /path/to/signed/ssh/certificate,
                ...
            }
        }
        '''
        private_data = self.build_private_data(instance, private_data_dir)
        private_data_files = {'credentials': {}}
        if private_data is not None:
            for credential, data in private_data.get('credentials', {}).items():
                # OpenSSH formatted keys must have a trailing newline to be
                # accepted by ssh-add.
                if 'OPENSSH PRIVATE KEY' in data and not data.endswith('\n'):
                    data += '\n'
                # For credentials used with ssh-add, write to a named pipe which
                # will be read then closed, instead of leaving the SSH key on disk.
                if credential and credential.credential_type.namespace in ('ssh', 'scm'):
                    try:
                        os.mkdir(os.path.join(private_data_dir, 'env'))
                    except OSError as e:
                        if e.errno != errno.EEXIST:
                            raise
                    path = os.path.join(private_data_dir, 'env', 'ssh_key')
                    ansible_runner.utils.open_fifo_write(path, data.encode())
                    private_data_files['credentials']['ssh'] = path
                # Ansible network modules do not yet support ssh-agent.
                # Instead, ssh private key file is explicitly passed via an
                # env variable.
                else:
                    handle, path = tempfile.mkstemp(dir=private_data_dir)
                    f = os.fdopen(handle, 'w')
                    f.write(data)
                    f.close()
                    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
                    private_data_files['credentials'][credential] = path
            for credential, data in private_data.get('certificates', {}).items():
                artifact_dir = os.path.join(private_data_dir, 'artifacts', str(self.instance.id))
                if not os.path.exists(artifact_dir):
                    os.makedirs(artifact_dir, mode=0o700)
                path = os.path.join(artifact_dir, 'ssh_key_data-cert.pub')
                with open(path, 'w') as f:
                    f.write(data)
                    f.close()
                os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
        return private_data_files
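
    # Illustrative sketch (not part of the original class): the structure returned
    # by build_private_data_files() is simply a mapping from credential to the path
    # of a file holding its decrypted material. Writing each secret to a file that
    # only the owner can read looks like this; ``secrets`` is a hypothetical
    # {name: plaintext} dict.
    @staticmethod
    def _example_write_private_files(private_data_dir, secrets):
        paths = {}
        for name, data in secrets.items():
            handle, path = tempfile.mkstemp(dir=private_data_dir)
            with os.fdopen(handle, 'w') as f:
                f.write(data)
            os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
            paths[name] = path
        return {'credentials': paths}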

    def build_passwords(self, instance, runtime_passwords):
        '''
        Build a dictionary of passwords for responding to prompts.
        '''
        return {
            'yes': 'yes',
            'no': 'no',
            '': '',
        }

    def build_extra_vars_file(self, instance, private_data_dir):
        '''
        Build ansible yaml file filled with extra vars to be passed via -e@file.yml
        '''

    def build_params_process_isolation(self, instance, private_data_dir, cwd):
        '''
        Build ansible runner .run() parameters for process isolation.
        '''
        process_isolation_params = dict()
        if self.should_use_proot(instance):
            local_paths = [private_data_dir]
            if cwd != private_data_dir and Path(private_data_dir) not in Path(cwd).parents:
                local_paths.append(cwd)
            show_paths = self.proot_show_paths + local_paths + \
                settings.AWX_PROOT_SHOW_PATHS

            pi_path = os.path.split(private_data_dir)[0]

            process_isolation_params = {
                'process_isolation': True,
                'process_isolation_path': pi_path,
                'process_isolation_show_paths': show_paths,
                'process_isolation_hide_paths': [
                    settings.AWX_PROOT_BASE_PATH,
                    '/etc/tower',
                    '/etc/ssh',
                    '/var/lib/awx',
                    '/var/log',
                    '/home',
                    '/var/tmp',
                    settings.PROJECTS_ROOT,
                    settings.JOBOUTPUT_ROOT,
                ] + (getattr(settings, 'AWX_PROOT_HIDE_PATHS', None) or []),
                'process_isolation_ro_paths': [settings.ANSIBLE_VENV_PATH, settings.AWX_VENV_PATH],
            }
            if getattr(instance, 'ansible_virtualenv_path', settings.ANSIBLE_VENV_PATH) != settings.ANSIBLE_VENV_PATH:
                process_isolation_params['process_isolation_ro_paths'].append(instance.ansible_virtualenv_path)
        return process_isolation_params

    def build_params_resource_profiling(self, instance, private_data_dir):
        resource_profiling_params = {}
        if self.should_use_resource_profiling(instance):
            cpu_poll_interval = settings.AWX_RESOURCE_PROFILING_CPU_POLL_INTERVAL
            mem_poll_interval = settings.AWX_RESOURCE_PROFILING_MEMORY_POLL_INTERVAL
            pid_poll_interval = settings.AWX_RESOURCE_PROFILING_PID_POLL_INTERVAL

            results_dir = os.path.join(private_data_dir, 'artifacts/playbook_profiling')
            if not os.path.isdir(results_dir):
                os.makedirs(results_dir, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)

            logger.debug('Collected the following resource profiling intervals: cpu: {} mem: {} pid: {}'
                         .format(cpu_poll_interval, mem_poll_interval, pid_poll_interval))

            resource_profiling_params.update({'resource_profiling': True,
                                              'resource_profiling_base_cgroup': 'ansible-runner',
                                              'resource_profiling_cpu_poll_interval': cpu_poll_interval,
                                              'resource_profiling_memory_poll_interval': mem_poll_interval,
                                              'resource_profiling_pid_poll_interval': pid_poll_interval,
                                              'resource_profiling_results_dir': results_dir})

        return resource_profiling_params

    def _write_extra_vars_file(self, private_data_dir, vars, safe_dict={}):
        env_path = os.path.join(private_data_dir, 'env')
        try:
            os.mkdir(env_path, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        path = os.path.join(env_path, 'extravars')
        handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
        f = os.fdopen(handle, 'w')
        if settings.ALLOW_JINJA_IN_EXTRA_VARS == 'always':
            f.write(yaml.safe_dump(vars))
        else:
            f.write(safe_dump(vars, safe_dict))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

    def add_ansible_venv(self, venv_path, env, isolated=False):
        env['VIRTUAL_ENV'] = venv_path
        env['PATH'] = os.path.join(venv_path, "bin") + ":" + env['PATH']
        venv_libdir = os.path.join(venv_path, "lib")

        if not isolated and (
            not os.path.exists(venv_libdir) or
            os.path.join(venv_path, '') not in get_custom_venv_choices()
        ):
            raise InvalidVirtualenvError(_(
                'Invalid virtual environment selected: {}'.format(venv_path)
            ))

        isolated_manager.set_pythonpath(venv_libdir, env)

    def add_awx_venv(self, env):
        env['VIRTUAL_ENV'] = settings.AWX_VENV_PATH
        env['PATH'] = os.path.join(settings.AWX_VENV_PATH, "bin") + ":" + env['PATH']

    def build_env(self, instance, private_data_dir, isolated, private_data_files=None):
        '''
        Build environment dictionary for ansible-playbook.
        '''
        env = dict(os.environ.items())
        # Add ANSIBLE_* settings to the subprocess environment.
        for attr in dir(settings):
            if attr == attr.upper() and attr.startswith('ANSIBLE_'):
                env[attr] = str(getattr(settings, attr))
        # Also set environment variables configured in AWX_TASK_ENV setting.
        for key, value in settings.AWX_TASK_ENV.items():
            env[key] = str(value)
        # Set environment variables needed for inventory and job event
        # callbacks to work.
        # Update PYTHONPATH to use local site-packages.
        # NOTE:
        # Derived class should call add_ansible_venv() or add_awx_venv()
        if self.should_use_proot(instance):
            env['PROOT_TMP_DIR'] = settings.AWX_PROOT_BASE_PATH
        env['AWX_PRIVATE_DATA_DIR'] = private_data_dir
        return env

    def should_use_resource_profiling(self, job):
        '''
        Return whether this task should use resource profiling
        '''
        return False

    def should_use_proot(self, instance):
        '''
        Return whether this task should use proot.
        '''
        return False

    def build_inventory(self, instance, private_data_dir):
        script_params = dict(hostvars=True, towervars=True)
        if hasattr(instance, 'job_slice_number'):
            script_params['slice_number'] = instance.job_slice_number
            script_params['slice_count'] = instance.job_slice_count
        script_data = instance.inventory.get_script_data(**script_params)
        # maintain a list of host_name --> host_id
        # so we can associate emitted events to Host objects
        self.host_map = {
            hostname: hv.pop('remote_tower_id', '')
            for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items()
        }
        json_data = json.dumps(script_data)
        handle, path = tempfile.mkstemp(dir=private_data_dir)
        f = os.fdopen(handle, 'w')
        f.write('#! /usr/bin/env python\n# -*- coding: utf-8 -*-\nprint(%r)\n' % json_data)
        f.close()
        os.chmod(path, stat.S_IRUSR | stat.S_IXUSR | stat.S_IWUSR)
        return path

    def build_args(self, instance, private_data_dir, passwords):
        raise NotImplementedError

    def write_args_file(self, private_data_dir, args):
        env_path = os.path.join(private_data_dir, 'env')
        try:
            os.mkdir(env_path, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        path = os.path.join(env_path, 'cmdline')
        handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
        f = os.fdopen(handle, 'w')
        f.write(ansible_runner.utils.args2cmdline(*args))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

    def build_cwd(self, instance, private_data_dir):
        raise NotImplementedError

    def build_credentials_list(self, instance):
        return []

    def get_instance_timeout(self, instance):
        global_timeout_setting_name = instance._global_timeout_setting()
        if global_timeout_setting_name:
            global_timeout = getattr(settings, global_timeout_setting_name, 0)
            local_timeout = getattr(instance, 'timeout', 0)
            job_timeout = global_timeout if local_timeout == 0 else local_timeout
            job_timeout = 0 if local_timeout < 0 else job_timeout
        else:
            job_timeout = 0
        return job_timeout

    def get_password_prompts(self, passwords={}):
        '''
        Return a dictionary where keys are strings or regular expressions for
        prompts, and values are password lookup keys (keys that are returned
        from build_passwords).
        '''
        return OrderedDict()

    def create_expect_passwords_data_struct(self, password_prompts, passwords):
        expect_passwords = {}
        for k, v in password_prompts.items():
            expect_passwords[k] = passwords.get(v, '') or ''
        return expect_passwords
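
    # Illustrative sketch (not part of the original class): how a
    # get_password_prompts() mapping and a build_passwords() dict combine into the
    # ``passwords`` structure handed to ansible-runner -- prompt pattern in, literal
    # answer out. The prompt patterns and password keys below are hypothetical.
    def _example_expect_passwords(self):
        prompts = OrderedDict([
            (r'Enter passphrase for .*:\s*?$', 'ssh_key_unlock'),
            (r'BECOME password.*:\s*?$', 'become_password'),
        ])
        passwords = {'ssh_key_unlock': 'example', 'become_password': ''}
        return self.create_expect_passwords_data_struct(prompts, passwords)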

    def pre_run_hook(self, instance, private_data_dir):
        '''
        Hook for any steps to run before the job/task starts
        '''

    def post_run_hook(self, instance, status):
        '''
        Hook for any steps to run before job/task is marked as complete.
        '''

    def final_run_hook(self, instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
        '''
        Hook for any steps to run after job/task is marked as complete.
        '''
        job_profiling_dir = os.path.join(private_data_dir, 'artifacts/playbook_profiling')
        awx_profiling_dir = '/var/log/tower/playbook_profiling/'
        if not os.path.exists(awx_profiling_dir):
            os.mkdir(awx_profiling_dir)
        if os.path.isdir(job_profiling_dir):
            shutil.copytree(job_profiling_dir, os.path.join(awx_profiling_dir, str(instance.pk)))

        if instance.is_containerized:
            from awx.main.scheduler.kubernetes import PodManager  # prevent circular import
            pm = PodManager(instance)
            logger.debug(f"Deleting pod {pm.pod_name}")
            pm.delete()

    def event_handler(self, event_data):
        #
        # ⚠️ D-D-D-DANGER ZONE ⚠️
        # This method is called once for *every event* emitted by Ansible
        # Runner as a playbook runs. That means that changes to the code in
        # this method are _very_ likely to introduce performance regressions.
        #
        # Even if this function is made on average .05s slower, it can have
        # devastating performance implications for playbooks that emit
        # tens or hundreds of thousands of events.
        #
        # Proceed with caution!
        #
        '''
        Ansible runner puts a parent_uuid on each event, no matter what the type.
        AWX only saves the parent_uuid if the event is for a Job.
        '''
        # cache end_line locally for RunInventoryUpdate tasks
        # which generate job events from two 'streams':
        # ansible-inventory and the awx.main.commands.inventory_import
        # logger
        if isinstance(self, RunInventoryUpdate):
            self.end_line = event_data['end_line']

        if event_data.get(self.event_data_key, None):
            if self.event_data_key != 'job_id':
                event_data.pop('parent_uuid', None)
        if self.parent_workflow_job_id:
            event_data['workflow_job_id'] = self.parent_workflow_job_id
        if self.host_map:
            host = event_data.get('event_data', {}).get('host', '').strip()
            if host:
                event_data['host_name'] = host
                if host in self.host_map:
                    event_data['host_id'] = self.host_map[host]
            else:
                event_data['host_name'] = ''
                event_data['host_id'] = ''
            if event_data.get('event') == 'playbook_on_stats':
                event_data['host_map'] = self.host_map

        if isinstance(self, RunProjectUpdate):
            # it's common for Ansible's SCM modules to print
            # error messages on failure that contain the plaintext
            # basic auth credentials (username + password)
            # it's also common for the nested event data itself (['res']['...'])
            # to contain unredacted text on failure
            # this is a _little_ expensive to filter
            # with regex, but project updates don't have many events,
            # so it *should* have a negligible performance impact
            task = event_data.get('event_data', {}).get('task_action')
            try:
                if task in ('git', 'svn'):
                    event_data_json = json.dumps(event_data)
                    event_data_json = UriCleaner.remove_sensitive(event_data_json)
                    event_data = json.loads(event_data_json)
            except json.JSONDecodeError:
                pass

        event_data.setdefault(self.event_data_key, self.instance.id)
        self.dispatcher.dispatch(event_data)
        self.event_ct += 1

        '''
        Handle artifacts
        '''
        if event_data.get('event_data', {}).get('artifact_data', {}):
            self.instance.artifacts = event_data['event_data']['artifact_data']
            self.instance.save(update_fields=['artifacts'])

        return False

    def cancel_callback(self):
        '''
        Ansible runner callback to tell the job when/if it is canceled
        '''
        unified_job_id = self.instance.pk
        self.instance = self.update_model(unified_job_id)
        if not self.instance:
            logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
            return True
        if self.instance.cancel_flag or self.instance.status == 'canceled':
            cancel_wait = (now() - self.instance.modified).seconds if self.instance.modified else 0
            if cancel_wait > 5:
                logger.warn('Request to cancel {} took {} seconds to complete.'.format(self.instance.log_format, cancel_wait))
            return True
        return False

    def finished_callback(self, runner_obj):
        '''
        Ansible runner callback triggered on finished run
        '''
        event_data = {
            'event': 'EOF',
            'final_counter': self.event_ct,
        }
        event_data.setdefault(self.event_data_key, self.instance.id)
        self.dispatcher.dispatch(event_data)

    def status_handler(self, status_data, runner_config):
        '''
        Ansible runner callback triggered on status transition
        '''
        if status_data['status'] == 'starting':
            job_env = dict(runner_config.env)
            '''
            Take the safe environment variables and overwrite
            '''
            for k, v in self.safe_env.items():
                if k in job_env:
                    job_env[k] = v
            self.instance = self.update_model(self.instance.pk, job_args=json.dumps(runner_config.command),
                                              job_cwd=runner_config.cwd, job_env=job_env)

    def check_handler(self, config):
        '''
        IsolatedManager callback triggered by the repeated checks of the isolated node
        '''
        job_env = build_safe_env(config['env'])
        for k, v in self.safe_cred_env.items():
            if k in job_env:
                job_env[k] = v
        self.instance = self.update_model(self.instance.pk,
                                          job_args=json.dumps(config['command']),
                                          job_cwd=config['cwd'],
                                          job_env=job_env)
@with_path_cleanup
|
|
def run(self, pk, **kwargs):
|
|
'''
|
|
Run the job/task and capture its output.
|
|
'''
|
|
self.instance = self.model.objects.get(pk=pk)
|
|
containerized = self.instance.is_containerized
|
|
pod_manager = None
|
|
if containerized:
|
|
# Here we are trying to launch a pod before transitioning the job into a running
|
|
# state. For some scenarios (like waiting for resources to become available) we do this
|
|
# rather than marking the job as error or failed. This is not always desirable. Cases
|
|
# such as invalid authentication should surface as an error.
|
|
pod_manager = self.deploy_container_group_pod(self.instance)
|
|
if not pod_manager:
|
|
return
|
|
|
|
# self.instance because of the update_model pattern and when it's used in callback handlers
|
|
self.instance = self.update_model(pk, status='running',
|
|
start_args='') # blank field to remove encrypted passwords
|
|
|
|
self.instance.websocket_emit_status("running")
|
|
status, rc = 'error', None
|
|
extra_update_fields = {}
|
|
fact_modification_times = {}
|
|
self.event_ct = 0
|
|
|
|
'''
|
|
Needs to be an object property because status_handler uses it in a callback context
|
|
'''
|
|
self.safe_env = {}
|
|
self.safe_cred_env = {}
|
|
private_data_dir = None
|
|
isolated_manager_instance = None
|
|
|
|
# store a reference to the parent workflow job (if any) so we can include
|
|
# it in event data JSON
|
|
if self.instance.spawned_by_workflow:
|
|
self.parent_workflow_job_id = self.instance.get_workflow_job().id
|
|
|
|
try:
|
|
isolated = self.instance.is_isolated()
|
|
self.instance.send_notification_templates("running")
|
|
private_data_dir = self.build_private_data_dir(self.instance)
|
|
self.pre_run_hook(self.instance, private_data_dir)
|
|
if self.instance.cancel_flag:
|
|
self.instance = self.update_model(self.instance.pk, status='canceled')
|
|
if self.instance.status != 'running':
|
|
# Stop the task chain and prevent starting the job if it has
|
|
# already been canceled.
|
|
self.instance = self.update_model(pk)
|
|
status = self.instance.status
|
|
raise RuntimeError('not starting %s task' % self.instance.status)
|
|
|
|
if not os.path.exists(settings.AWX_PROOT_BASE_PATH):
|
|
raise RuntimeError('AWX_PROOT_BASE_PATH=%s does not exist' % settings.AWX_PROOT_BASE_PATH)
|
|
|
|
# store a record of the venv used at runtime
|
|
if hasattr(self.instance, 'custom_virtualenv'):
|
|
self.update_model(pk, custom_virtualenv=getattr(self.instance, 'ansible_virtualenv_path', settings.ANSIBLE_VENV_PATH))
|
|
|
|
# Fetch "cached" fact data from prior runs and put on the disk
|
|
# where ansible expects to find it
|
|
if getattr(self.instance, 'use_fact_cache', False):
|
|
self.instance.start_job_fact_cache(
|
|
os.path.join(private_data_dir, 'artifacts', str(self.instance.id), 'fact_cache'),
|
|
fact_modification_times,
|
|
)
|
|
|
|
# May have to serialize the value
|
|
private_data_files = self.build_private_data_files(self.instance, private_data_dir)
|
|
passwords = self.build_passwords(self.instance, kwargs)
|
|
self.build_extra_vars_file(self.instance, private_data_dir)
|
|
args = self.build_args(self.instance, private_data_dir, passwords)
|
|
cwd = self.build_cwd(self.instance, private_data_dir)
|
|
resource_profiling_params = self.build_params_resource_profiling(self.instance,
|
|
private_data_dir)
|
|
process_isolation_params = self.build_params_process_isolation(self.instance,
|
|
private_data_dir,
|
|
cwd)
|
|
env = self.build_env(self.instance, private_data_dir, isolated,
|
|
private_data_files=private_data_files)
|
|
self.safe_env = build_safe_env(env)
|
|
|
|
credentials = self.build_credentials_list(self.instance)
|
|
|
|
for credential in credentials:
|
|
if credential:
|
|
credential.credential_type.inject_credential(
|
|
credential, env, self.safe_cred_env, args, private_data_dir
|
|
)
|
|
|
|
self.safe_env.update(self.safe_cred_env)
|
|
|
|
self.write_args_file(private_data_dir, args)
|
|
|
|
password_prompts = self.get_password_prompts(passwords)
|
|
expect_passwords = self.create_expect_passwords_data_struct(password_prompts, passwords)
|
|
|
|
params = {
|
|
'ident': self.instance.id,
|
|
'private_data_dir': private_data_dir,
|
|
'project_dir': cwd,
|
|
'playbook': self.build_playbook_path_relative_to_cwd(self.instance, private_data_dir),
|
|
'inventory': self.build_inventory(self.instance, private_data_dir),
|
|
'passwords': expect_passwords,
|
|
'envvars': env,
|
|
'event_handler': self.event_handler,
|
|
'cancel_callback': self.cancel_callback,
|
|
'finished_callback': self.finished_callback,
|
|
'status_handler': self.status_handler,
|
|
'settings': {
|
|
'job_timeout': self.get_instance_timeout(self.instance),
|
|
'suppress_ansible_output': True,
|
|
**process_isolation_params,
|
|
**resource_profiling_params,
|
|
},
|
|
}
|
|
|
|
if containerized:
|
|
# We don't want HOME passed through to container groups.
|
|
params['envvars'].pop('HOME')
|
|
|
|
if isinstance(self.instance, AdHocCommand):
|
|
params['module'] = self.build_module_name(self.instance)
|
|
params['module_args'] = self.build_module_args(self.instance)
|
|
|
|
if getattr(self.instance, 'use_fact_cache', False):
|
|
# Enable Ansible fact cache.
|
|
params['fact_cache_type'] = 'jsonfile'
|
|
else:
|
|
# Disable Ansible fact cache.
|
|
params['fact_cache_type'] = ''
|
|
|
|
'''
|
|
Delete parameters if the values are None or empty array
|
|
'''
|
|
for v in ['passwords', 'playbook', 'inventory']:
|
|
if not params[v]:
|
|
del params[v]
|
|
|
|
self.dispatcher = CallbackQueueDispatcher()
|
|
if self.instance.is_isolated() or containerized:
|
|
module_args = None
|
|
if 'module_args' in params:
|
|
# if it's adhoc, copy the module args
|
|
module_args = ansible_runner.utils.args2cmdline(
|
|
params.get('module_args'),
|
|
)
|
|
shutil.move(
|
|
params.pop('inventory'),
|
|
os.path.join(private_data_dir, 'inventory')
|
|
)
|
|
|
|
ansible_runner.utils.dump_artifacts(params)
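# dump_artifacts persists the in-memory params (extravars, passwords, settings,
# and the playbook or module payload) into private_data_dir so the isolated or
# containerized node can run from the transferred directory alone.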
|
|
isolated_manager_instance = isolated_manager.IsolatedManager(
|
|
self.event_handler,
|
|
canceled_callback=lambda: self.update_model(self.instance.pk).cancel_flag,
|
|
check_callback=self.check_handler,
|
|
pod_manager=pod_manager
|
|
)
|
|
status, rc = isolated_manager_instance.run(self.instance,
|
|
private_data_dir,
|
|
params.get('playbook'),
|
|
params.get('module'),
|
|
module_args,
|
|
ident=str(self.instance.pk))
|
|
self.finished_callback(None)
|
|
else:
|
|
res = ansible_runner.interface.run(**params)
|
|
status = res.status
|
|
rc = res.rc
|
|
|
|
if status == 'timeout':
|
|
self.instance.job_explanation = "Job terminated due to timeout"
|
|
status = 'failed'
|
|
extra_update_fields['job_explanation'] = self.instance.job_explanation
|
|
# ensure the failure notification is sent even if the playbook_on_stats event is not triggered
|
|
handle_success_and_failure_notifications.apply_async([self.instance.job.id])
|
|
|
|
except InvalidVirtualenvError as e:
|
|
extra_update_fields['job_explanation'] = e.message
|
|
logger.error('{} {}'.format(self.instance.log_format, e.message))
|
|
except Exception:
|
|
# this could catch programming or file system errors
|
|
extra_update_fields['result_traceback'] = traceback.format_exc()
|
|
logger.exception('%s Exception occurred while running task', self.instance.log_format)
|
|
finally:
|
|
logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.event_ct)
|
|
|
|
try:
|
|
self.post_run_hook(self.instance, status)
|
|
except PostRunError as exc:
|
|
if status == 'successful':
|
|
status = exc.status
|
|
extra_update_fields['job_explanation'] = exc.args[0]
|
|
if exc.tb:
|
|
extra_update_fields['result_traceback'] = exc.tb
|
|
except Exception:
|
|
logger.exception('{} Post run hook errored.'.format(self.instance.log_format))
|
|
|
|
self.instance = self.update_model(pk)
|
|
self.instance = self.update_model(pk, status=status,
|
|
emitted_events=self.event_ct,
|
|
**extra_update_fields)
|
|
|
|
try:
|
|
self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=isolated_manager_instance)
|
|
except Exception:
|
|
logger.exception('{} Final run hook errored.'.format(self.instance.log_format))
|
|
|
|
self.instance.websocket_emit_status(status)
|
|
if status != 'successful':
|
|
if status == 'canceled':
|
|
raise AwxTaskError.TaskCancel(self.instance, rc)
|
|
else:
|
|
raise AwxTaskError.TaskError(self.instance, rc)
|
|
|
|
|
|
def deploy_container_group_pod(self, task):
|
|
from awx.main.scheduler.kubernetes import PodManager # Avoid circular import
|
|
pod_manager = PodManager(self.instance)
|
|
try:
|
|
log_name = task.log_format
|
|
logger.debug(f"Launching pod for {log_name}.")
|
|
pod_manager.deploy()
|
|
except Exception as exc:
|
|
if isinstance(exc, ApiException) and exc.status == 403:
|
|
try:
|
|
if 'exceeded quota' in json.loads(exc.body)['message']:
|
|
# If the k8s cluster does not have capacity, we move the
|
|
# job back into pending and wait until the next run of
|
|
# the task manager. This does not exactly play well with
|
|
# our current instance group precedence logic, since it
|
|
# will just sit here forever if kubernetes returns this
|
|
# error.
|
|
logger.warning(exc.body)
logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.")
|
|
self.update_model(task.pk, status='pending')
|
|
return
|
|
except Exception:
|
|
logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.")
|
|
|
|
logger.exception(f"Error when launching pod for {log_name}")
|
|
self.update_model(task.pk, status='error', result_traceback=traceback.format_exc())
|
|
return
|
|
|
|
self.update_model(task.pk, execution_node=pod_manager.pod_name)
|
|
return pod_manager
@task(queue=get_local_queuename)
|
|
class RunJob(BaseTask):
|
|
'''
|
|
Run a job using ansible-playbook.
|
|
'''
|
|
|
|
model = Job
|
|
event_model = JobEvent
|
|
event_data_key = 'job_id'
|
|
|
|
def build_private_data(self, job, private_data_dir):
|
|
'''
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
...
|
|
},
|
|
'certificates': {
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
...
|
|
}
|
|
}
|
|
'''
|
|
private_data = {'credentials': {}}
|
|
for credential in job.credentials.prefetch_related('input_sources__source_credential').all():
|
|
# If we were sent SSH credentials, decrypt them and send them
|
|
# back (they will be written to a temporary file).
|
|
if credential.has_input('ssh_key_data'):
|
|
private_data['credentials'][credential] = credential.get_input('ssh_key_data', default='')
|
|
if credential.has_input('ssh_public_key_data'):
|
|
private_data.setdefault('certificates', {})[credential] = credential.get_input('ssh_public_key_data', default='')
|
|
|
|
return private_data
|
|
|
|
def build_passwords(self, job, runtime_passwords):
|
|
'''
|
|
Build a dictionary of passwords for SSH private key, SSH user, sudo/su
|
|
and ansible-vault.
|
|
'''
|
|
passwords = super(RunJob, self).build_passwords(job, runtime_passwords)
|
|
cred = job.machine_credential
|
|
if cred:
|
|
for field in ('ssh_key_unlock', 'ssh_password', 'become_password', 'vault_password'):
|
|
value = runtime_passwords.get(field, cred.get_input('password' if field == 'ssh_password' else field, default=''))
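# 'ASK' is the stored sentinel meaning the value is prompted for at launch,
# so it is never usable as a real password here.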
|
|
if value not in ('', 'ASK'):
|
|
passwords[field] = value
|
|
|
|
for cred in job.vault_credentials:
|
|
field = 'vault_password'
|
|
vault_id = cred.get_input('vault_id', default=None)
|
|
if vault_id:
|
|
field = 'vault_password.{}'.format(vault_id)
|
|
if field in passwords:
|
|
raise RuntimeError(
|
|
'multiple vault credentials were specified with --vault-id {}@prompt'.format(
|
|
vault_id
|
|
)
|
|
)
|
|
value = runtime_passwords.get(field, cred.get_input('vault_password', default=''))
|
|
if value not in ('', 'ASK'):
|
|
passwords[field] = value
|
|
|
|
'''
|
|
Only 1 value can be provided for a unique prompt string. Prefer ssh
|
|
key unlock over network key unlock.
|
|
'''
|
|
if 'ssh_key_unlock' not in passwords:
|
|
for cred in job.network_credentials:
|
|
if cred.inputs.get('ssh_key_unlock'):
|
|
passwords['ssh_key_unlock'] = runtime_passwords.get('ssh_key_unlock', cred.get_input('ssh_key_unlock', default=''))
|
|
break
|
|
|
|
return passwords
|
|
|
|
def build_env(self, job, private_data_dir, isolated=False, private_data_files=None):
|
|
'''
|
|
Build environment dictionary for ansible-playbook.
|
|
'''
|
|
env = super(RunJob, self).build_env(job, private_data_dir,
|
|
isolated=isolated,
|
|
private_data_files=private_data_files)
|
|
if private_data_files is None:
|
|
private_data_files = {}
|
|
self.add_ansible_venv(job.ansible_virtualenv_path, env, isolated=isolated)
|
|
# Set environment variables needed for inventory and job event
|
|
# callbacks to work.
|
|
env['JOB_ID'] = str(job.pk)
|
|
env['INVENTORY_ID'] = str(job.inventory.pk)
|
|
if job.project:
|
|
env['PROJECT_REVISION'] = job.project.scm_revision
|
|
env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
|
|
env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
|
|
if not isolated:
|
|
if hasattr(settings, 'AWX_ANSIBLE_CALLBACK_PLUGINS') and settings.AWX_ANSIBLE_CALLBACK_PLUGINS:
|
|
env['ANSIBLE_CALLBACK_PLUGINS'] = ':'.join(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
|
|
env['AWX_HOST'] = settings.TOWER_URL_BASE
|
|
|
|
# Create a directory for ControlPath sockets that is unique to each
|
|
# job and visible inside the proot environment (when enabled).
|
|
cp_dir = os.path.join(private_data_dir, 'cp')
|
|
if not os.path.exists(cp_dir):
|
|
os.mkdir(cp_dir, 0o700)
|
|
env['ANSIBLE_SSH_CONTROL_PATH_DIR'] = cp_dir
|
|
|
|
# Set environment variables for cloud credentials.
|
|
cred_files = private_data_files.get('credentials', {})
|
|
for cloud_cred in job.cloud_credentials:
|
|
if cloud_cred and cloud_cred.credential_type.namespace == 'openstack':
|
|
env['OS_CLIENT_CONFIG_FILE'] = cred_files.get(cloud_cred, '')
|
|
|
|
for network_cred in job.network_credentials:
|
|
env['ANSIBLE_NET_USERNAME'] = network_cred.get_input('username', default='')
|
|
env['ANSIBLE_NET_PASSWORD'] = network_cred.get_input('password', default='')
|
|
|
|
ssh_keyfile = cred_files.get(network_cred, '')
|
|
if ssh_keyfile:
|
|
env['ANSIBLE_NET_SSH_KEYFILE'] = ssh_keyfile
|
|
|
|
authorize = network_cred.get_input('authorize', default=False)
|
|
env['ANSIBLE_NET_AUTHORIZE'] = str(int(authorize))
|
|
if authorize:
|
|
env['ANSIBLE_NET_AUTH_PASS'] = network_cred.get_input('authorize_password', default='')
|
|
|
|
path_vars = (
|
|
('ANSIBLE_COLLECTIONS_PATHS', 'collections_paths', 'requirements_collections', '~/.ansible/collections:/usr/share/ansible/collections'),
|
|
('ANSIBLE_ROLES_PATH', 'roles_path', 'requirements_roles', '~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles'))
|
|
|
|
config_values = read_ansible_config(job.project.get_project_path(), list(map(lambda x: x[1], path_vars)))
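# For each path variable, build the search path so precedence is: the job's
# own requirements folder (populated during project sync) first, then any
# value already in the environment or the project's ansible.cfg, then the
# Ansible defaults.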
|
|
|
|
for env_key, config_setting, folder, default in path_vars:
|
|
paths = default.split(':')
|
|
if env_key in env:
|
|
for path in env[env_key].split(':'):
|
|
if path not in paths:
|
|
paths = [env[env_key]] + paths
|
|
elif config_setting in config_values:
|
|
for path in config_values[config_setting].split(':'):
|
|
if path not in paths:
|
|
paths = [config_values[config_setting]] + paths
|
|
paths = [os.path.join(private_data_dir, folder)] + paths
|
|
env[env_key] = os.pathsep.join(paths)
|
|
|
|
return env
|
|
|
|
def build_args(self, job, private_data_dir, passwords):
|
|
'''
|
|
Build command line argument list for running ansible-playbook,
|
|
optionally using ssh-agent for public/private key authentication.
|
|
'''
|
|
creds = job.machine_credential
|
|
|
|
ssh_username, become_username, become_method = '', '', ''
|
|
if creds:
|
|
ssh_username = creds.get_input('username', default='')
|
|
become_method = creds.get_input('become_method', default='')
|
|
become_username = creds.get_input('become_username', default='')
|
|
else:
|
|
become_method = None
|
|
become_username = ""
|
|
# Always specify the normal SSH user as root by default. Since this
|
|
# task is normally running in the background under a service account,
|
|
# it doesn't make sense to rely on ansible-playbook's default of using
|
|
# the current user.
|
|
ssh_username = ssh_username or 'root'
|
|
args = []
|
|
if job.job_type == 'check':
|
|
args.append('--check')
|
|
args.extend(['-u', sanitize_jinja(ssh_username)])
|
|
if 'ssh_password' in passwords:
|
|
args.append('--ask-pass')
|
|
if job.become_enabled:
|
|
args.append('--become')
|
|
if job.diff_mode:
|
|
args.append('--diff')
|
|
if become_method:
|
|
args.extend(['--become-method', sanitize_jinja(become_method)])
|
|
if become_username:
|
|
args.extend(['--become-user', sanitize_jinja(become_username)])
|
|
if 'become_password' in passwords:
|
|
args.append('--ask-become-pass')
|
|
|
|
# Support prompting for multiple vault passwords
|
|
for k, v in passwords.items():
|
|
if k.startswith('vault_password'):
|
|
if k == 'vault_password':
|
|
args.append('--ask-vault-pass')
|
|
else:
|
|
# split only on the first dot in case the vault ID itself contains a dot
|
|
vault_id = k.split('.', 1)[1]
|
|
args.append('--vault-id')
|
|
args.append('{}@prompt'.format(vault_id))
|
|
|
|
if job.forks:
|
|
if settings.MAX_FORKS > 0 and job.forks > settings.MAX_FORKS:
|
|
logger.warning(f'Maximum number of forks ({settings.MAX_FORKS}) exceeded.')
|
|
args.append('--forks=%d' % settings.MAX_FORKS)
|
|
else:
|
|
args.append('--forks=%d' % job.forks)
|
|
if job.force_handlers:
|
|
args.append('--force-handlers')
|
|
if job.limit:
|
|
args.extend(['-l', job.limit])
|
|
if job.verbosity:
|
|
args.append('-%s' % ('v' * min(5, job.verbosity)))
|
|
if job.job_tags:
|
|
args.extend(['-t', job.job_tags])
|
|
if job.skip_tags:
|
|
args.append('--skip-tags=%s' % job.skip_tags)
|
|
if job.start_at_task:
|
|
args.append('--start-at-task=%s' % job.start_at_task)
|
|
|
|
return args
|
|
|
|
def build_cwd(self, job, private_data_dir):
|
|
return os.path.join(private_data_dir, 'project')
|
|
|
|
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
|
|
return job.playbook
|
|
|
|
def build_extra_vars_file(self, job, private_data_dir):
|
|
# Define special extra_vars for AWX, combine with job.extra_vars.
|
|
extra_vars = job.awx_meta_vars()
|
|
|
|
if job.extra_vars_dict:
|
|
extra_vars.update(json.loads(job.decrypted_extra_vars()))
|
|
|
|
# By default, all extra vars disallow Jinja2 template usage for
|
|
# security reasons; top level key-values defined in JT.extra_vars, however,
|
|
# are allowed as "safe" (because they can only be set by users with
|
|
# higher levels of privilege - those that have the ability create and
|
|
# edit Job Templates)
|
|
safe_dict = {}
|
|
if job.job_template and settings.ALLOW_JINJA_IN_EXTRA_VARS == 'template':
|
|
safe_dict = job.job_template.extra_vars_dict
|
|
|
|
return self._write_extra_vars_file(private_data_dir, extra_vars, safe_dict)
|
|
|
|
def build_credentials_list(self, job):
|
|
return job.credentials.prefetch_related('input_sources__source_credential').all()
|
|
|
|
def get_password_prompts(self, passwords={}):
|
|
d = super(RunJob, self).get_password_prompts(passwords)
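# Map interactive prompt regexes to the password keys built in build_passwords();
# ansible-runner uses this mapping to answer ssh/become/vault prompts.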
|
|
d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock'
|
|
d[r'Bad passphrase, try again for .*:\s*?$'] = ''
|
|
for method in PRIVILEGE_ESCALATION_METHODS:
|
|
d[r'%s password.*:\s*?$' % (method[0])] = 'become_password'
|
|
d[r'%s password.*:\s*?$' % (method[0].upper())] = 'become_password'
|
|
d[r'BECOME password.*:\s*?$'] = 'become_password'
|
|
d[r'SSH password:\s*?$'] = 'ssh_password'
|
|
d[r'Password:\s*?$'] = 'ssh_password'
|
|
d[r'Vault password:\s*?$'] = 'vault_password'
|
|
for k, v in passwords.items():
|
|
if k.startswith('vault_password.'):
|
|
# split only on the first dot in case the vault ID itself contains a dot
|
|
vault_id = k.split('.', 1)[1]
|
|
d[r'Vault password \({}\):\s*?$'.format(vault_id)] = k
|
|
return d
|
|
|
|
def should_use_resource_profiling(self, job):
|
|
'''
|
|
Return whether this task should use resource profiling
|
|
'''
|
|
return settings.AWX_RESOURCE_PROFILING_ENABLED
|
|
|
|
def should_use_proot(self, job):
|
|
'''
|
|
Return whether this task should use proot.
|
|
'''
|
|
if job.is_containerized:
|
|
return False
|
|
return getattr(settings, 'AWX_PROOT_ENABLED', False)
|
|
|
|
def pre_run_hook(self, job, private_data_dir):
|
|
if job.inventory is None:
|
|
error = _('Job could not start because it does not have a valid inventory.')
|
|
self.update_model(job.pk, status='failed', job_explanation=error)
|
|
raise RuntimeError(error)
|
|
elif job.project is None:
|
|
error = _('Job could not start because it does not have a valid project.')
|
|
self.update_model(job.pk, status='failed', job_explanation=error)
|
|
raise RuntimeError(error)
|
|
elif job.project.status in ('error', 'failed'):
|
|
msg = _(
|
|
'The project revision for this job template is unknown due to a failed update.'
|
|
)
|
|
job = self.update_model(job.pk, status='failed', job_explanation=msg)
|
|
raise RuntimeError(msg)
|
|
|
|
project_path = job.project.get_project_path(check_if_exists=False)
|
|
job_revision = job.project.scm_revision
|
|
sync_needs = []
|
|
source_update_tag = 'update_{}'.format(job.project.scm_type)
|
|
branch_override = bool(job.scm_branch and job.scm_branch != job.project.scm_branch)
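# branch_override means the job requested a branch other than the project
# default, so neither the local checkout nor the roles/collections cache for
# the default branch can be trusted for this run.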
|
|
if not job.project.scm_type:
|
|
pass # manual projects are not synced, user has responsibility for that
|
|
elif not os.path.exists(project_path):
|
|
logger.debug('Performing fresh clone of {} on this instance.'.format(job.project))
|
|
sync_needs.append(source_update_tag)
|
|
elif job.project.scm_type == 'git' and job.project.scm_revision and (not branch_override):
|
|
git_repo = git.Repo(project_path)
|
|
try:
|
|
if job_revision == git_repo.head.commit.hexsha:
|
|
logger.debug('Skipping project sync for {} because commit is locally available'.format(job.log_format))
|
|
else:
|
|
sync_needs.append(source_update_tag)
|
|
except (ValueError, BadGitName):
|
|
logger.debug('Needed commit for {} not in local source tree, will sync with remote'.format(job.log_format))
|
|
sync_needs.append(source_update_tag)
|
|
else:
|
|
logger.debug('Project not available locally, {} will sync with remote'.format(job.log_format))
|
|
sync_needs.append(source_update_tag)
|
|
|
|
has_cache = os.path.exists(os.path.join(job.project.get_cache_path(), job.project.cache_id))
|
|
# Galaxy requirements are not supported for manual projects
|
|
if job.project.scm_type and ((not has_cache) or branch_override):
|
|
sync_needs.extend(['install_roles', 'install_collections'])
|
|
|
|
if sync_needs:
|
|
pu_ig = job.instance_group
|
|
pu_en = job.execution_node
|
|
if job.is_isolated() is True:
|
|
pu_ig = pu_ig.controller
|
|
pu_en = settings.CLUSTER_HOST_ID
|
|
|
|
sync_metafields = dict(
|
|
launch_type="sync",
|
|
job_type='run',
|
|
job_tags=','.join(sync_needs),
|
|
status='running',
|
|
instance_group=pu_ig,
|
|
execution_node=pu_en,
|
|
celery_task_id=job.celery_task_id
|
|
)
|
|
if branch_override:
|
|
sync_metafields['scm_branch'] = job.scm_branch
|
|
if 'update_' not in sync_metafields['job_tags']:
|
|
sync_metafields['scm_revision'] = job_revision
|
|
local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
|
|
# save the associated job before calling run() so that a
|
|
# cancel() call on the job can cancel the project update
|
|
job = self.update_model(job.pk, project_update=local_project_sync)
|
|
|
|
project_update_task = local_project_sync._get_task_class()
|
|
try:
|
|
# the job private_data_dir is passed so sync can download roles and collections there
|
|
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
|
sync_task.run(local_project_sync.id)
|
|
local_project_sync.refresh_from_db()
|
|
job = self.update_model(job.pk, scm_revision=local_project_sync.scm_revision)
|
|
except Exception:
|
|
local_project_sync.refresh_from_db()
|
|
if local_project_sync.status != 'canceled':
|
|
job = self.update_model(job.pk, status='failed',
|
|
job_explanation=('Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' %
|
|
('project_update', local_project_sync.name, local_project_sync.id)))
|
|
raise
|
|
job.refresh_from_db()
|
|
if job.cancel_flag:
|
|
return
|
|
else:
|
|
# Case where a local sync is not needed, meaning that local tree is
|
|
# up-to-date with project, job is running project current version
|
|
if job_revision:
|
|
job = self.update_model(job.pk, scm_revision=job_revision)
|
|
# Project update does not copy the folder, so copy here
|
|
RunProjectUpdate.make_local_copy(job.project, private_data_dir, scm_revision=job_revision)
|
|
|
|
if job.inventory.kind == 'smart':
|
|
# cache smart inventory memberships so that the host_filter query is not
|
|
# run inside of the event saving code
|
|
update_smart_memberships_for_inventory(job.inventory)
|
|
|
|
def final_run_hook(self, job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
|
|
super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
|
|
if not private_data_dir:
|
|
# If there's no private data dir, that means we didn't get into the
|
|
# actual `run()` call; this _usually_ means something failed in
|
|
# the pre_run_hook method
|
|
return
|
|
if job.use_fact_cache:
|
|
job.finish_job_fact_cache(
|
|
os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'),
|
|
fact_modification_times,
|
|
)
|
|
if isolated_manager_instance and not job.is_containerized:
|
|
isolated_manager_instance.cleanup()
|
|
|
|
try:
|
|
inventory = job.inventory
|
|
except Inventory.DoesNotExist:
|
|
pass
|
|
else:
|
|
if inventory is not None:
|
|
update_inventory_computed_fields.delay(inventory.id)
@task(queue=get_local_queuename)
|
|
class RunProjectUpdate(BaseTask):
|
|
|
|
model = ProjectUpdate
|
|
event_model = ProjectUpdateEvent
|
|
event_data_key = 'project_update_id'
|
|
|
|
@property
|
|
def proot_show_paths(self):
|
|
return [settings.PROJECTS_ROOT]
|
|
|
|
def __init__(self, *args, job_private_data_dir=None, **kwargs):
|
|
super(RunProjectUpdate, self).__init__(*args, **kwargs)
|
|
self.playbook_new_revision = None
|
|
self.original_branch = None
|
|
self.job_private_data_dir = job_private_data_dir
|
|
|
|
def event_handler(self, event_data):
|
|
super(RunProjectUpdate, self).event_handler(event_data)
|
|
returned_data = event_data.get('event_data', {})
|
|
if returned_data.get('task_action', '') == 'set_fact':
|
|
returned_facts = returned_data.get('res', {}).get('ansible_facts', {})
|
|
if 'scm_version' in returned_facts:
|
|
self.playbook_new_revision = returned_facts['scm_version']
|
|
|
|
def build_private_data(self, project_update, private_data_dir):
|
|
'''
|
|
Return SSH private key data needed for this project update.
|
|
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>
|
|
}
|
|
}
|
|
'''
|
|
private_data = {'credentials': {}}
|
|
if project_update.credential:
|
|
credential = project_update.credential
|
|
if credential.has_input('ssh_key_data'):
|
|
private_data['credentials'][credential] = credential.get_input('ssh_key_data', default='')
|
|
return private_data
|
|
|
|
def build_passwords(self, project_update, runtime_passwords):
|
|
'''
|
|
Build a dictionary of passwords for SSH private key unlock and SCM
|
|
username/password.
|
|
'''
|
|
passwords = super(RunProjectUpdate, self).build_passwords(project_update, runtime_passwords)
|
|
if project_update.credential:
|
|
passwords['scm_key_unlock'] = project_update.credential.get_input('ssh_key_unlock', default='')
|
|
passwords['scm_username'] = project_update.credential.get_input('username', default='')
|
|
passwords['scm_password'] = project_update.credential.get_input('password', default='')
|
|
return passwords
|
|
|
|
def build_env(self, project_update, private_data_dir, isolated=False, private_data_files=None):
|
|
'''
|
|
Build environment dictionary for ansible-playbook.
|
|
'''
|
|
env = super(RunProjectUpdate, self).build_env(project_update, private_data_dir,
|
|
isolated=isolated,
|
|
private_data_files=private_data_files)
|
|
self.add_ansible_venv(settings.ANSIBLE_VENV_PATH, env)
|
|
env['ANSIBLE_RETRY_FILES_ENABLED'] = str(False)
|
|
env['ANSIBLE_ASK_PASS'] = str(False)
|
|
env['ANSIBLE_BECOME_ASK_PASS'] = str(False)
|
|
env['DISPLAY'] = '' # Prevent stupid password popup when running tests.
|
|
# give ansible a hint about the intended tmpdir to work around issues
|
|
# like https://github.com/ansible/ansible/issues/30064
|
|
env['TMP'] = settings.AWX_PROOT_BASE_PATH
|
|
env['PROJECT_UPDATE_ID'] = str(project_update.pk)
|
|
if settings.GALAXY_IGNORE_CERTS:
|
|
env['ANSIBLE_GALAXY_IGNORE'] = str(True)  # environment variable values must be strings
|
|
|
|
# build out env vars for Galaxy credentials (in order)
|
|
galaxy_server_list = []
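# Each Galaxy credential becomes an ANSIBLE_GALAXY_SERVER_SERVER<i>_* block
# (the environment-variable form of an ansible.cfg [galaxy_server.server<i>]
# section); the SERVER_LIST below preserves the credential ordering.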
|
|
if project_update.project.organization:
|
|
for i, cred in enumerate(
|
|
project_update.project.organization.galaxy_credentials.all()
|
|
):
|
|
env[f'ANSIBLE_GALAXY_SERVER_SERVER{i}_URL'] = cred.get_input('url')
|
|
auth_url = cred.get_input('auth_url', default=None)
|
|
token = cred.get_input('token', default=None)
|
|
if token:
|
|
env[f'ANSIBLE_GALAXY_SERVER_SERVER{i}_TOKEN'] = token
|
|
if auth_url:
|
|
env[f'ANSIBLE_GALAXY_SERVER_SERVER{i}_AUTH_URL'] = auth_url
|
|
galaxy_server_list.append(f'server{i}')
|
|
|
|
if galaxy_server_list:
|
|
env['ANSIBLE_GALAXY_SERVER_LIST'] = ','.join(galaxy_server_list)
|
|
|
|
return env
|
|
|
|
def _build_scm_url_extra_vars(self, project_update):
|
|
'''
|
|
Helper method to build SCM url and extra vars with parameters needed
|
|
for authentication.
|
|
'''
|
|
extra_vars = {}
|
|
if project_update.credential:
|
|
scm_username = project_update.credential.get_input('username', default='')
|
|
scm_password = project_update.credential.get_input('password', default='')
|
|
else:
|
|
scm_username = ''
|
|
scm_password = ''
|
|
scm_type = project_update.scm_type
|
|
scm_url = update_scm_url(scm_type, project_update.scm_url,
|
|
check_special_cases=False)
|
|
scm_url_parts = urlparse.urlsplit(scm_url)
|
|
# Prefer the username/password in the URL, if provided.
|
|
scm_username = scm_url_parts.username or scm_username
|
|
scm_password = scm_url_parts.password or scm_password
|
|
if scm_username:
|
|
if scm_type == 'svn':
|
|
extra_vars['scm_username'] = scm_username
|
|
extra_vars['scm_password'] = scm_password
|
|
scm_password = False
|
|
if scm_url_parts.scheme != 'svn+ssh':
|
|
scm_username = False
|
|
elif scm_url_parts.scheme.endswith('ssh'):
|
|
scm_password = False
|
|
elif scm_type in ('insights', 'archive'):
|
|
extra_vars['scm_username'] = scm_username
|
|
extra_vars['scm_password'] = scm_password
|
|
scm_url = update_scm_url(scm_type, scm_url, scm_username,
|
|
scm_password, scp_format=True)
|
|
else:
|
|
scm_url = update_scm_url(scm_type, scm_url, scp_format=True)
|
|
|
|
# Pass the extra accept_hostkey parameter to the git module.
|
|
if scm_type == 'git' and scm_url_parts.scheme.endswith('ssh'):
|
|
extra_vars['scm_accept_hostkey'] = 'true'
|
|
|
|
return scm_url, extra_vars
|
|
|
|
def build_inventory(self, instance, private_data_dir):
|
|
return 'localhost,'
|
|
|
|
def build_args(self, project_update, private_data_dir, passwords):
|
|
'''
|
|
Build command line argument list for running ansible-playbook,
|
|
optionally using ssh-agent for public/private key authentication.
|
|
'''
|
|
args = []
|
|
if getattr(settings, 'PROJECT_UPDATE_VVV', False):
|
|
args.append('-vvv')
|
|
if project_update.job_tags:
|
|
args.extend(['-t', project_update.job_tags])
|
|
return args
|
|
|
|
def build_extra_vars_file(self, project_update, private_data_dir):
|
|
extra_vars = {}
|
|
scm_url, extra_vars_new = self._build_scm_url_extra_vars(project_update)
|
|
extra_vars.update(extra_vars_new)
|
|
|
|
scm_branch = project_update.scm_branch
|
|
if project_update.job_type == 'run' and (not project_update.branch_override):
|
|
if project_update.project.scm_revision:
|
|
scm_branch = project_update.project.scm_revision
|
|
elif not scm_branch:
|
|
raise RuntimeError('Could not determine a revision to run from project.')
|
|
elif not scm_branch:
|
|
scm_branch = 'HEAD'
|
|
|
|
galaxy_creds_are_defined = (
|
|
project_update.project.organization and
|
|
project_update.project.organization.galaxy_credentials.exists()
|
|
)
|
|
if not galaxy_creds_are_defined and (
|
|
settings.AWX_ROLES_ENABLED or settings.AWX_COLLECTIONS_ENABLED
|
|
):
|
|
logger.warning(
|
|
'Galaxy role/collection syncing is enabled, but no '
|
|
f'credentials are configured for {project_update.project.organization}.'
|
|
)
|
|
|
|
extra_vars.update({
|
|
'projects_root': settings.PROJECTS_ROOT.rstrip('/'),
|
|
'local_path': os.path.basename(project_update.project.local_path),
|
|
'project_path': project_update.get_project_path(check_if_exists=False), # deprecated
|
|
'insights_url': settings.INSIGHTS_URL_BASE,
|
|
'awx_license_type': get_license().get('license_type', 'UNLICENSED'),
|
|
'awx_version': get_awx_version(),
|
|
'scm_url': scm_url,
|
|
'scm_branch': scm_branch,
|
|
'scm_clean': project_update.scm_clean,
|
|
'roles_enabled': galaxy_creds_are_defined and settings.AWX_ROLES_ENABLED,
|
|
'collections_enabled': galaxy_creds_are_defined and settings.AWX_COLLECTIONS_ENABLED,
|
|
})
|
|
# apply custom refspec from user for PR refs and the like
|
|
if project_update.scm_refspec:
|
|
extra_vars['scm_refspec'] = project_update.scm_refspec
|
|
elif project_update.project.allow_override:
|
|
# If branch is override-able, do extra fetch for all branches
|
|
extra_vars['scm_refspec'] = 'refs/heads/*:refs/remotes/origin/*'
|
|
self._write_extra_vars_file(private_data_dir, extra_vars)
|
|
|
|
def build_cwd(self, project_update, private_data_dir):
|
|
return os.path.join(private_data_dir, 'project')
|
|
|
|
def build_playbook_path_relative_to_cwd(self, project_update, private_data_dir):
|
|
return os.path.join('project_update.yml')
|
|
|
|
def get_password_prompts(self, passwords={}):
|
|
d = super(RunProjectUpdate, self).get_password_prompts(passwords)
|
|
d[r'Username for.*:\s*?$'] = 'scm_username'
|
|
d[r'Password for.*:\s*?$'] = 'scm_password'
|
|
d['Password:\s*?$'] = 'scm_password' # noqa
|
|
d[r'\S+?@\S+?\'s\s+?password:\s*?$'] = 'scm_password'
|
|
d[r'Enter passphrase for .*:\s*?$'] = 'scm_key_unlock'
|
|
d[r'Bad passphrase, try again for .*:\s*?$'] = ''
|
|
# FIXME: Configure whether we should auto accept host keys?
|
|
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
|
|
return d
|
|
|
|
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
|
|
scm_revision = project_update.project.scm_revision
|
|
inv_update_class = InventoryUpdate._get_task_class()
|
|
for inv_src in dependent_inventory_sources:
|
|
if not inv_src.update_on_project_update:
|
|
continue
|
|
if inv_src.scm_last_revision == scm_revision:
|
|
logger.debug('Skipping SCM inventory update for `{}` because '
|
|
'project has not changed.'.format(inv_src.name))
|
|
continue
|
|
logger.debug('Local dependent inventory update for `{}`.'.format(inv_src.name))
|
|
with transaction.atomic():
|
|
if InventoryUpdate.objects.filter(inventory_source=inv_src,
|
|
status__in=ACTIVE_STATES).exists():
|
|
logger.debug('Skipping SCM inventory update for `{}` because '
|
|
'another update is already active.'.format(inv_src.name))
|
|
continue
|
|
local_inv_update = inv_src.create_inventory_update(
|
|
_eager_fields=dict(
|
|
launch_type='scm',
|
|
status='running',
|
|
instance_group=project_update.instance_group,
|
|
execution_node=project_update.execution_node,
|
|
source_project_update=project_update,
|
|
celery_task_id=project_update.celery_task_id))
|
|
try:
|
|
inv_update_class().run(local_inv_update.id)
|
|
except Exception:
|
|
logger.exception('{} Unhandled exception updating dependent SCM inventory sources.'.format(
|
|
project_update.log_format
|
|
))
|
|
|
|
try:
|
|
project_update.refresh_from_db()
|
|
except ProjectUpdate.DoesNotExist:
|
|
logger.warning('Project update deleted during updates of dependent SCM inventory sources.')
|
|
break
|
|
try:
|
|
local_inv_update.refresh_from_db()
|
|
except InventoryUpdate.DoesNotExist:
|
|
logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
|
|
continue
|
|
if project_update.cancel_flag:
|
|
logger.info('Project update {} was canceled while updating dependent inventories.'.format(project_update.log_format))
|
|
break
|
|
if local_inv_update.cancel_flag:
|
|
logger.info('Continuing to process project dependencies after {} was canceled'.format(local_inv_update.log_format))
|
|
if local_inv_update.status == 'successful':
|
|
inv_src.scm_last_revision = scm_revision
|
|
inv_src.save(update_fields=['scm_last_revision'])
|
|
|
|
def release_lock(self, instance):
|
|
try:
|
|
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
|
|
except IOError as e:
|
|
logger.error("I/O error({0}) while trying to release lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
|
|
os.close(self.lock_fd)
|
|
raise
|
|
|
|
os.close(self.lock_fd)
|
|
self.lock_fd = None
|
|
|
|
'''
|
|
Note: We don't support blocking=False
|
|
'''
|
|
def acquire_lock(self, instance, blocking=True):
|
|
lock_path = instance.get_lock_file()
|
|
if lock_path is None:
|
|
# If local_path was blanked by a migration or for any other reason, it is recoverable by saving
|
|
instance.save()
|
|
lock_path = instance.get_lock_file()
|
|
if lock_path is None:
|
|
raise RuntimeError(u'Invalid lock file path')
|
|
|
|
try:
|
|
self.lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
|
|
except OSError as e:
|
|
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
|
raise
|
|
|
|
start_time = time.time()
|
|
while True:
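# Poll for the lock: try a non-blocking lockf, sleep for a second on
# contention, and bail out early if the project update has been canceled.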
|
|
try:
|
|
instance.refresh_from_db(fields=['cancel_flag'])
|
|
if instance.cancel_flag:
|
|
logger.debug("ProjectUpdate({0}) was canceled".format(instance.pk))
|
|
return
|
|
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
break
|
|
except IOError as e:
|
|
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
|
os.close(self.lock_fd)
|
|
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
|
raise
|
|
else:
|
|
time.sleep(1.0)
|
|
waiting_time = time.time() - start_time
|
|
|
|
if waiting_time > 1.0:
|
|
logger.info(
|
|
'{} spent {} waiting to acquire lock for local source tree '
|
|
'for path {}.'.format(instance.log_format, waiting_time, lock_path))
|
|
|
|
def pre_run_hook(self, instance, private_data_dir):
|
|
# re-create root project folder if a natural disaster has destroyed it
|
|
if not os.path.exists(settings.PROJECTS_ROOT):
|
|
os.mkdir(settings.PROJECTS_ROOT)
|
|
self.acquire_lock(instance)
|
|
self.original_branch = None
|
|
if instance.scm_type == 'git' and instance.branch_override:
|
|
project_path = instance.project.get_project_path(check_if_exists=False)
|
|
if os.path.exists(project_path):
|
|
git_repo = git.Repo(project_path)
|
|
if git_repo.head.is_detached:
|
|
self.original_branch = git_repo.head.commit
|
|
else:
|
|
self.original_branch = git_repo.active_branch
|
|
|
|
stage_path = os.path.join(instance.get_cache_path(), 'stage')
|
|
if os.path.exists(stage_path):
|
|
logger.warning('{0} unexpectedly existed before update'.format(stage_path))
|
|
shutil.rmtree(stage_path)
|
|
os.makedirs(stage_path) # presence of empty cache indicates lack of roles or collections
|
|
|
|
# the project update playbook is not in a git repo; it lives in a vendored
# playbooks directory. To stay consistent with the ansible-runner model,
# it is copied into the runner project folder here
|
|
awx_playbooks = self.get_path_to('..', 'playbooks')
|
|
copy_tree(awx_playbooks, os.path.join(private_data_dir, 'project'))
|
|
|
|
@staticmethod
|
|
def clear_project_cache(cache_dir, keep_value):
|
|
if os.path.isdir(cache_dir):
|
|
for entry in os.listdir(cache_dir):
|
|
old_path = os.path.join(cache_dir, entry)
|
|
if entry not in (keep_value, 'stage'):
|
|
# invalidate, then delete
|
|
new_path = os.path.join(cache_dir, '.~~delete~~' + entry)
|
|
try:
|
|
os.rename(old_path, new_path)
|
|
shutil.rmtree(new_path)
|
|
except OSError:
|
|
logger.warning(f"Could not remove cache directory {old_path}")
|
|
|
|
@staticmethod
|
|
def make_local_copy(p, job_private_data_dir, scm_revision=None):
|
|
"""Copy project content (roles and collections) to a job private_data_dir
|
|
|
|
:param object p: Either a project or a project update
|
|
:param str job_private_data_dir: The root of the target ansible-runner folder
|
|
:param str scm_revision: For branch_override cases, the git revision to copy
|
|
"""
|
|
project_path = p.get_project_path(check_if_exists=False)
|
|
destination_folder = os.path.join(job_private_data_dir, 'project')
|
|
if not scm_revision:
|
|
scm_revision = p.scm_revision
|
|
|
|
if p.scm_type == 'git':
|
|
git_repo = git.Repo(project_path)
|
|
if not os.path.exists(destination_folder):
|
|
os.mkdir(destination_folder, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
|
|
tmp_branch_name = 'awx_internal/{}'.format(uuid4())
|
|
# always clone based on specific job revision
|
|
if not p.scm_revision:
|
|
raise RuntimeError('Unexpectedly could not determine a revision to run from project.')
|
|
source_branch = git_repo.create_head(tmp_branch_name, p.scm_revision)
|
|
# git clone must take file:// syntax for source repo or else options like depth will be ignored
|
|
source_as_uri = Path(project_path).as_uri()
|
|
git.Repo.clone_from(
|
|
source_as_uri, destination_folder, branch=source_branch,
|
|
depth=1, single_branch=True, # shallow, do not copy full history
|
|
)
|
|
# submodules are cloned in a loop because shallow copies from local HEADs are ideal
# and no git clone submodule options are compatible with the minimum requirements
|
|
for submodule in git_repo.submodules:
|
|
subrepo_path = os.path.abspath(os.path.join(project_path, submodule.path))
|
|
subrepo_destination_folder = os.path.abspath(os.path.join(destination_folder, submodule.path))
|
|
subrepo_uri = Path(subrepo_path).as_uri()
|
|
git.Repo.clone_from(subrepo_uri, subrepo_destination_folder, depth=1, single_branch=True)
|
|
# force option is necessary because remote refs are not counted, although no information is lost
|
|
git_repo.delete_head(tmp_branch_name, force=True)
|
|
else:
|
|
copy_tree(project_path, destination_folder, preserve_symlinks=1)
|
|
|
|
# copy over the roles and collection cache to job folder
|
|
cache_path = os.path.join(p.get_cache_path(), p.cache_id)
|
|
subfolders = []
|
|
if settings.AWX_COLLECTIONS_ENABLED:
|
|
subfolders.append('requirements_collections')
|
|
if settings.AWX_ROLES_ENABLED:
|
|
subfolders.append('requirements_roles')
|
|
for subfolder in subfolders:
|
|
cache_subpath = os.path.join(cache_path, subfolder)
|
|
if os.path.exists(cache_subpath):
|
|
dest_subpath = os.path.join(job_private_data_dir, subfolder)
|
|
copy_tree(cache_subpath, dest_subpath, preserve_symlinks=1)
|
|
logger.debug('{0} {1} prepared {2} from cache'.format(type(p).__name__, p.pk, dest_subpath))
|
|
|
|
def post_run_hook(self, instance, status):
|
|
# To avoid hangs, very important to release lock even if errors happen here
|
|
try:
|
|
if self.playbook_new_revision:
|
|
instance.scm_revision = self.playbook_new_revision
|
|
instance.save(update_fields=['scm_revision'])
|
|
|
|
# Copy the roles and collections folders to the durable cache
|
|
base_path = instance.get_cache_path()
|
|
stage_path = os.path.join(base_path, 'stage')
|
|
if status == 'successful' and 'install_' in instance.job_tags:
|
|
# Clear other caches before saving this one, and if branch is overridden
|
|
# do not clear cache for main branch, but do clear it for other branches
|
|
self.clear_project_cache(base_path, keep_value=instance.project.cache_id)
|
|
cache_path = os.path.join(base_path, instance.cache_id)
|
|
if os.path.exists(stage_path):
|
|
if os.path.exists(cache_path):
|
|
logger.warning('Rewriting cache at {0}, performance may suffer'.format(cache_path))
|
|
shutil.rmtree(cache_path)
|
|
os.rename(stage_path, cache_path)
|
|
logger.debug('{0} wrote to cache at {1}'.format(instance.log_format, cache_path))
|
|
elif os.path.exists(stage_path):
|
|
shutil.rmtree(stage_path) # cannot trust content update produced
|
|
|
|
if self.job_private_data_dir:
|
|
if status == 'successful':
|
|
# copy project folder before resetting to default branch
|
|
# because some git-tree-specific resources (like submodules) might matter
|
|
self.make_local_copy(instance, self.job_private_data_dir)
|
|
if self.original_branch:
|
|
# for git project syncs, non-default branches can be problems
|
|
# restore to branch the repo was on before this run
|
|
try:
|
|
self.original_branch.checkout()
|
|
except Exception:
|
|
# this could have failed due to dirty tree, but difficult to predict all cases
|
|
logger.exception('Failed to restore project repo to prior state after {}'.format(instance.log_format))
|
|
finally:
|
|
self.release_lock(instance)
|
|
p = instance.project
|
|
if instance.job_type == 'check' and status not in ('failed', 'canceled',):
|
|
if self.playbook_new_revision:
|
|
p.scm_revision = self.playbook_new_revision
|
|
else:
|
|
if status == 'successful':
|
|
logger.error("{} Could not find scm revision in check".format(instance.log_format))
|
|
p.playbook_files = p.playbooks
|
|
p.inventory_files = p.inventories
|
|
p.save(update_fields=['scm_revision', 'playbook_files', 'inventory_files'])
|
|
|
|
# Update any inventories that depend on this project
|
|
dependent_inventory_sources = p.scm_inventory_sources.filter(update_on_project_update=True)
|
|
if len(dependent_inventory_sources) > 0:
|
|
if status == 'successful' and instance.launch_type != 'sync':
|
|
self._update_dependent_inventories(instance, dependent_inventory_sources)
|
|
|
|
def should_use_proot(self, project_update):
|
|
'''
|
|
Return whether this task should use proot.
|
|
'''
|
|
return getattr(settings, 'AWX_PROOT_ENABLED', False)
@task(queue=get_local_queuename)
|
|
class RunInventoryUpdate(BaseTask):
|
|
|
|
model = InventoryUpdate
|
|
event_model = InventoryUpdateEvent
|
|
event_data_key = 'inventory_update_id'
|
|
|
|
# TODO: remove once inv updates run in containers
|
|
def should_use_proot(self, inventory_update):
|
|
'''
|
|
Return whether this task should use proot.
|
|
'''
|
|
return getattr(settings, 'AWX_PROOT_ENABLED', False)
|
|
|
|
# TODO: remove once inv updates run in containers
|
|
@property
|
|
def proot_show_paths(self):
|
|
return [settings.AWX_ANSIBLE_COLLECTIONS_PATHS]
|
|
|
|
def build_private_data(self, inventory_update, private_data_dir):
|
|
"""
|
|
Return private data needed for inventory update.
|
|
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>
|
|
}
|
|
}
|
|
|
|
If no private data is needed, return None.
|
|
"""
|
|
if inventory_update.source in InventorySource.injectors:
|
|
injector = InventorySource.injectors[inventory_update.source]()
|
|
return injector.build_private_data(inventory_update, private_data_dir)
|
|
|
|
def build_env(self, inventory_update, private_data_dir, isolated, private_data_files=None):
|
|
"""Build environment dictionary for ansible-inventory.
|
|
|
|
Most environment variables related to credentials or configuration
|
|
are accomplished by the inventory source injectors (in this method)
|
|
or custom credential type injectors (in main run method).
|
|
"""
|
|
env = super(RunInventoryUpdate, self).build_env(inventory_update,
|
|
private_data_dir,
|
|
isolated,
|
|
private_data_files=private_data_files)
|
|
if private_data_files is None:
|
|
private_data_files = {}
|
|
# TODO: remove once containers replace custom venvs
|
|
self.add_ansible_venv(inventory_update.ansible_virtualenv_path, env, isolated=isolated)
|
|
|
|
# Legacy environment variables that were used as a signal to the awx-manage command;
# they are still provided in case some scripts rely on them
|
|
env['INVENTORY_SOURCE_ID'] = str(inventory_update.inventory_source_id)
|
|
env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk)
|
|
env.update(STANDARD_INVENTORY_UPDATE_ENV)
|
|
|
|
injector = None
|
|
if inventory_update.source in InventorySource.injectors:
|
|
injector = InventorySource.injectors[inventory_update.source]()
|
|
|
|
if injector is not None:
|
|
env = injector.build_env(inventory_update, env, private_data_dir, private_data_files)
|
|
# All CLOUD_PROVIDERS sources are implemented as inventory plugins from collections
|
|
env['ANSIBLE_INVENTORY_ENABLED'] = 'auto'
|
|
|
|
if inventory_update.source in ['scm', 'custom']:
|
|
for env_k in inventory_update.source_vars_dict:
|
|
if str(env_k) not in env and str(env_k) not in settings.INV_ENV_VARIABLE_BLOCKED:
|
|
env[str(env_k)] = str(inventory_update.source_vars_dict[env_k])
|
|
elif inventory_update.source == 'file':
|
|
raise NotImplementedError('Cannot update file sources through the task system.')
|
|
|
|
if inventory_update.source == 'scm' and inventory_update.source_project_update:
|
|
env_key = 'ANSIBLE_COLLECTIONS_PATHS'
|
|
config_setting = 'collections_paths'
|
|
folder = 'requirements_collections'
|
|
default = '~/.ansible/collections:/usr/share/ansible/collections'
|
|
|
|
config_values = read_ansible_config(os.path.join(private_data_dir, 'project'), [config_setting])
|
|
|
|
paths = default.split(':')
|
|
if env_key in env:
|
|
for path in env[env_key].split(':'):
|
|
if path not in paths:
|
|
paths = [env[env_key]] + paths
|
|
elif config_setting in config_values:
|
|
for path in config_values[config_setting].split(':'):
|
|
if path not in paths:
|
|
paths = [config_values[config_setting]] + paths
|
|
paths = [os.path.join(private_data_dir, folder)] + paths
|
|
env[env_key] = os.pathsep.join(paths)
|
|
|
|
return env
|
|
|
|
def write_args_file(self, private_data_dir, args):
|
|
path = os.path.join(private_data_dir, 'args')
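# Write the full ansible-inventory command line to the runner 'args' file,
# then drop the file to owner-read-only.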
|
|
handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
|
|
f = os.fdopen(handle, 'w')
|
|
f.write(' '.join(args))
|
|
f.close()
|
|
os.chmod(path, stat.S_IRUSR)
|
|
return path
|
|
|
|
def build_args(self, inventory_update, private_data_dir, passwords):
|
|
"""Build the command line argument list for running an inventory
|
|
import.
|
|
"""
|
|
# Get the inventory source and inventory.
|
|
inventory_source = inventory_update.inventory_source
|
|
inventory = inventory_source.inventory
|
|
|
|
if inventory is None:
|
|
raise RuntimeError('Inventory Source is not associated with an Inventory.')
|
|
|
|
args = ['ansible-inventory', '--list', '--export']
|
|
|
|
# Add arguments for the source inventory file/script/thing
|
|
source_location = self.pseudo_build_inventory(inventory_update, private_data_dir)
|
|
args.append('-i')
|
|
args.append(source_location)
|
|
|
|
args.append('--output')
|
|
args.append(os.path.join(private_data_dir, 'artifacts', 'output.json'))
|
|
|
|
if os.path.isdir(source_location):
|
|
playbook_dir = source_location
|
|
else:
|
|
playbook_dir = os.path.dirname(source_location)
|
|
args.extend(['--playbook-dir', playbook_dir])
|
|
|
|
if inventory_update.verbosity:
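# Map the update's verbosity (1 or 2) to ansible-inventory -v levels
# (-vvv or -vvvvv), capped at 5.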
|
|
args.append('-' + 'v' * min(5, inventory_update.verbosity * 2 + 1))
|
|
|
|
return args
|
|
|
|
def build_inventory(self, inventory_update, private_data_dir):
|
|
return None # what runner expects in order to not deal with inventory
|
|
|
|
def pseudo_build_inventory(self, inventory_update, private_data_dir):
|
|
"""Inventory imports are ran through a management command
|
|
we pass the inventory in args to that command, so this is not considered
|
|
to be "Ansible" inventory (by runner) even though it is
|
|
Eventually, we would like to cut out the management command,
|
|
and thus use this as the real inventory
|
|
"""
|
|
src = inventory_update.source
|
|
|
|
injector = None
|
|
if inventory_update.source in InventorySource.injectors:
|
|
injector = InventorySource.injectors[src]()
|
|
|
|
if injector is not None:
|
|
content = injector.inventory_contents(inventory_update, private_data_dir)
|
|
# must be a statically named file
|
|
inventory_path = os.path.join(private_data_dir, injector.filename)
|
|
with open(inventory_path, 'w') as f:
|
|
f.write(content)
|
|
os.chmod(inventory_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
|
elif src == 'scm':
|
|
inventory_path = os.path.join(private_data_dir, 'project', inventory_update.source_path)
|
|
elif src == 'custom':
|
|
handle, inventory_path = tempfile.mkstemp(dir=private_data_dir)
|
|
f = os.fdopen(handle, 'w')
|
|
if inventory_update.source_script is None:
|
|
raise RuntimeError('Inventory Script does not exist')
|
|
f.write(inventory_update.source_script.script)
|
|
f.close()
|
|
os.chmod(inventory_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
|
return inventory_path
|
|
|
|
def build_cwd(self, inventory_update, private_data_dir):
|
|
'''
|
|
There is one case where the inventory "source" is in a different
|
|
location from the private data:
|
|
- SCM, where source needs to live in the project folder
|
|
'''
|
|
src = inventory_update.source
|
|
if src == 'scm' and inventory_update.source_project_update:
|
|
return os.path.join(private_data_dir, 'project')
|
|
return private_data_dir
|
|
|
|
def build_playbook_path_relative_to_cwd(self, inventory_update, private_data_dir):
|
|
return None
|
|
|
|
def build_credentials_list(self, inventory_update):
|
|
# All credentials not used by inventory source injector
|
|
return inventory_update.get_extra_credentials()
|
|
|
|
def pre_run_hook(self, inventory_update, private_data_dir):
|
|
source_project = None
|
|
if inventory_update.inventory_source:
|
|
source_project = inventory_update.inventory_source.source_project
|
|
if (inventory_update.source == 'scm' and inventory_update.launch_type != 'scm' and
source_project and source_project.scm_type):  # never ever update manual projects
|
|
|
|
# Check if the content cache exists, so that we do not unnecessarily re-download roles
|
|
sync_needs = ['update_{}'.format(source_project.scm_type)]
|
|
has_cache = os.path.exists(os.path.join(source_project.get_cache_path(), source_project.cache_id))
|
|
# Galaxy requirements are not supported for manual projects
|
|
if not has_cache:
|
|
sync_needs.extend(['install_roles', 'install_collections'])
|
|
|
|
local_project_sync = source_project.create_project_update(
|
|
_eager_fields=dict(
|
|
launch_type="sync",
|
|
job_type='run',
|
|
job_tags=','.join(sync_needs),
|
|
status='running',
|
|
execution_node=inventory_update.execution_node,
|
|
instance_group=inventory_update.instance_group,
|
|
celery_task_id=inventory_update.celery_task_id))
|
|
# associate the inventory update before calling run() so that a
|
|
# cancel() call on the inventory update can cancel the project update
|
|
local_project_sync.scm_inventory_updates.add(inventory_update)
|
|
|
|
project_update_task = local_project_sync._get_task_class()
|
|
try:
|
|
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
|
sync_task.run(local_project_sync.id)
|
|
local_project_sync.refresh_from_db()
|
|
inventory_update.inventory_source.scm_last_revision = local_project_sync.scm_revision
|
|
inventory_update.inventory_source.save(update_fields=['scm_last_revision'])
|
|
except Exception:
|
|
inventory_update = self.update_model(
|
|
inventory_update.pk, status='failed',
|
|
job_explanation=('Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' %
|
|
('project_update', local_project_sync.name, local_project_sync.id)))
|
|
raise
|
|
elif inventory_update.source == 'scm' and inventory_update.launch_type == 'scm' and source_project:
|
|
# This follows update, not sync, so make copy here
|
|
RunProjectUpdate.make_local_copy(source_project, private_data_dir)
|
|
|
|
def post_run_hook(self, inventory_update, status):
|
|
if status != 'successful':
|
|
return # nothing to save, step out of the way to allow error reporting
|
|
|
|
private_data_dir = inventory_update.job_env['AWX_PRIVATE_DATA_DIR']
|
|
expected_output = os.path.join(private_data_dir, 'artifacts', 'output.json')
|
|
with open(expected_output) as f:
|
|
data = json.load(f)
|
|
|
|
# build inventory save options
|
|
options = dict(
|
|
overwrite=inventory_update.overwrite,
|
|
overwrite_vars=inventory_update.overwrite_vars,
|
|
)
|
|
src = inventory_update.source
|
|
|
|
if inventory_update.enabled_var:
|
|
options['enabled_var'] = inventory_update.enabled_var
|
|
options['enabled_value'] = inventory_update.enabled_value
|
|
else:
|
|
if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False):
|
|
options['enabled_var'] = getattr(settings, '%s_ENABLED_VAR' % src.upper())
|
|
if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False):
|
|
options['enabled_value'] = getattr(settings, '%s_ENABLED_VALUE' % src.upper())
|
|
|
|
if inventory_update.host_filter:
|
|
options['host_filter'] = inventory_update.host_filter
|
|
|
|
if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper()):
|
|
options['exclude_empty_groups'] = True
|
|
if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False):
|
|
options['instance_id_var'] = getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper())
|
|
|
|
# Verbosity is applied to saving process, as well as ansible-inventory CLI option
|
|
if inventory_update.verbosity:
|
|
options['verbosity'] = inventory_update.verbosity
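# Swap the inventory_import command's log handler for a SpecialInventoryHandler
# so its output is streamed into this job's events and cancel/timeout checks
# run while the import saves to the database.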
|
|
|
|
handler = SpecialInventoryHandler(
|
|
self.event_handler, self.cancel_callback,
|
|
verbosity=inventory_update.verbosity,
|
|
job_timeout=self.get_instance_timeout(self.instance),
|
|
start_time=inventory_update.started,
|
|
counter=self.event_ct, initial_line=self.end_line
|
|
)
|
|
inv_logger = logging.getLogger('awx.main.commands.inventory_import')
|
|
formatter = inv_logger.handlers[0].formatter
|
|
formatter.job_start = inventory_update.started
|
|
handler.formatter = formatter
|
|
inv_logger.handlers[0] = handler
|
|
|
|
from awx.main.management.commands.inventory_import import Command as InventoryImportCommand
|
|
cmd = InventoryImportCommand()
|
|
try:
|
|
# save the inventory data to database.
|
|
# canceling exceptions will be handled in the global post_run_hook
|
|
cmd.perform_update(options, data, inventory_update)
|
|
except PermissionDenied as exc:
|
|
logger.exception('License error saving {} content'.format(inventory_update.log_format))
|
|
raise PostRunError(str(exc), status='error')
|
|
except PostRunError:
|
|
logger.exception('Error saving {} content, rolling back changes'.format(inventory_update.log_format))
|
|
raise
|
|
except Exception:
|
|
logger.exception('Exception saving {} content, rolling back changes.'.format(
|
|
inventory_update.log_format))
|
|
raise PostRunError(
|
|
'Error occurred while saving inventory data, see traceback or server logs',
|
|
status='error', tb=traceback.format_exc())
@task(queue=get_local_queuename)
|
|
class RunAdHocCommand(BaseTask):
|
|
'''
|
|
Run an ad hoc command using ansible.
|
|
'''
|
|
|
|
model = AdHocCommand
|
|
event_model = AdHocCommandEvent
|
|
event_data_key = 'ad_hoc_command_id'
|
|
|
|
def build_private_data(self, ad_hoc_command, private_data_dir):
|
|
'''
|
|
Return SSH private key data needed for this ad hoc command (only if
|
|
stored in DB as ssh_key_data).
|
|
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
...
|
|
},
|
|
'certificates': {
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
...
|
|
}
|
|
}
|
|
'''
|
|
# If we were sent SSH credentials, decrypt them and send them
|
|
# back (they will be written to a temporary file).
|
|
creds = ad_hoc_command.credential
|
|
private_data = {'credentials': {}}
|
|
if creds and creds.has_input('ssh_key_data'):
|
|
private_data['credentials'][creds] = creds.get_input('ssh_key_data', default='')
|
|
if creds and creds.has_input('ssh_public_key_data'):
|
|
private_data.setdefault('certificates', {})[creds] = creds.get_input('ssh_public_key_data', default='')
|
|
return private_data
|
|
|
|
def build_passwords(self, ad_hoc_command, runtime_passwords):
|
|
'''
|
|
Build a dictionary of passwords for SSH private key, SSH user and
|
|
sudo/su.
|
|
'''
|
|
passwords = super(RunAdHocCommand, self).build_passwords(ad_hoc_command, runtime_passwords)
|
|
cred = ad_hoc_command.credential
|
|
if cred:
|
|
for field in ('ssh_key_unlock', 'ssh_password', 'become_password'):
|
|
value = runtime_passwords.get(field, cred.get_input('password' if field == 'ssh_password' else field, default=''))
|
|
if value not in ('', 'ASK'):
|
|
passwords[field] = value
|
|
return passwords
|
|
|
|
def build_env(self, ad_hoc_command, private_data_dir, isolated=False, private_data_files=None):
|
|
'''
|
|
Build environment dictionary for ansible.
|
|
'''
|
|
env = super(RunAdHocCommand, self).build_env(ad_hoc_command, private_data_dir,
|
|
isolated=isolated,
|
|
private_data_files=private_data_files)
|
|
self.add_ansible_venv(settings.ANSIBLE_VENV_PATH, env)
|
|
# Set environment variables needed for inventory and ad hoc event
|
|
# callbacks to work.
|
|
env['AD_HOC_COMMAND_ID'] = str(ad_hoc_command.pk)
|
|
env['INVENTORY_ID'] = str(ad_hoc_command.inventory.pk)
|
|
env['INVENTORY_HOSTVARS'] = str(True)
|
|
env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1'
|
|
env['ANSIBLE_SFTP_BATCH_MODE'] = 'False'
|
|
|
|
# Create a directory for ControlPath sockets that is unique to each
|
|
# ad hoc command and visible inside the proot environment (when enabled).
|
|
cp_dir = os.path.join(private_data_dir, 'cp')
|
|
if not os.path.exists(cp_dir):
|
|
os.mkdir(cp_dir, 0o700)
|
|
env['ANSIBLE_SSH_CONTROL_PATH'] = cp_dir
|
|
|
|
return env

    def build_args(self, ad_hoc_command, private_data_dir, passwords):
        '''
        Build command line argument list for running ansible, optionally using
        ssh-agent for public/private key authentication.
        '''
        creds = ad_hoc_command.credential
        ssh_username, become_username, become_method = '', '', ''
        if creds:
            ssh_username = creds.get_input('username', default='')
            become_method = creds.get_input('become_method', default='')
            become_username = creds.get_input('become_username', default='')
        else:
            become_method = None
            become_username = ""
        # Always specify the normal SSH user as root by default. Since this
        # task is normally running in the background under a service account,
        # it doesn't make sense to rely on ansible's default of using the
        # current user.
        ssh_username = ssh_username or 'root'
        args = []
        if ad_hoc_command.job_type == 'check':
            args.append('--check')
        args.extend(['-u', sanitize_jinja(ssh_username)])
        if 'ssh_password' in passwords:
            args.append('--ask-pass')
        # We only specify sudo/su user and password if explicitly given by the
        # credential. Credential should never specify both sudo and su.
        if ad_hoc_command.become_enabled:
            args.append('--become')
        if become_method:
            args.extend(['--become-method', sanitize_jinja(become_method)])
        if become_username:
            args.extend(['--become-user', sanitize_jinja(become_username)])
        if 'become_password' in passwords:
            args.append('--ask-become-pass')

        if ad_hoc_command.forks:  # FIXME: Max limit?
            args.append('--forks=%d' % ad_hoc_command.forks)
        if ad_hoc_command.diff_mode:
            args.append('--diff')
        if ad_hoc_command.verbosity:
            args.append('-%s' % ('v' * min(5, ad_hoc_command.verbosity)))

        extra_vars = ad_hoc_command.awx_meta_vars()

        if ad_hoc_command.extra_vars_dict:
            redacted_extra_vars, removed_vars = extract_ansible_vars(ad_hoc_command.extra_vars_dict)
            if removed_vars:
                raise ValueError(_(
                    "{} are prohibited from use in ad hoc commands."
                ).format(", ".join(removed_vars)))
            extra_vars.update(ad_hoc_command.extra_vars_dict)

        if ad_hoc_command.limit:
            args.append(ad_hoc_command.limit)
        else:
            args.append('all')

        return args
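
    # Illustrative sketch only (hypothetical values, not produced by this module):
    # for a check-mode ad hoc command with become enabled, forks=5, verbosity=2 and
    # a credential supplying username 'admin' and become_method 'sudo', build_args()
    # would return roughly:
    #   ['--check', '-u', 'admin', '--become', '--become-method', 'sudo',
    #    '--forks=5', '-vv', 'all']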

    def build_extra_vars_file(self, ad_hoc_command, private_data_dir):
        extra_vars = ad_hoc_command.awx_meta_vars()

        if ad_hoc_command.extra_vars_dict:
            redacted_extra_vars, removed_vars = extract_ansible_vars(ad_hoc_command.extra_vars_dict)
            if removed_vars:
                raise ValueError(_(
                    "{} are prohibited from use in ad hoc commands."
                ).format(", ".join(removed_vars)))
            extra_vars.update(ad_hoc_command.extra_vars_dict)
        self._write_extra_vars_file(private_data_dir, extra_vars)

    def build_module_name(self, ad_hoc_command):
        return ad_hoc_command.module_name

    def build_module_args(self, ad_hoc_command):
        module_args = ad_hoc_command.module_args
        if settings.ALLOW_JINJA_IN_EXTRA_VARS != 'always':
            module_args = sanitize_jinja(module_args)
        return module_args

    def build_cwd(self, ad_hoc_command, private_data_dir):
        return private_data_dir

    def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
        return None

    def get_password_prompts(self, passwords={}):
        d = super(RunAdHocCommand, self).get_password_prompts()
        d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock'
        d[r'Bad passphrase, try again for .*:\s*?$'] = ''
        for method in PRIVILEGE_ESCALATION_METHODS:
            d[r'%s password.*:\s*?$' % (method[0])] = 'become_password'
            d[r'%s password.*:\s*?$' % (method[0].upper())] = 'become_password'
        d[r'BECOME password.*:\s*?$'] = 'become_password'
        d[r'SSH password:\s*?$'] = 'ssh_password'
        d[r'Password:\s*?$'] = 'ssh_password'
        return d
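
    # Note: the keys of the dict returned above are regular expressions matched
    # against the process output at runtime, and the values name entries in the
    # dict built by build_passwords(), so the stored secret (if any) can be
    # supplied when the corresponding prompt appears. The 'Bad passphrase'
    # pattern maps to an empty name so no stored secret is replayed after a
    # failed key unlock.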

    def should_use_proot(self, ad_hoc_command):
        '''
        Return whether this task should use proot.
        '''
        if ad_hoc_command.is_containerized:
            return False
        return getattr(settings, 'AWX_PROOT_ENABLED', False)

    def final_run_hook(self, adhoc_job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
        super(RunAdHocCommand, self).final_run_hook(adhoc_job, status, private_data_dir, fact_modification_times)
        if isolated_manager_instance:
            isolated_manager_instance.cleanup()


@task(queue=get_local_queuename)
class RunSystemJob(BaseTask):
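    '''
    Run a system job by invoking the corresponding awx-manage command
    (e.g. cleanup_jobs or cleanup_activitystream).
    '''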

    model = SystemJob
    event_model = SystemJobEvent
    event_data_key = 'system_job_id'

    def build_args(self, system_job, private_data_dir, passwords):
        args = ['awx-manage', system_job.job_type]
        try:
            # System Job extra_vars can be blank, must be JSON if not blank
            if system_job.extra_vars == '':
                json_vars = {}
            else:
                json_vars = json.loads(system_job.extra_vars)
            if system_job.job_type in ('cleanup_jobs', 'cleanup_activitystream'):
                if 'days' in json_vars:
                    args.extend(['--days', str(json_vars.get('days', 60))])
                if 'dry_run' in json_vars and json_vars['dry_run']:
                    args.extend(['--dry-run'])
            if system_job.job_type == 'cleanup_jobs':
                args.extend(['--jobs', '--project-updates', '--inventory-updates',
                             '--management-jobs', '--ad-hoc-commands', '--workflow-jobs',
                             '--notifications'])
        except Exception:
            logger.exception("{} Failed to parse system job".format(system_job.log_format))
        return args
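
    # Illustrative sketch (hypothetical values): for a cleanup_jobs system job
    # with extra_vars '{"days": 30}', build_args() above returns:
    #   ['awx-manage', 'cleanup_jobs', '--days', '30', '--jobs',
    #    '--project-updates', '--inventory-updates', '--management-jobs',
    #    '--ad-hoc-commands', '--workflow-jobs', '--notifications']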

    def write_args_file(self, private_data_dir, args):
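        '''
        Write the space-joined command line to an 'args' file in the private
        data directory, chmod it to owner-read-only, and return its path.
        '''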
        path = os.path.join(private_data_dir, 'args')
        handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
        f = os.fdopen(handle, 'w')
        f.write(' '.join(args))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

    def build_env(self, instance, private_data_dir, isolated=False, private_data_files=None):
        env = super(RunSystemJob, self).build_env(instance, private_data_dir,
                                                  isolated=isolated,
                                                  private_data_files=private_data_files)
        self.add_awx_venv(env)
        return env

    def build_cwd(self, instance, private_data_dir):
        return settings.BASE_DIR

    def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
        return None

    def build_inventory(self, instance, private_data_dir):
        return None


def _reconstruct_relationships(copy_mapping):
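    '''
    For each (old_obj, new_obj) pair produced by a deep copy, restore the
    relationships listed in the model's FIELDS_TO_PRESERVE_AT_COPY: foreign
    keys not already set on the copy are pointed at the copied counterpart of
    the original's value (or at the original value itself if it was not
    copied), and many-to-many fields are repopulated the same way. Each new
    object is saved afterwards.
    '''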
    for old_obj, new_obj in copy_mapping.items():
        model = type(old_obj)
        for field_name in getattr(model, 'FIELDS_TO_PRESERVE_AT_COPY', []):
            field = model._meta.get_field(field_name)
            if isinstance(field, ForeignKey):
                if getattr(new_obj, field_name, None):
                    continue
                related_obj = getattr(old_obj, field_name)
                related_obj = copy_mapping.get(related_obj, related_obj)
                setattr(new_obj, field_name, related_obj)
            elif field.many_to_many:
                for related_obj in getattr(old_obj, field_name).all():
                    logger.debug('Deep copy: Adding {} to {}({}).{} relationship'.format(
                        related_obj, new_obj, model, field_name
                    ))
                    getattr(new_obj, field_name).add(copy_mapping.get(related_obj, related_obj))
        new_obj.save()


@task(queue=get_local_queuename)
def deep_copy_model_obj(
    model_module, model_name, obj_pk, new_obj_pk,
    user_pk, uuid, permission_check_func=None
):
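    '''
    Background task that completes a deep copy of `model_name` from `obj_pk`
    to `new_obj_pk`. The list of sub-objects to copy is read from the cache
    under `uuid` (the caller is expected to have stored it there before
    dispatching this task). Each sub-object is copied with
    CopyAPIView.copy_model_obj, relationships are rebuilt with
    _reconstruct_relationships, and `permission_check_func`, when given as an
    importable (module, class, attribute) triple, is resolved and called with
    the requesting user and the copied sub-objects.
    '''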
    sub_obj_list = cache.get(uuid)
    if sub_obj_list is None:
        logger.error('Deep copy {} from {} to {} failed unexpectedly.'.format(model_name, obj_pk, new_obj_pk))
        return

    logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
    from awx.api.generics import CopyAPIView
    from awx.main.signals import disable_activity_stream
    model = getattr(importlib.import_module(model_module), model_name, None)
    if model is None:
        return
    try:
        obj = model.objects.get(pk=obj_pk)
        new_obj = model.objects.get(pk=new_obj_pk)
        creater = User.objects.get(pk=user_pk)
    except ObjectDoesNotExist:
        logger.warning("Object or user no longer exists.")
        return
    with transaction.atomic(), ignore_inventory_computed_fields(), disable_activity_stream():
        copy_mapping = {}
        for sub_obj_setup in sub_obj_list:
            sub_model = getattr(importlib.import_module(sub_obj_setup[0]),
                                sub_obj_setup[1], None)
            if sub_model is None:
                continue
            try:
                sub_obj = sub_model.objects.get(pk=sub_obj_setup[2])
            except ObjectDoesNotExist:
                continue
            copy_mapping.update(CopyAPIView.copy_model_obj(
                obj, new_obj, sub_model, sub_obj, creater
            ))
        _reconstruct_relationships(copy_mapping)
        if permission_check_func:
            permission_check_func = getattr(getattr(
                importlib.import_module(permission_check_func[0]), permission_check_func[1]
            ), permission_check_func[2])
            permission_check_func(creater, copy_mapping.values())
    if isinstance(new_obj, Inventory):
        update_inventory_computed_fields.delay(new_obj.id)
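

# A minimal sketch of how deep_copy_model_obj might be dispatched from the API
# layer (all names and values below are hypothetical, not taken from this
# module):
#
#   deep_copy_model_obj.delay(
#       'awx.main.models.jobs', 'JobTemplate', old_jt.pk, new_jt.pk,
#       request.user.pk, sub_objects_cache_key,
#       permission_check_func=('awx.api.generics', 'CopyAPIView',
#                              'deep_copy_permission_check_func')
#   )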