262 lines
12 KiB
Python
262 lines
12 KiB
Python
import pytest
|
|
|
|
from awx.main.access import (
|
|
OAuth2ApplicationAccess,
|
|
OAuth2TokenAccess,
|
|
ActivityStreamAccess,
|
|
)
|
|
from awx.main.models.oauth import (
|
|
OAuth2Application as Application,
|
|
OAuth2AccessToken as AccessToken,
|
|
)
|
|
from awx.main.models import ActivityStream
|
|
from awx.api.versioning import reverse
|
|
|
|
|
|
@pytest.mark.django_db
|
|
class TestOAuth2Application:
|
|
|
|
@pytest.mark.parametrize("user_for_access, can_access_list", [
|
|
(0, [True, True]),
|
|
(1, [True, True]),
|
|
(2, [True, True]),
|
|
(3, [False, False]),
|
|
])
|
|
def test_can_read(
|
|
self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
|
|
):
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
access = OAuth2ApplicationAccess(user_list[user_for_access])
|
|
app_creation_user_list = [admin, org_admin]
|
|
for user, can_access in zip(app_creation_user_list, can_access_list):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(user.username), user=user,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
assert access.can_read(app) is can_access
|
|
|
|
def test_admin_only_can_read(self, user, organization):
|
|
user = user('org-admin', False)
|
|
organization.admin_role.members.add(user)
|
|
access = OAuth2ApplicationAccess(user)
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(user.username), user=user,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
assert access.can_read(app) is True
|
|
|
|
def test_app_activity_stream(self, org_admin, alice, organization):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(org_admin.username), user=org_admin,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
access = OAuth2ApplicationAccess(org_admin)
|
|
assert access.can_read(app) is True
|
|
access = ActivityStreamAccess(org_admin)
|
|
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
|
|
assert access.can_read(activity_stream) is True
|
|
access = ActivityStreamAccess(alice)
|
|
assert access.can_read(app) is False
|
|
assert access.can_read(activity_stream) is False
|
|
|
|
|
|
def test_token_activity_stream(self, org_admin, alice, organization, post):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(org_admin.username), user=org_admin,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
response = post(
|
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
|
{'scope': 'read'}, org_admin, expect=201
|
|
)
|
|
token = AccessToken.objects.get(token=response.data['token'])
|
|
access = OAuth2ApplicationAccess(org_admin)
|
|
assert access.can_read(app) is True
|
|
access = ActivityStreamAccess(org_admin)
|
|
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
|
|
assert access.can_read(activity_stream) is True
|
|
access = ActivityStreamAccess(alice)
|
|
assert access.can_read(token) is False
|
|
assert access.can_read(activity_stream) is False
|
|
|
|
|
|
|
|
def test_can_edit_delete_app_org_admin(
|
|
self, admin, org_admin, org_member, alice, organization
|
|
):
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
can_access_list = [True, True, False, False]
|
|
for user, can_access in zip(user_list, can_access_list):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(user.username), user=org_admin,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
access = OAuth2ApplicationAccess(user)
|
|
assert access.can_change(app, {}) is can_access
|
|
assert access.can_delete(app) is can_access
|
|
|
|
|
|
def test_can_edit_delete_app_admin(
|
|
self, admin, org_admin, org_member, alice, organization
|
|
):
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
can_access_list = [True, True, False, False]
|
|
for user, can_access in zip(user_list, can_access_list):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(user.username), user=admin,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
access = OAuth2ApplicationAccess(user)
|
|
assert access.can_change(app, {}) is can_access
|
|
assert access.can_delete(app) is can_access
|
|
|
|
|
|
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization):
|
|
access = OAuth2ApplicationAccess(admin)
|
|
for user in [admin, org_admin, org_member, alice]:
|
|
assert access.can_add({
|
|
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
|
|
'authorization_grant_type': 'password', 'organization': organization.id
|
|
})
|
|
|
|
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization):
|
|
for access_user in [org_member, alice]:
|
|
access = OAuth2ApplicationAccess(access_user)
|
|
for user in [admin, org_admin, org_member, alice]:
|
|
assert not access.can_add({
|
|
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
|
|
'authorization_grant_type': 'password', 'organization': organization.id
|
|
})
|
|
|
|
|
|
@pytest.mark.django_db
|
|
class TestOAuth2Token:
|
|
|
|
def test_can_read_change_delete_app_token(
|
|
self, post, admin, org_admin, org_member, alice, organization
|
|
):
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
can_access_list = [True, True, False, False]
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(admin.username), user=admin,
|
|
client_type='confidential', authorization_grant_type='password',
|
|
organization=organization
|
|
)
|
|
response = post(
|
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
|
{'scope': 'read'}, admin, expect=201
|
|
)
|
|
for user, can_access in zip(user_list, can_access_list):
|
|
token = AccessToken.objects.get(token=response.data['token'])
|
|
access = OAuth2TokenAccess(user)
|
|
assert access.can_read(token) is can_access
|
|
assert access.can_change(token, {}) is can_access
|
|
assert access.can_delete(token) is can_access
|
|
|
|
|
|
def test_auditor_can_read(
|
|
self, post, admin, org_admin, org_member, alice, system_auditor, organization
|
|
):
|
|
user_list = [admin, org_admin, org_member]
|
|
can_access_list = [True, True, True]
|
|
cannot_access_list = [False, False, False]
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(admin.username), user=admin,
|
|
client_type='confidential', authorization_grant_type='password',
|
|
organization=organization
|
|
)
|
|
for user, can_access, cannot_access in zip(user_list, can_access_list, cannot_access_list):
|
|
response = post(
|
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
|
{'scope': 'read'}, user, expect=201
|
|
)
|
|
token = AccessToken.objects.get(token=response.data['token'])
|
|
access = OAuth2TokenAccess(system_auditor)
|
|
assert access.can_read(token) is can_access
|
|
assert access.can_change(token, {}) is cannot_access
|
|
assert access.can_delete(token) is cannot_access
|
|
|
|
def test_user_auditor_can_change(
|
|
self, post, org_member, org_admin, system_auditor, organization
|
|
):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(org_admin.username), user=org_admin,
|
|
client_type='confidential', authorization_grant_type='password',
|
|
organization=organization
|
|
)
|
|
response = post(
|
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
|
{'scope': 'read'}, org_member, expect=201
|
|
)
|
|
token = AccessToken.objects.get(token=response.data['token'])
|
|
access = OAuth2TokenAccess(system_auditor)
|
|
assert access.can_read(token) is True
|
|
assert access.can_change(token, {}) is False
|
|
assert access.can_delete(token) is False
|
|
dual_user = system_auditor
|
|
organization.admin_role.members.add(dual_user)
|
|
access = OAuth2TokenAccess(dual_user)
|
|
assert access.can_read(token) is True
|
|
assert access.can_change(token, {}) is True
|
|
assert access.can_delete(token) is True
|
|
|
|
|
|
|
|
def test_can_read_change_delete_personal_token_org_member(
|
|
self, post, admin, org_admin, org_member, alice
|
|
):
|
|
# Tests who can read a token created by an org-member
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
can_access_list = [True, False, True, False]
|
|
response = post(
|
|
reverse('api:user_personal_token_list', kwargs={'pk': org_member.pk}),
|
|
{'scope': 'read'}, org_member, expect=201
|
|
)
|
|
token = AccessToken.objects.get(token=response.data['token'])
|
|
for user, can_access in zip(user_list, can_access_list):
|
|
access = OAuth2TokenAccess(user)
|
|
assert access.can_read(token) is can_access
|
|
assert access.can_change(token, {}) is can_access
|
|
assert access.can_delete(token) is can_access
|
|
|
|
|
|
def test_can_read_personal_token_creator(
|
|
self, post, admin, org_admin, org_member, alice
|
|
):
|
|
# Tests the token's creator can read their tokens
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
can_access_list = [True, True, True, True]
|
|
|
|
for user, can_access in zip(user_list, can_access_list):
|
|
response = post(
|
|
reverse('api:user_personal_token_list', kwargs={'pk': user.pk}),
|
|
{'scope': 'read', 'application':None}, user, expect=201
|
|
)
|
|
token = AccessToken.objects.get(token=response.data['token'])
|
|
access = OAuth2TokenAccess(user)
|
|
assert access.can_read(token) is can_access
|
|
assert access.can_change(token, {}) is can_access
|
|
assert access.can_delete(token) is can_access
|
|
|
|
|
|
@pytest.mark.parametrize("user_for_access, can_access_list", [
|
|
(0, [True, True]),
|
|
(1, [True, True]),
|
|
(2, [True, True]),
|
|
(3, [False, False]),
|
|
])
|
|
def test_can_create(
|
|
self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
|
|
):
|
|
user_list = [admin, org_admin, org_member, alice]
|
|
for user, can_access in zip(user_list, can_access_list):
|
|
app = Application.objects.create(
|
|
name='test app for {}'.format(user.username), user=user,
|
|
client_type='confidential', authorization_grant_type='password', organization=organization
|
|
)
|
|
post(
|
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
|
|
{'scope': 'read'}, user_list[user_for_access], expect=201 if can_access else 403
|
|
)
|
|
|