Skip to content

Commit

Permalink
Update test_auth.py
Browse files Browse the repository at this point in the history
  • Loading branch information
daisieh committed Nov 29, 2024
1 parent 057f02d commit 30d4766
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions test_auth.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import requests
import os
import pytest
import src.authx.auth as authx.auth
import src.authx.auth
import tempfile
from pathlib import Path
import warnings
Expand Down Expand Up @@ -31,13 +31,13 @@ def __init__(self, token=None, site_admin=False):
warnings.warn(UserWarning("KEYCLOAK_URL is not set"))
token = "testtesttest"
elif site_admin:
token = authx.auth.get_access_token(
token = src.authx.auth.get_access_token(
keycloak_url=KEYCLOAK_PUBLIC_URL,
username=SITE_ADMIN_USER,
password=SITE_ADMIN_PASSWORD
)
else:
token = authx.auth.get_access_token(
token = src.authx.auth.get_access_token(
keycloak_url=KEYCLOAK_PUBLIC_URL,
username=NOT_ADMIN_USER,
password=NOT_ADMIN_PASSWORD
Expand All @@ -56,13 +56,13 @@ def test_add_opa_provider():
return

if OPA_URL is not None:
token = authx.auth.get_access_token(
token = src.authx.auth.get_access_token(
keycloak_url=KEYCLOAK_PUBLIC_URL,
username=SITE_ADMIN_USER,
password=SITE_ADMIN_PASSWORD
)
test_key="testtest"
response = authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
response = src.authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
print(response)
assert len(response) > 0
found = False
Expand All @@ -73,7 +73,7 @@ def test_add_opa_provider():

# try adding the same thing again: the count should stay the same
count = len(response)
response = authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
response = src.authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
assert len(response) == count
else:
warnings.warn(UserWarning("OPA_URL is not set"))
Expand All @@ -85,8 +85,8 @@ def test_site_admin():
"""
if OPA_URL is not None:
print(f"{OPA_URL} {OPA_SECRET}")
assert authx.auth.is_site_admin(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET)
assert not authx.auth.is_site_admin(FakeRequest(), opa_url=OPA_URL, admin_secret=OPA_SECRET)
assert src.authx.auth.is_site_admin(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET)
assert not src.authx.auth.is_site_admin(FakeRequest(), opa_url=OPA_URL, admin_secret=OPA_SECRET)

else:
warnings.warn(UserWarning("OPA_URL is not set"))
Expand All @@ -97,7 +97,7 @@ def test_user_email():
If OPA is present, check to see that the user's email is returned.
"""
if OPA_URL is not None:
assert authx.auth.get_user_email(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET) == f"{SITE_ADMIN_USER}@test.ca"
assert src.authx.auth.get_user_email(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET) == f"{SITE_ADMIN_USER}@test.ca"
else:
warnings.warn(UserWarning("OPA_URL is not set"))

Expand All @@ -111,16 +111,16 @@ def test_remove_opa_provider():
return

if OPA_URL is not None:
token = authx.auth.get_access_token(
token = src.authx.auth.get_access_token(
keycloak_url=KEYCLOAK_PUBLIC_URL,
username=SITE_ADMIN_USER,
password=SITE_ADMIN_PASSWORD
)
test_key="testtest"

response = authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
response = src.authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
count = len(response)
response = authx.auth.remove_provider_from_opa(KEYCLOAK_PUBLIC_URL, test_key=test_key)
response = src.authx.auth.remove_provider_from_opa(KEYCLOAK_PUBLIC_URL, test_key=test_key)
assert len(response) < count
found = False
for p in response:
Expand All @@ -138,18 +138,18 @@ def test_get_opa_datasets():
if OPA_URL is not None:
# try to get user1 datasets without OPA_SECRET:
try:
user_datasets = authx.auth.get_opa_datasets(FakeRequest())
user_datasets = src.authx.auth.get_opa_datasets(FakeRequest())
except requests.HTTPError as e:
# get_opa_datasets should raise an error
assert True

# user1 has controlled4 in its datasets
user_datasets = authx.auth.get_opa_datasets(FakeRequest(), admin_secret=OPA_SECRET)
user_datasets = src.authx.auth.get_opa_datasets(FakeRequest(), admin_secret=OPA_SECRET)
print(user_datasets)
assert "SYNTHETIC-1" in user_datasets

# user2 has controlled5 in its datasets
user_datasets = authx.auth.get_opa_datasets(FakeRequest(site_admin=True), admin_secret=OPA_SECRET)
user_datasets = src.authx.auth.get_opa_datasets(FakeRequest(site_admin=True), admin_secret=OPA_SECRET)
print(user_datasets)
assert "SYNTHETIC-2" in user_datasets
else:
Expand All @@ -166,17 +166,17 @@ def test_put_aws_credential():
return
endpoint = "http://test.endpoint"
# store credential using not-site-admin token
result, status_code = authx.auth.store_aws_credential(token=authx.auth.get_auth_token(FakeRequest()), endpoint=endpoint, bucket="test_bucket", access="test", secret="secret", vault_url=VAULT_URL)
result, status_code = src.authx.auth.store_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest()), endpoint=endpoint, bucket="test_bucket", access="test", secret="secret", vault_url=VAULT_URL)
print(result, status_code)
assert status_code == 200

# try getting it with a non-site_admin token
result, status_code = authx.auth.get_aws_credential(token=authx.auth.get_auth_token(FakeRequest()), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
result, status_code = src.authx.auth.get_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest()), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
print(result)
assert "errors" in result

# try getting it with a site_admin token
result, status_code = authx.auth.get_aws_credential(token=authx.auth.get_auth_token(FakeRequest(site_admin=True)), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
result, status_code = src.authx.auth.get_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest(site_admin=True)), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
assert result['secret'] == 'secret'
assert result['url'] == 'test.endpoint'
else:
Expand All @@ -197,21 +197,21 @@ def test_get_s3_url():
if os.getenv("SERVICE_NAME") != "candig-ingest":
warnings.warn(UserWarning("aws credential tests can only be run within the candig-ingest container"))
return
result, status_code = authx.auth.store_aws_credential(token=authx.auth.get_auth_token(FakeRequest()),endpoint=MINIO_URL, bucket="test", access=MINIO_ACCESS_KEY, secret=MINIO_SECRET_KEY, vault_url=VAULT_URL)
result, status_code = src.authx.auth.store_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest()),endpoint=MINIO_URL, bucket="test", access=MINIO_ACCESS_KEY, secret=MINIO_SECRET_KEY, vault_url=VAULT_URL)
assert result['url'] in MINIO_URL
minio = authx.auth.get_minio_client(token=authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, bucket="test")
minio = src.authx.auth.get_minio_client(token=src.authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, bucket="test")
assert minio['endpoint'] == MINIO_URL
else:
warnings.warn(UserWarning("VAULT_URL is not set"))
minio = authx.auth.get_minio_client(token=authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, bucket="test")
minio = src.authx.auth.get_minio_client(token=src.authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, bucket="test")
else:
warnings.warn(UserWarning("MINIO_URL is not set"))
return
filename = Path(fp.name).name
minio['client'].put_object(minio['bucket'], filename, fp, Path(fp.name).stat().st_size)
fp.close()

url_obj, status_code = authx.auth.get_s3_url(object_id=filename, s3_endpoint=minio['endpoint'], bucket=minio['bucket'], access_key=minio['access'], secret_key=minio['secret'])
url_obj, status_code = src.authx.auth.get_s3_url(object_id=filename, s3_endpoint=minio['endpoint'], bucket=minio['bucket'], access_key=minio['access'], secret_key=minio['secret'])
print(url_obj["url"])
assert status_code == 200

Expand All @@ -222,7 +222,7 @@ def test_get_s3_url():


def test_get_public_s3_url():
url_obj, status_code = authx.auth.get_s3_url(public=True, bucket="1000genomes", s3_endpoint="http://s3.us-east-1.amazonaws.com", object_id="README.ebi_aspera_info", access_key=None, secret_key=None, region="us-east-1")
url_obj, status_code = src.authx.auth.get_s3_url(public=True, bucket="1000genomes", s3_endpoint="http://s3.us-east-1.amazonaws.com", object_id="README.ebi_aspera_info", access_key=None, secret_key=None, region="us-east-1")
response = requests.get(url_obj["url"])
print(response.text)
assert "If you wish to use aspera" in response.text
Expand All @@ -233,13 +233,13 @@ def test_tyk_api():
warnings.warn(UserWarning("KEYCLOAK_URL is not set"))
return

token = authx.auth.get_access_token(
token = src.authx.auth.get_access_token(
keycloak_url=KEYCLOAK_PUBLIC_URL,
username=SITE_ADMIN_USER,
password=SITE_ADMIN_PASSWORD
)
policy_id="testtest"
response = authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
response = src.authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
assert response.status_code == 200
time.sleep(5) # tyk takes a second to refresh this after reloading
url = f"{TYK_LOGIN_TARGET_URL}/tyk/apis/91"
Expand All @@ -254,12 +254,12 @@ def test_tyk_api():

# try adding the same thing again: the count should stay the same
count = len(response.json()['openid_options']['providers'])
response = authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
response = src.authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
assert response.status_code == 200
time.sleep(5) # tyk takes a second to refresh this after reloading
assert len(response.json()['openid_options']['providers']) == count

response = authx.auth.remove_provider_from_tyk_api("91", KEYCLOAK_PUBLIC_URL, policy_id=policy_id)
response = src.authx.auth.remove_provider_from_tyk_api("91", KEYCLOAK_PUBLIC_URL, policy_id=policy_id)
time.sleep(5) # tyk takes a second to refresh this after reloading
assert response.status_code == 200
response = requests.request("GET", url, headers=headers)
Expand All @@ -280,7 +280,7 @@ def test_service_store_secret():
warnings.warn(UserWarning("SERVICE_NAME is not set"))
else:
data = {"payload": "test"}
response, status_code = authx.auth.set_service_store_secret(SERVICE_NAME, key="testtest", value=data)
response, status_code = src.authx.auth.set_service_store_secret(SERVICE_NAME, key="testtest", value=data)
print(response)
assert status_code == 200
assert response["payload"] == "test"
Expand All @@ -296,8 +296,8 @@ def test_verify_service():
if SERVICE_NAME is None:
warnings.warn(UserWarning("SERVICE_NAME is not set"))
else:
token = authx.auth.create_service_token()
assert authx.auth.verify_service_token(service=SERVICE_NAME, token=token)
assert not authx.auth.verify_service_token(service=SERVICE_NAME, token="foo")
token = src.authx.auth.create_service_token()
assert src.authx.auth.verify_service_token(service=SERVICE_NAME, token=token)
assert not src.authx.auth.verify_service_token(service=SERVICE_NAME, token="foo")
else:
warnings.warn(UserWarning("VAULT_URL is not set"))

0 comments on commit 30d4766

Please sign in to comment.