x86_64-centos-awx/awx/main/tests/functional/test_rbac_job_templates.py
2021-03-02 20:53:43 +08:00

388 lines
14 KiB
Python

from unittest import mock
import pytest
from awx.api.versioning import reverse
from awx.main.access import (
BaseAccess,
JobTemplateAccess,
ScheduleAccess
)
from awx.main.models.jobs import JobTemplate
from awx.main.models import Project, Organization, Inventory, Schedule, User
@mock.patch.object(BaseAccess, 'check_license', return_value=None)
@pytest.mark.django_db
def test_job_template_access_superuser(check_license, user, deploy_jobtemplate):
# GIVEN a superuser
u = user('admin', True)
# WHEN access to a job template is checked
access = JobTemplateAccess(u)
# THEN all access checks should pass
assert access.can_read(deploy_jobtemplate)
assert access.can_add({})
@pytest.mark.django_db
class TestImplicitAccess:
def test_org_execute(self, jt_linked, rando):
assert rando not in jt_linked.execute_role
jt_linked.organization.execute_role.members.add(rando)
assert rando in jt_linked.execute_role
def test_org_admin(self, jt_linked, rando):
assert rando not in jt_linked.execute_role
jt_linked.organization.job_template_admin_role.members.add(rando)
assert rando in jt_linked.execute_role
def test_org_auditor(self, jt_linked, rando):
assert rando not in jt_linked.read_role
jt_linked.organization.auditor_role.members.add(rando)
assert rando in jt_linked.read_role
def test_deprecated_inventory_read(self, jt_linked, rando):
assert rando not in jt_linked.read_role
jt_linked.inventory.organization.execute_role.members.add(rando)
assert rando in jt_linked.read_role
@pytest.mark.django_db
def test_job_template_access_read_level(jt_linked, rando):
ssh_cred = jt_linked.machine_credential
vault_cred = jt_linked.vault_credentials[0]
access = JobTemplateAccess(rando)
jt_linked.project.read_role.members.add(rando)
jt_linked.inventory.read_role.members.add(rando)
ssh_cred.read_role.members.add(rando)
proj_pk = jt_linked.project.pk
assert not access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
assert not access.can_add(dict(credential=ssh_cred.pk, project=proj_pk))
assert not access.can_add(dict(vault_credential=vault_cred.pk, project=proj_pk))
for cred in jt_linked.credentials.all():
assert not access.can_unattach(jt_linked, cred, 'credentials', {})
@pytest.mark.django_db
def test_project_use_access(project, rando):
project.use_role.members.add(rando)
access = JobTemplateAccess(rando)
assert access.can_add(None)
assert access.can_add({'project': project.id, 'ask_inventory_on_launch': True})
project2 = Project.objects.create(
name='second-project', scm_type=project.scm_type, playbook_files=project.playbook_files,
organization=project.organization,
)
project2.use_role.members.add(rando)
jt = JobTemplate.objects.create(project=project, ask_inventory_on_launch=True)
jt.admin_role.members.add(rando)
assert access.can_change(jt, {'project': project2.pk})
@pytest.mark.django_db
def test_job_template_access_use_level(jt_linked, rando):
access = JobTemplateAccess(rando)
jt_linked.project.use_role.members.add(rando)
jt_linked.inventory.use_role.members.add(rando)
jt_linked.admin_role.members.add(rando)
proj_pk = jt_linked.project.pk
assert access.can_change(jt_linked, {'job_type': 'check', 'project': proj_pk})
assert access.can_change(jt_linked, {'job_type': 'check', 'inventory': None})
for cred in jt_linked.credentials.all():
assert access.can_unattach(jt_linked, cred, 'credentials', {})
assert access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
assert access.can_add(dict(project=proj_pk))
@pytest.mark.django_db
@pytest.mark.parametrize("role_names", [("admin_role",), ("job_template_admin_role", "inventory_admin_role", "project_admin_role")])
def test_job_template_access_admin(role_names, jt_linked, rando):
ssh_cred = jt_linked.machine_credential
access = JobTemplateAccess(rando)
assert not access.can_read(jt_linked)
assert not access.can_delete(jt_linked)
# Appoint this user to the org role
organization = jt_linked.organization
for role_name in role_names:
getattr(organization, role_name).members.add(rando)
# Assign organization permission in the same way the create view does
ssh_cred.admin_role.parents.add(organization.admin_role)
proj_pk = jt_linked.project.pk
assert access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
for cred in jt_linked.credentials.all():
assert access.can_unattach(jt_linked, cred, 'credentials', {})
assert access.can_read(jt_linked)
assert access.can_delete(jt_linked)
@pytest.mark.django_db
def test_job_template_credentials_prompts_access(
rando, post, inventory, project, machine_credential, vault_credential):
jt = JobTemplate.objects.create(
name = 'test-jt',
project = project,
playbook = 'helloworld.yml',
inventory = inventory,
ask_credential_on_launch = True
)
jt.credentials.add(machine_credential)
jt.execute_role.members.add(rando)
post(
reverse('api:job_template_launch', kwargs={'pk': jt.id}),
{'credentials': [machine_credential.pk, vault_credential.pk]}, rando,
expect=403
)
@pytest.mark.django_db
class TestJobTemplateCredentials:
def test_job_template_cannot_add_credentials(self, job_template, credential, rando):
job_template.admin_role.members.add(rando)
credential.read_role.members.add(rando)
# without permission to credential, user can not attach it
assert not JobTemplateAccess(rando).can_attach(
job_template, credential, 'credentials', {})
def test_job_template_can_add_credentials(self, job_template, credential, rando):
job_template.admin_role.members.add(rando)
credential.use_role.members.add(rando)
# user has permission to apply credential
assert JobTemplateAccess(rando).can_attach(
job_template, credential, 'credentials', {})
@pytest.mark.django_db
class TestOrphanJobTemplate:
def test_orphan_JT_readable_by_system_auditor(self, job_template, system_auditor):
assert system_auditor.is_system_auditor
assert job_template.project is None
access = JobTemplateAccess(system_auditor)
assert access.can_read(job_template)
def test_system_admin_orphan_capabilities(self, job_template, admin_user):
job_template.capabilities_cache = {'edit': False}
access = JobTemplateAccess(admin_user)
capabilities = access.get_user_capabilities(job_template, method_list=['edit'])
assert capabilities['edit']
@pytest.mark.django_db
@pytest.mark.job_permissions
def test_job_template_creator_access(project, organization, rando, post):
project.use_role.members.add(rando)
response = post(url=reverse('api:job_template_list'), data=dict(
name='newly-created-jt',
ask_inventory_on_launch=True,
project=project.pk,
playbook='helloworld.yml'
), user=rando, expect=201)
jt_pk = response.data['id']
jt_obj = JobTemplate.objects.get(pk=jt_pk)
# Creating a JT should place the creator in the admin role
assert rando in jt_obj.admin_role.members.all()
@pytest.mark.django_db
@pytest.mark.job_permissions
@pytest.mark.parametrize('lacking', ['project', 'inventory'])
def test_job_template_insufficient_creator_permissions(lacking, project, inventory, organization, rando, post):
if lacking != 'project':
project.use_role.members.add(rando)
else:
project.read_role.members.add(rando)
if lacking != 'inventory':
inventory.use_role.members.add(rando)
else:
inventory.read_role.members.add(rando)
post(url=reverse('api:job_template_list'), data=dict(
name='newly-created-jt',
inventory=inventory.id,
project=project.pk,
playbook='helloworld.yml'
), user=rando, expect=403)
@pytest.mark.django_db
def test_associate_label(label, user, job_template):
access = JobTemplateAccess(user('joe', False))
job_template.admin_role.members.add(user('joe', False))
label.organization.read_role.members.add(user('joe', False))
assert access.can_attach(job_template, label, 'labels', None)
@pytest.mark.django_db
class TestJobTemplateSchedules:
rrule = 'DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1'
rrule2 = 'DTSTART:20151117T050000Z RRULE:FREQ=WEEKLY;INTERVAL=1;COUNT=1'
@pytest.fixture
def jt2(self):
return JobTemplate.objects.create(name="other-jt")
def test_move_schedule_to_JT_no_access(self, job_template, rando, jt2):
schedule = Schedule.objects.create(unified_job_template=job_template, rrule=self.rrule)
job_template.admin_role.members.add(rando)
access = ScheduleAccess(rando)
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))
def test_move_schedule_from_JT_no_access(self, job_template, rando, jt2):
schedule = Schedule.objects.create(unified_job_template=job_template, rrule=self.rrule)
jt2.admin_role.members.add(rando)
access = ScheduleAccess(rando)
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))
def test_can_create_schedule_with_execute(self, job_template, rando):
job_template.execute_role.members.add(rando)
access = ScheduleAccess(rando)
assert access.can_add({'unified_job_template': job_template})
def test_can_modify_ones_own_schedule(self, job_template, rando):
job_template.execute_role.members.add(rando)
schedule = Schedule.objects.create(unified_job_template=job_template, rrule=self.rrule, created_by=rando)
access = ScheduleAccess(rando)
assert access.can_change(schedule, {'rrule': self.rrule2})
def test_prompts_access_checked(self, job_template, inventory, credential, rando):
job_template.execute_role.members.add(rando)
access = ScheduleAccess(rando)
data = dict(
unified_job_template=job_template,
rrule=self.rrule,
created_by=rando,
inventory=inventory,
credentials=[credential]
)
with mock.patch('awx.main.access.JobLaunchConfigAccess.can_add') as mock_add:
mock_add.return_value = True
assert access.can_add(data)
mock_add.assert_called_once_with(data)
data.pop('credentials')
schedule = Schedule.objects.create(**data)
with mock.patch('awx.main.access.JobLaunchConfigAccess.can_change') as mock_change:
mock_change.return_value = True
assert access.can_change(schedule, {'inventory': 42})
mock_change.assert_called_once_with(schedule, {'inventory': 42})
@pytest.mark.django_db
class TestProjectOrganization:
"""Tests stories related to management of JT organization via its project
which have some bearing on RBAC integrity
"""
def test_new_project_org_change(self, project, patch, admin_user):
org2 = Organization.objects.create(name='bar')
patch(
url=project.get_absolute_url(),
data={'organization': org2.id},
user=admin_user,
expect=200
)
assert Project.objects.get(pk=project.id).organization_id == org2.id
def test_jt_org_cannot_change(self, project, post, patch, admin_user):
post(
url=reverse('api:job_template_list'),
data={
'name': 'foo_template',
'project': project.id,
'playbook': 'helloworld.yml',
'ask_inventory_on_launch': True
},
user=admin_user,
expect=201
)
org2 = Organization.objects.create(name='bar')
r = patch(
url=project.get_absolute_url(),
data={'organization': org2.id},
user=admin_user,
expect=400
)
assert 'Organization cannot be changed' in str(r.data)
def test_orphan_JT_adoption(self, project, patch, admin_user, org_admin):
jt = JobTemplate.objects.create(
name='bar',
ask_inventory_on_launch=True,
playbook='helloworld.yml'
)
assert org_admin not in jt.admin_role
patch(
url=jt.get_absolute_url(),
data={'project': project.id},
user=admin_user,
expect=200
)
assert org_admin in jt.admin_role
def test_inventory_read_transfer_direct(self, patch):
orgs = []
invs = []
admins = []
for i in range(2):
org = Organization.objects.create(name='org{}'.format(i))
org_admin = User.objects.create(username='user{}'.format(i))
inv = Inventory.objects.create(
organization=org,
name='inv{}'.format(i)
)
org.auditor_role.members.add(org_admin)
orgs.append(org)
admins.append(org_admin)
invs.append(inv)
jt = JobTemplate.objects.create(name='foo', inventory=invs[0])
assert admins[0] in jt.read_role
assert admins[1] not in jt.read_role
jt.inventory = invs[1]
jt.save(update_fields=['inventory'])
assert admins[0] not in jt.read_role
assert admins[1] in jt.read_role
def test_inventory_read_transfer_indirect(self, patch):
orgs = []
admins = []
for i in range(2):
org = Organization.objects.create(name='org{}'.format(i))
org_admin = User.objects.create(username='user{}'.format(i))
org.auditor_role.members.add(org_admin)
orgs.append(org)
admins.append(org_admin)
inv = Inventory.objects.create(
organization=orgs[0],
name='inv{}'.format(i)
)
jt = JobTemplate.objects.create(name='foo', inventory=inv)
assert admins[0] in jt.read_role
assert admins[1] not in jt.read_role
inv.organization = orgs[1]
inv.save(update_fields=['organization'])
assert admins[0] not in jt.read_role
assert admins[1] in jt.read_role