diff --git a/test_auth.py b/test_auth.py
index 125c043..f9f2d0d 100644
--- a/test_auth.py
+++ b/test_auth.py
@@ -1,7 +1,7 @@
 import requests
 import os
 import pytest
-import src.authx.auth as authx.auth
+import src.authx.auth
 import tempfile
 from pathlib import Path
 import warnings
@@ -31,13 +31,13 @@ def __init__(self, token=None, site_admin=False):
 warnings.warn(UserWarning("KEYCLOAK_URL is not set"))
 token = "testtesttest"
 elif site_admin:
- token = authx.auth.get_access_token(
+ token = src.authx.auth.get_access_token(
 keycloak_url=KEYCLOAK_PUBLIC_URL,
 username=SITE_ADMIN_USER,
 password=SITE_ADMIN_PASSWORD
 )
 else:
- token = authx.auth.get_access_token(
+ token = src.authx.auth.get_access_token(
 keycloak_url=KEYCLOAK_PUBLIC_URL,
 username=NOT_ADMIN_USER,
 password=NOT_ADMIN_PASSWORD
@@ -56,13 +56,13 @@ def test_add_opa_provider():
 return
 if OPA_URL is not None:
- token = authx.auth.get_access_token(
+ token = src.authx.auth.get_access_token(
 keycloak_url=KEYCLOAK_PUBLIC_URL,
 username=SITE_ADMIN_USER,
 password=SITE_ADMIN_PASSWORD
 )
 test_key="testtest"
- response = authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
+ response = src.authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
 print(response)
 assert len(response) > 0
 found = False
@@ -73,7 +73,7 @@ def test_add_opa_provider():
 # try adding the same thing again: the count should stay the same
 count = len(response)
- response = authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
+ response = src.authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
 assert len(response) == count
 else:
 warnings.warn(UserWarning("OPA_URL is not set"))
@@ -85,8 +85,8 @@ def test_site_admin():
 """
 if OPA_URL is not None:
 print(f"{OPA_URL} {OPA_SECRET}")
- assert authx.auth.is_site_admin(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET)
- assert not authx.auth.is_site_admin(FakeRequest(), opa_url=OPA_URL, admin_secret=OPA_SECRET)
+ assert src.authx.auth.is_site_admin(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET)
+ assert not src.authx.auth.is_site_admin(FakeRequest(), opa_url=OPA_URL, admin_secret=OPA_SECRET)
 else:
 warnings.warn(UserWarning("OPA_URL is not set"))
@@ -97,7 +97,7 @@ def test_user_email():
 If OPA is present, check to see that the user's email is returned.
 """
 if OPA_URL is not None:
- assert authx.auth.get_user_email(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET) == f"{SITE_ADMIN_USER}@test.ca"
+ assert src.authx.auth.get_user_email(FakeRequest(site_admin=True), opa_url=OPA_URL, admin_secret=OPA_SECRET) == f"{SITE_ADMIN_USER}@test.ca"
 else:
 warnings.warn(UserWarning("OPA_URL is not set"))
@@ -111,16 +111,16 @@ def test_remove_opa_provider():
 return
 if OPA_URL is not None:
- token = authx.auth.get_access_token(
+ token = src.authx.auth.get_access_token(
 keycloak_url=KEYCLOAK_PUBLIC_URL,
 username=SITE_ADMIN_USER,
 password=SITE_ADMIN_PASSWORD
 )
 test_key="testtest"
- response = authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
+ response = src.authx.auth.add_provider_to_opa(token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", test_key=test_key)
 count = len(response)
- response = authx.auth.remove_provider_from_opa(KEYCLOAK_PUBLIC_URL, test_key=test_key)
+ response = src.authx.auth.remove_provider_from_opa(KEYCLOAK_PUBLIC_URL, test_key=test_key)
 assert len(response) < count
 found = False
 for p in response:
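Review bot: In the test_site_admin hunk, the context line `print(f"{OPA_URL} {OPA_SECRET}")` directly above the renamed asserts writes the OPA admin secret to the test output, where pytest's captured-output report and any CI log that stores it can retain the value. Since the commit already touches the neighbouring lines, drop the secret from the message, for example `print(f"OPA_URL={OPA_URL}")`. The body of test_site_admin starts before this hunk, so whether the secret is also logged elsewhere in the function cannot be seen from here.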
@@ -138,18 +138,18 @@ def test_get_opa_datasets():
 if OPA_URL is not None:
 # try to get user1 datasets without OPA_SECRET:
 try:
- user_datasets = authx.auth.get_opa_datasets(FakeRequest())
+ user_datasets = src.authx.auth.get_opa_datasets(FakeRequest())
 except requests.HTTPError as e:
 # get_opa_datasets should raise an error
 assert True
 # user1 has controlled4 in its datasets
- user_datasets = authx.auth.get_opa_datasets(FakeRequest(), admin_secret=OPA_SECRET)
+ user_datasets = src.authx.auth.get_opa_datasets(FakeRequest(), admin_secret=OPA_SECRET)
 print(user_datasets)
 assert "SYNTHETIC-1" in user_datasets
 # user2 has controlled5 in its datasets
- user_datasets = authx.auth.get_opa_datasets(FakeRequest(site_admin=True), admin_secret=OPA_SECRET)
+ user_datasets = src.authx.auth.get_opa_datasets(FakeRequest(site_admin=True), admin_secret=OPA_SECRET)
 print(user_datasets)
 assert "SYNTHETIC-2" in user_datasets
 else:
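Review bot: The negative check at the top of this hunk cannot fail: if `src.authx.auth.get_opa_datasets(FakeRequest())` returns without raising, the `try` falls through, and the `assert True` in the handler asserts nothing. A regression that lets a request read datasets without the admin secret would therefore pass this test unnoticed, although the inline comment says the call should raise. Replace the try/except with `with pytest.raises(requests.HTTPError):` followed by the indented call `src.authx.auth.get_opa_datasets(FakeRequest())`; both `pytest` and `requests` are already imported at the top of the file. test_get_opa_datasets starts before this hunk and continues after it.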
@@ -166,17 +166,17 @@ def test_put_aws_credential():
 return
 endpoint = "http://test.endpoint"
 # store credential using not-site-admin token
- result, status_code = authx.auth.store_aws_credential(token=authx.auth.get_auth_token(FakeRequest()), endpoint=endpoint, bucket="test_bucket", access="test", secret="secret", vault_url=VAULT_URL)
+ result, status_code = src.authx.auth.store_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest()), endpoint=endpoint, bucket="test_bucket", access="test", secret="secret", vault_url=VAULT_URL)
 print(result, status_code)
 assert status_code == 200
 # try getting it with a non-site_admin token
- result, status_code = authx.auth.get_aws_credential(token=authx.auth.get_auth_token(FakeRequest()), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
+ result, status_code = src.authx.auth.get_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest()), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
 print(result)
 assert "errors" in result
 # try getting it with a site_admin token
- result, status_code = authx.auth.get_aws_credential(token=authx.auth.get_auth_token(FakeRequest(site_admin=True)), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
+ result, status_code = src.authx.auth.get_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest(site_admin=True)), vault_url=VAULT_URL, endpoint=endpoint, bucket="test_bucket")
 assert result['secret'] == 'secret'
 assert result['url'] == 'test.endpoint'
 else:
@@ -197,13 +197,13 @@ def test_get_s3_url():
 if os.getenv("SERVICE_NAME") != "candig-ingest":
 warnings.warn(UserWarning("aws credential tests can only be run within the candig-ingest container"))
 return
- result, status_code = authx.auth.store_aws_credential(token=authx.auth.get_auth_token(FakeRequest()),endpoint=MINIO_URL, bucket="test", access=MINIO_ACCESS_KEY, secret=MINIO_SECRET_KEY, vault_url=VAULT_URL)
+ result, status_code = src.authx.auth.store_aws_credential(token=src.authx.auth.get_auth_token(FakeRequest()),endpoint=MINIO_URL, bucket="test", access=MINIO_ACCESS_KEY, secret=MINIO_SECRET_KEY, vault_url=VAULT_URL)
 assert result['url'] in MINIO_URL
- minio = authx.auth.get_minio_client(token=authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, bucket="test")
+ minio = src.authx.auth.get_minio_client(token=src.authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, bucket="test")
 assert minio['endpoint'] == MINIO_URL
 else:
 warnings.warn(UserWarning("VAULT_URL is not set"))
- minio = authx.auth.get_minio_client(token=authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, bucket="test")
+ minio = src.authx.auth.get_minio_client(token=src.authx.auth.get_auth_token(FakeRequest()), s3_endpoint=MINIO_URL, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, bucket="test")
 else:
 warnings.warn(UserWarning("MINIO_URL is not set"))
 return
@@ -211,7 +211,7 @@ def test_get_s3_url():
 minio['client'].put_object(minio['bucket'], filename, fp, Path(fp.name).stat().st_size)
 fp.close()
- url_obj, status_code = authx.auth.get_s3_url(object_id=filename, s3_endpoint=minio['endpoint'], bucket=minio['bucket'], access_key=minio['access'], secret_key=minio['secret'])
+ url_obj, status_code = src.authx.auth.get_s3_url(object_id=filename, s3_endpoint=minio['endpoint'], bucket=minio['bucket'], access_key=minio['access'], secret_key=minio['secret'])
 print(url_obj["url"])
 assert status_code == 200
@@ -222,7 +222,7 @@ def test_get_s3_url():
 def test_get_public_s3_url():
- url_obj, status_code = authx.auth.get_s3_url(public=True, bucket="1000genomes", s3_endpoint="http://s3.us-east-1.amazonaws.com", object_id="README.ebi_aspera_info", access_key=None, secret_key=None, region="us-east-1")
+ url_obj, status_code = src.authx.auth.get_s3_url(public=True, bucket="1000genomes", s3_endpoint="http://s3.us-east-1.amazonaws.com", object_id="README.ebi_aspera_info", access_key=None, secret_key=None, region="us-east-1")
 response = requests.get(url_obj["url"])
 print(response.text)
 assert "If you wish to use aspera" in response.text
@@ -233,13 +233,13 @@ def test_tyk_api():
 warnings.warn(UserWarning("KEYCLOAK_URL is not set"))
 return
- token = authx.auth.get_access_token(
+ token = src.authx.auth.get_access_token(
 keycloak_url=KEYCLOAK_PUBLIC_URL,
 username=SITE_ADMIN_USER,
 password=SITE_ADMIN_PASSWORD
 )
 policy_id="testtest"
- response = authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
+ response = src.authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
 assert response.status_code == 200
 time.sleep(5) # tyk takes a second to refresh this after reloading
 url = f"{TYK_LOGIN_TARGET_URL}/tyk/apis/91"
@@ -254,12 +254,12 @@ def test_tyk_api():
 # try adding the same thing again: the count should stay the same
 count = len(response.json()['openid_options']['providers'])
- response = authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
+ response = src.authx.auth.add_provider_to_tyk_api("91", token, f"{KEYCLOAK_PUBLIC_URL}/auth/realms/candig", policy_id=policy_id)
 assert response.status_code == 200
 time.sleep(5) # tyk takes a second to refresh this after reloading
 assert len(response.json()['openid_options']['providers']) == count
- response = authx.auth.remove_provider_from_tyk_api("91", KEYCLOAK_PUBLIC_URL, policy_id=policy_id)
+ response = src.authx.auth.remove_provider_from_tyk_api("91", KEYCLOAK_PUBLIC_URL, policy_id=policy_id)
 time.sleep(5) # tyk takes a second to refresh this after reloading
 assert response.status_code == 200
 response = requests.request("GET", url, headers=headers)
@@ -280,7 +280,7 @@ def test_service_store_secret():
 warnings.warn(UserWarning("SERVICE_NAME is not set"))
 else:
 data = {"payload": "test"}
- response, status_code = authx.auth.set_service_store_secret(SERVICE_NAME, key="testtest", value=data)
+ response, status_code = src.authx.auth.set_service_store_secret(SERVICE_NAME, key="testtest", value=data)
 print(response)
 assert status_code == 200
 assert response["payload"] == "test"
@@ -296,8 +296,8 @@ def test_verify_service():
 if SERVICE_NAME is None:
 warnings.warn(UserWarning("SERVICE_NAME is not set"))
 else:
- token = authx.auth.create_service_token()
- assert authx.auth.verify_service_token(service=SERVICE_NAME, token=token)
- assert not authx.auth.verify_service_token(service=SERVICE_NAME, token="foo")
+ token = src.authx.auth.create_service_token()
+ assert src.authx.auth.verify_service_token(service=SERVICE_NAME, token=token)
+ assert not src.authx.auth.verify_service_token(service=SERVICE_NAME, token="foo")
 else:
 warnings.warn(UserWarning("VAULT_URL is not set"))