diff --git a/web/b3desk/commands.py b/web/b3desk/commands.py index eb414d0..029dc29 100644 --- a/web/b3desk/commands.py +++ b/web/b3desk/commands.py @@ -1,84 +1,19 @@ import click -import requests from flask import Blueprint from flask import current_app bp = Blueprint("commands", __name__, cli_group=None) -class TooManyUsers(Exception): - """Exception raised if email returns more than one user. - - Attributes: - message -- explanation of the error - """ - - def __init__(self, message="More than one user is using this email"): - self.message = message - super().__init__(self.message) - - -class NoUserFound(Exception): - """Exception raised if email returns no user. - - Attributes: - message -- explanation of the error - """ - - def __init__(self, message="No user with this email was found"): - self.message = message - super().__init__(self.message) - - @bp.cli.command("get-apps-id") @click.argument("email") def get_apps_id(email): - try: - token_response = requests.post( - f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/protocol/openid-connect/token", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "grant_type": "client_credentials", - "client_id": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_ID']}", - "client_secret": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_SECRET']}", - }, - ) - token_response.raise_for_status() - except requests.exceptions.HTTPError as exception: - current_app.logger.error( - "Get token request error: %s, %s", exception, token_response.text - ) - raise exception - access_token = token_response.json()["access_token"] + from b3desk.models.users import get_secondary_identity_provider_id_from_email try: - users_response = requests.get( - f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/admin/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/users", - headers={ - "Authorization": f"Bearer {access_token}", - "cache-control": "no-cache", - }, - params={"email": email}, - ) - users_response.raise_for_status() - except requests.exceptions.HTTPError as exception: - current_app.logger.error( - "Get user from email request error: %s, %s", exception, users_response.text + secondary_id = get_secondary_identity_provider_id_from_email(email) + current_app.logger.info( + "ID from secondary identity provider for email %s: %s", email, secondary_id ) - raise exception - found_users = users_response.json() - if (user_count := len(found_users)) > 1: - too_many_users_exception = TooManyUsers( - f"There are {user_count} users with the email {email}" - ) - current_app.logger.error(too_many_users_exception.message) - raise too_many_users_exception - elif user_count < 1: - no_user_found_exception = NoUserFound( - f"There are no users with the email {email}" - ) - current_app.logger.error(no_user_found_exception.message) - raise no_user_found_exception - - [user] = found_users - return user["username"] + except Exception as e: + current_app.logger.error(e) diff --git a/web/b3desk/models/users.py b/web/b3desk/models/users.py index 611c49b..5fb6d82 100644 --- a/web/b3desk/models/users.py +++ b/web/b3desk/models/users.py @@ -45,19 +45,107 @@ def make_nextcloud_credentials_request(url, payload, headers): return None -def get_user_nc_credentials(username): +class TooManyUsers(Exception): + """Exception raised if email returns more than one user. + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message="More than one user is using this email"): + self.message = message + super().__init__(self.message) + + +class NoUserFound(Exception): + """Exception raised if email returns no user. + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message="No user with this email was found"): + self.message = message + super().__init__(self.message) + + +def get_secondary_identity_provider_token(): + return requests.post( + f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/protocol/openid-connect/token", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "grant_type": "client_credentials", + "client_id": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_ID']}", + "client_secret": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_SECRET']}", + }, + ) + + +def get_secondary_identity_provider_users_from_email(email, access_token): + return requests.get( + f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/admin/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/users", + headers={ + "Authorization": f"Bearer {access_token}", + "cache-control": "no-cache", + }, + params={"email": email}, + ) + + +def get_secondary_identity_provider_id_from_email(email): + try: + token_response = get_secondary_identity_provider_token() + token_response.raise_for_status() + except requests.exceptions.HTTPError as exception: + current_app.logger.warning( + "Get token request error: %s, %s", exception, token_response.text + ) + raise exception + access_token = token_response.json()["access_token"] + + try: + users_response = get_secondary_identity_provider_users_from_email( + email=email, access_token=access_token + ) + users_response.raise_for_status() + except requests.exceptions.HTTPError as exception: + current_app.logger.warning( + "Get user from email request error: %s, %s", exception, users_response.text + ) + raise exception + found_users = users_response.json() + if (user_count := len(found_users)) > 1: + raise TooManyUsers(f"There are {user_count} users with the email {email}") + elif user_count < 1: + raise NoUserFound(f"There are no users with the email {email}") + + [user] = found_users + return user["username"] + + +def get_user_nc_credentials(preferred_username="", email=""): if ( not current_app.config["NC_LOGIN_API_KEY"] or not current_app.config["NC_LOGIN_API_URL"] or not current_app.config["FILE_SHARING"] - or not username + or not (preferred_username or email) ): current_app.logger.info( "File sharing deactivated or unable to perform, no connection to Nextcloud instance" ) return {"nctoken": None, "nclocator": None, "nclogin": None} - payload = {"username": username} + nc_username = preferred_username + if current_app.config["SECONDARY_IDENTITY_PROVIDER_ENABLED"] and email: + try: + nc_username = get_secondary_identity_provider_id_from_email(email=email) + except requests.exceptions.HTTPError: + pass + except TooManyUsers as e: + current_app.logger.warning(e.message) + except NoUserFound as e: + current_app.logger.warning(e.message) + payload = {"username": nc_username} headers = {"X-API-KEY": current_app.config["NC_LOGIN_API_KEY"]} current_app.logger.info( "Retrieve NC credentials from NC_LOGIN_API_URL %s " @@ -103,12 +191,17 @@ def update_user_nc_credentials(user, user_info): if current_app.config["FILE_SHARING"] else None ) - data = get_user_nc_credentials(preferred_username) - if ( - preferred_username is None - or data["nclocator"] is None - or data["nctoken"] is None - ): + + if current_app.config["SECONDARY_IDENTITY_PROVIDER_ENABLED"]: + data = get_user_nc_credentials( + email=( + user_info.get("email") if current_app.config["FILE_SHARING"] else None + ) + ) + else: + data = get_user_nc_credentials(preferred_username=preferred_username) + + if data["nclogin"] is None or data["nclocator"] is None or data["nctoken"] is None: current_app.logger.info( "No new Nextcloud enroll needed for user %s with those data %s", user, data ) @@ -119,7 +212,7 @@ def update_user_nc_credentials(user, user_info): user.nc_locator = data["nclocator"] user.nc_token = data["nctoken"] - user.nc_login = preferred_username + user.nc_login = data["nclogin"] user.nc_last_auto_enroll = nc_last_auto_enroll return True diff --git a/web/tests/conftest.py b/web/tests/conftest.py index 443f420..5beecdd 100644 --- a/web/tests/conftest.py +++ b/web/tests/conftest.py @@ -82,6 +82,7 @@ def configuration(tmp_path, iam_server, iam_client, smtpd): "OIDC_CLIENT_AUTH_METHOD": iam_client.token_endpoint_auth_method, "OIDC_SCOPES": iam_client.scope, "OIDC_USERINFO_HTTP_METHOD": "GET", + "SECONDARY_IDENTITY_PROVIDER_ENABLED": False, "UPLOAD_DIR": str(tmp_path), "TMP_DOWNLOAD_DIR": str(tmp_path), "RECORDING": True, @@ -287,3 +288,19 @@ def cloud_service_response(mocker, webdav_server, request): @pytest.fixture def jpg_file_content(): return b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00\xff\xdb\x00C\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xc2\x00\x0b\x08\x00\x01\x00\x01\x01\x01\x11\x00\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x08\x01\x01\x00\x01?\x10" + + +class ValidToken: + def raise_for_status(): + pass + + def json(): + return {"access_token": "valid_token"} + + +@pytest.fixture +def valid_secondary_identity_token(mocker): + mocker.patch( + "b3desk.models.users.get_secondary_identity_provider_token", + return_value=ValidToken, + ) diff --git a/web/tests/test_user.py b/web/tests/test_user.py index 944f7ac..74bc2f2 100644 --- a/web/tests/test_user.py +++ b/web/tests/test_user.py @@ -1,11 +1,15 @@ from datetime import date import pytest +import requests from freezegun import freeze_time from b3desk.models import db +from b3desk.models.users import NoUserFound +from b3desk.models.users import TooManyUsers from b3desk.models.users import User from b3desk.models.users import get_or_create_user +from b3desk.models.users import get_secondary_identity_provider_id_from_email from b3desk.models.users import make_nextcloud_credentials_request @@ -108,3 +112,75 @@ def test_make_nextcloud_credentials_request_force_secure_for_missing_scheme( headers={"X-API-KEY": app.config["NC_LOGIN_API_KEY"]}, ) assert credentials["nclocator"].startswith("https://") + + +def test_get_secondary_identity_provider_id_from_email_token_error( + client_app, mocker, caplog +): + class TokenErrorAnswer: + text = "Unable to get token" + + def raise_for_status(): + raise requests.exceptions.HTTPError + + mocker.patch( + "b3desk.models.users.get_secondary_identity_provider_token", + return_value=TokenErrorAnswer, + ) + with pytest.raises(requests.exceptions.HTTPError): + get_secondary_identity_provider_id_from_email("jean.espece@rock.org") + assert "Get token request error:" in caplog.text + + +def test_get_secondary_identity_provider_id_from_email_request_error( + client_app, mocker, caplog, valid_secondary_identity_token +): + class RequestErrorAnswer: + text = "Unable to ask identity provider" + + def raise_for_status(): + raise requests.exceptions.HTTPError + + mocker.patch( + "b3desk.models.users.get_secondary_identity_provider_users_from_email", + return_value=RequestErrorAnswer, + ) + with pytest.raises(requests.exceptions.HTTPError): + get_secondary_identity_provider_id_from_email("michel.vendeur@rock.org") + assert "Get user from email request error:" in caplog.text + + +def test_get_secondary_identity_provider_id_from_email_many_users( + client_app, app, mocker, valid_secondary_identity_token +): + class ManyUsersAnswer: + def raise_for_status(): + pass + + def json(): + return [{"username": "freddy"}, {"username": "fred"}] + + mocker.patch( + "b3desk.models.users.get_secondary_identity_provider_users_from_email", + return_value=ManyUsersAnswer, + ) + with pytest.raises(TooManyUsers): + get_secondary_identity_provider_id_from_email("frederick.mercure@rock.org") + + +def test_get_secondary_identity_provider_id_from_email_no_user( + client_app, app, mocker, valid_secondary_identity_token +): + class NoUsersAnswer: + def raise_for_status(): + pass + + def json(): + return [] + + mocker.patch( + "b3desk.models.users.get_secondary_identity_provider_users_from_email", + return_value=NoUsersAnswer, + ) + with pytest.raises(NoUserFound): + get_secondary_identity_provider_id_from_email("blaireau.riviere@rock.org")