418 lines
18 KiB
Python
418 lines
18 KiB
Python
import pytest
|
|
from unittest import mock
|
|
import json
|
|
from datetime import timedelta
|
|
|
|
from awx.main.scheduler import TaskManager
|
|
from awx.main.scheduler.dependency_graph import DependencyGraph
|
|
from awx.main.utils import encrypt_field
|
|
from awx.main.models import WorkflowJobTemplate, JobTemplate, Job
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
|
|
instance = default_instance_group.instances.all()[0]
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job_should_start"])
|
|
j = objects.jobs["job_should_start"]
|
|
j.status = 'pending'
|
|
j.save()
|
|
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
class TestJobLifeCycle:
|
|
|
|
def run_tm(self, tm, expect_channel=None, expect_schedule=None, expect_commit=None):
|
|
"""Test helper method that takes parameters to assert against
|
|
expect_channel - list of expected websocket emit channel message calls
|
|
expect_schedule - list of expected calls to reschedule itself
|
|
expect_commit - list of expected on_commit calls
|
|
If any of these are None, then the assertion is not made.
|
|
"""
|
|
if expect_schedule and len(expect_schedule) > 1:
|
|
raise RuntimeError('Task manager should reschedule itself one time, at most.')
|
|
with mock.patch('awx.main.models.unified_jobs.UnifiedJob.websocket_emit_status') as mock_channel:
|
|
with mock.patch('awx.main.utils.common._schedule_task_manager') as tm_sch:
|
|
# Job are ultimately submitted in on_commit hook, but this will not
|
|
# actually run, because it waits until outer transaction, which is the test
|
|
# itself in this case
|
|
with mock.patch('django.db.connection.on_commit') as mock_commit:
|
|
tm.schedule()
|
|
if expect_channel is not None:
|
|
assert mock_channel.mock_calls == expect_channel
|
|
if expect_schedule is not None:
|
|
assert tm_sch.mock_calls == expect_schedule
|
|
if expect_commit is not None:
|
|
assert mock_commit.mock_calls == expect_commit
|
|
|
|
def test_task_manager_workflow_rescheduling(self, job_template_factory, inventory, project, default_instance_group):
|
|
jt = JobTemplate.objects.create(
|
|
allow_simultaneous=True,
|
|
inventory=inventory,
|
|
project=project,
|
|
playbook='helloworld.yml'
|
|
)
|
|
wfjt = WorkflowJobTemplate.objects.create(name='foo')
|
|
for i in range(2):
|
|
wfjt.workflow_nodes.create(
|
|
unified_job_template=jt
|
|
)
|
|
wj = wfjt.create_unified_job()
|
|
assert wj.workflow_nodes.count() == 2
|
|
wj.signal_start()
|
|
tm = TaskManager()
|
|
|
|
# Transitions workflow job to running
|
|
# needs to re-schedule so it spawns jobs next round
|
|
self.run_tm(tm, [mock.call('running')], [mock.call()])
|
|
|
|
# Spawns jobs
|
|
# needs re-schedule to submit jobs next round
|
|
self.run_tm(tm, [mock.call('pending'), mock.call('pending')], [mock.call()])
|
|
|
|
assert jt.jobs.count() == 2 # task manager spawned jobs
|
|
|
|
# Submits jobs
|
|
# intermission - jobs will run and reschedule TM when finished
|
|
self.run_tm(tm, [mock.call('waiting'), mock.call('waiting')], [])
|
|
|
|
# I am the job runner
|
|
for job in jt.jobs.all():
|
|
job.status = 'successful'
|
|
job.save()
|
|
|
|
# Finishes workflow
|
|
# no further action is necessary, so rescheduling should not happen
|
|
self.run_tm(tm, [mock.call('successful')], [])
|
|
|
|
def test_task_manager_workflow_workflow_rescheduling(self):
|
|
wfjts = [WorkflowJobTemplate.objects.create(name='foo')]
|
|
for i in range(5):
|
|
wfjt = WorkflowJobTemplate.objects.create(name='foo{}'.format(i))
|
|
wfjts[-1].workflow_nodes.create(
|
|
unified_job_template=wfjt
|
|
)
|
|
wfjts.append(wfjt)
|
|
|
|
wj = wfjts[0].create_unified_job()
|
|
wj.signal_start()
|
|
tm = TaskManager()
|
|
|
|
while wfjts[0].status != 'successful':
|
|
wfjts[1].refresh_from_db()
|
|
if wfjts[1].status == 'successful':
|
|
# final run, no more work to do
|
|
self.run_tm(tm, expect_schedule=[])
|
|
else:
|
|
self.run_tm(tm, expect_schedule=[mock.call()])
|
|
wfjts[0].refresh_from_db()
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_template_factory, mocker):
|
|
instance = default_instance_group.instances.all()[0]
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job_should_start", "job_should_not_start"])
|
|
j1 = objects.jobs["job_should_start"]
|
|
j1.status = 'pending'
|
|
j1.save()
|
|
j2 = objects.jobs["job_should_not_start"]
|
|
j2.status = 'pending'
|
|
j2.save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance)
|
|
j1.status = "successful"
|
|
j1.save()
|
|
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, job_template_factory, mocker):
|
|
instance = default_instance_group.instances.all()[0]
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job_should_start", "job_should_not_start"])
|
|
jt = objects.job_template
|
|
jt.save()
|
|
|
|
j1 = objects.jobs["job_should_start"]
|
|
j1.allow_simultaneous = True
|
|
j1.status = 'pending'
|
|
j1.save()
|
|
j2 = objects.jobs["job_should_not_start"]
|
|
j2.allow_simultaneous = True
|
|
j2.status = 'pending'
|
|
j2.save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, [], instance),
|
|
mock.call(j2, default_instance_group, [], instance)])
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory, mocker):
|
|
instance = default_instance_group.instances.all()[0]
|
|
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
|
|
inventory='inv1', credential='cred1',
|
|
jobs=["job_should_start"])
|
|
objects2 = job_template_factory('jt2', organization='org2', project='proj2',
|
|
inventory='inv2', credential='cred2',
|
|
jobs=["job_should_not_start"])
|
|
j1 = objects1.jobs["job_should_start"]
|
|
j1.status = 'pending'
|
|
j1.save()
|
|
j2 = objects2.jobs["job_should_not_start"]
|
|
j2.status = 'pending'
|
|
j2.save()
|
|
tm = TaskManager()
|
|
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
|
|
mock_task_impact.return_value = 500
|
|
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
|
tm.schedule()
|
|
mock_job.assert_called_once_with(j1, default_instance_group, [], instance)
|
|
j1.status = "successful"
|
|
j1.save()
|
|
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
|
|
tm.schedule()
|
|
mock_job.assert_called_once_with(j2, default_instance_group, [], instance)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_single_job_dependencies_project_launch(default_instance_group, job_template_factory, mocker):
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job_should_start"])
|
|
instance = default_instance_group.instances.all()[0]
|
|
j = objects.jobs["job_should_start"]
|
|
j.status = 'pending'
|
|
j.save()
|
|
p = objects.project
|
|
p.scm_update_on_launch = True
|
|
p.scm_update_cache_timeout = 0
|
|
p.scm_type = "git"
|
|
p.scm_url = "http://github.com/ansible/ansible.git"
|
|
p.save(skip_update=True)
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
tm = TaskManager()
|
|
with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
|
|
tm.schedule()
|
|
mock_pu.assert_called_once_with(j)
|
|
pu = [x for x in p.project_updates.all()]
|
|
assert len(pu) == 1
|
|
TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j], instance)
|
|
pu[0].status = "successful"
|
|
pu[0].save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_single_job_dependencies_inventory_update_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job_should_start"])
|
|
instance = default_instance_group.instances.all()[0]
|
|
j = objects.jobs["job_should_start"]
|
|
j.status = 'pending'
|
|
j.save()
|
|
i = objects.inventory
|
|
ii = inventory_source_factory("ec2")
|
|
ii.source = "ec2"
|
|
ii.update_on_launch = True
|
|
ii.update_cache_timeout = 0
|
|
ii.save()
|
|
i.inventory_sources.add(ii)
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
tm = TaskManager()
|
|
with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
|
|
tm.schedule()
|
|
mock_iu.assert_called_once_with(j, ii)
|
|
iu = [x for x in ii.inventory_updates.all()]
|
|
assert len(iu) == 1
|
|
TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j], instance)
|
|
iu[0].status = "successful"
|
|
iu[0].save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_job_dependency_with_already_updated(default_instance_group, job_template_factory, mocker, inventory_source_factory):
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job_should_start"])
|
|
instance = default_instance_group.instances.all()[0]
|
|
j = objects.jobs["job_should_start"]
|
|
j.status = 'pending'
|
|
j.save()
|
|
i = objects.inventory
|
|
ii = inventory_source_factory("ec2")
|
|
ii.source = "ec2"
|
|
ii.update_on_launch = True
|
|
ii.update_cache_timeout = 0
|
|
ii.save()
|
|
i.inventory_sources.add(ii)
|
|
j.start_args = json.dumps(dict(inventory_sources_already_updated=[ii.id]))
|
|
j.save()
|
|
j.start_args = encrypt_field(j, field_name="start_args")
|
|
j.save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
tm = TaskManager()
|
|
with mock.patch.object(TaskManager, "create_inventory_update", wraps=tm.create_inventory_update) as mock_iu:
|
|
tm.schedule()
|
|
mock_iu.assert_not_called()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
|
|
instance = default_instance_group.instances.all()[0]
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["first_job", "second_job"])
|
|
j1 = objects.jobs["first_job"]
|
|
j1.status = 'pending'
|
|
j1.save()
|
|
j2 = objects.jobs["second_job"]
|
|
j2.status = 'pending'
|
|
j2.save()
|
|
p = objects.project
|
|
p.scm_update_on_launch = True
|
|
p.scm_update_cache_timeout = 300
|
|
p.scm_type = "git"
|
|
p.scm_url = "http://github.com/ansible/ansible.git"
|
|
p.save()
|
|
|
|
i = objects.inventory
|
|
ii = inventory_source_factory("ec2")
|
|
ii.source = "ec2"
|
|
ii.update_on_launch = True
|
|
ii.update_cache_timeout = 300
|
|
ii.save()
|
|
i.inventory_sources.add(ii)
|
|
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
pu = p.project_updates.first()
|
|
iu = ii.inventory_updates.first()
|
|
TaskManager.start_task.assert_has_calls([mock.call(iu, default_instance_group, [j1, j2, pu], instance),
|
|
mock.call(pu, default_instance_group, [j1, j2, iu], instance)])
|
|
pu.status = "successful"
|
|
pu.finished = pu.created + timedelta(seconds=1)
|
|
pu.save()
|
|
iu.status = "successful"
|
|
iu.finished = iu.created + timedelta(seconds=1)
|
|
iu.save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance)
|
|
j1.status = "successful"
|
|
j1.save()
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
TaskManager().schedule()
|
|
TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance)
|
|
pu = [x for x in p.project_updates.all()]
|
|
iu = [x for x in ii.inventory_updates.all()]
|
|
assert len(pu) == 1
|
|
assert len(iu) == 1
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_job_not_blocking_project_update(default_instance_group, job_template_factory):
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job"])
|
|
job = objects.jobs["job"]
|
|
job.instance_group = default_instance_group
|
|
job.status = "running"
|
|
job.save()
|
|
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
task_manager = TaskManager()
|
|
task_manager._schedule()
|
|
|
|
proj = objects.project
|
|
project_update = proj.create_project_update()
|
|
project_update.instance_group = default_instance_group
|
|
project_update.status = "pending"
|
|
project_update.save()
|
|
assert not task_manager.job_blocked_by(project_update)
|
|
|
|
dependency_graph = DependencyGraph()
|
|
dependency_graph.add_job(job)
|
|
assert not dependency_graph.task_blocked_by(project_update)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_job_not_blocking_inventory_update(default_instance_group, job_template_factory, inventory_source_factory):
|
|
objects = job_template_factory('jt', organization='org1', project='proj',
|
|
inventory='inv', credential='cred',
|
|
jobs=["job"])
|
|
job = objects.jobs["job"]
|
|
job.instance_group = default_instance_group
|
|
job.status = "running"
|
|
job.save()
|
|
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
task_manager = TaskManager()
|
|
task_manager._schedule()
|
|
|
|
inv = objects.inventory
|
|
inv_source = inventory_source_factory("ec2")
|
|
inv_source.source = "ec2"
|
|
inv.inventory_sources.add(inv_source)
|
|
inventory_update = inv_source.create_inventory_update()
|
|
inventory_update.instance_group = default_instance_group
|
|
inventory_update.status = "pending"
|
|
inventory_update.save()
|
|
|
|
assert not task_manager.job_blocked_by(inventory_update)
|
|
|
|
dependency_graph = DependencyGraph()
|
|
dependency_graph.add_job(job)
|
|
assert not dependency_graph.task_blocked_by(inventory_update)
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_generate_dependencies_only_once(job_template_factory):
|
|
objects = job_template_factory('jt', organization='org1')
|
|
|
|
job = objects.job_template.create_job()
|
|
job.status = "pending"
|
|
job.name = "job_gen_dep"
|
|
job.save()
|
|
|
|
|
|
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
|
# job starts with dependencies_processed as False
|
|
assert not job.dependencies_processed
|
|
# run one cycle of ._schedule() to generate dependencies
|
|
TaskManager()._schedule()
|
|
|
|
# make sure dependencies_processed is now True
|
|
job = Job.objects.filter(name="job_gen_dep")[0]
|
|
assert job.dependencies_processed
|
|
|
|
# Run ._schedule() again, but make sure .generate_dependencies() is not
|
|
# called with job in the argument list
|
|
tm = TaskManager()
|
|
tm.generate_dependencies = mock.MagicMock()
|
|
tm._schedule()
|
|
|
|
# .call_args is tuple, (positional_args, kwargs), [0][0] then is
|
|
# the first positional arg, i.e. the first argument of
|
|
# .generate_dependencies()
|
|
assert tm.generate_dependencies.call_args[0][0] == []
|