Skip to content

Commit

Permalink
Get nextcloud id from email with another identity provider
Browse files Browse the repository at this point in the history
  • Loading branch information
LoanR committed Apr 25, 2024
1 parent 2869052 commit 5e4fa61
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 81 deletions.
77 changes: 6 additions & 71 deletions web/b3desk/commands.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,19 @@
import click
import requests
from flask import Blueprint
from flask import current_app

bp = Blueprint("commands", __name__, cli_group=None)


class TooManyUsers(Exception):
"""Exception raised if email returns more than one user.
Attributes:
message -- explanation of the error
"""

def __init__(self, message="More than one user is using this email"):
self.message = message
super().__init__(self.message)


class NoUserFound(Exception):
"""Exception raised if email returns no user.
Attributes:
message -- explanation of the error
"""

def __init__(self, message="No user with this email was found"):
self.message = message
super().__init__(self.message)


@bp.cli.command("get-apps-id")
@click.argument("email")
def get_apps_id(email):
try:
token_response = requests.post(
f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/protocol/openid-connect/token",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "client_credentials",
"client_id": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_ID']}",
"client_secret": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_SECRET']}",
},
)
token_response.raise_for_status()
except requests.exceptions.HTTPError as exception:
current_app.logger.error(
"Get token request error: %s, %s", exception, token_response.text
)
raise exception
access_token = token_response.json()["access_token"]
from b3desk.models.users import get_secondary_identity_provider_id_from_email

try:
users_response = requests.get(
f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/admin/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/users",
headers={
"Authorization": f"Bearer {access_token}",
"cache-control": "no-cache",
},
params={"email": email},
)
users_response.raise_for_status()
except requests.exceptions.HTTPError as exception:
current_app.logger.error(
"Get user from email request error: %s, %s", exception, users_response.text
secondary_id = get_secondary_identity_provider_id_from_email(email)
current_app.logger.info(
"ID from secondary identity provider for email %s: %s", email, secondary_id
)
raise exception
found_users = users_response.json()
if (user_count := len(found_users)) > 1:
too_many_users_exception = TooManyUsers(
f"There are {user_count} users with the email {email}"
)
current_app.logger.error(too_many_users_exception.message)
raise too_many_users_exception
elif user_count < 1:
no_user_found_exception = NoUserFound(
f"There are no users with the email {email}"
)
current_app.logger.error(no_user_found_exception.message)
raise no_user_found_exception

[user] = found_users
return user["username"]
except Exception as e:
current_app.logger.error(e)
113 changes: 103 additions & 10 deletions web/b3desk/models/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,107 @@ def make_nextcloud_credentials_request(url, payload, headers):
return None


def get_user_nc_credentials(username):
class TooManyUsers(Exception):
"""Exception raised if email returns more than one user.
Attributes:
message -- explanation of the error
"""

def __init__(self, message="More than one user is using this email"):
self.message = message
super().__init__(self.message)


class NoUserFound(Exception):
"""Exception raised if email returns no user.
Attributes:
message -- explanation of the error
"""

def __init__(self, message="No user with this email was found"):
self.message = message
super().__init__(self.message)


def get_secondary_identity_provider_token():
return requests.post(
f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/protocol/openid-connect/token",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "client_credentials",
"client_id": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_ID']}",
"client_secret": f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_CLIENT_SECRET']}",
},
)


def get_secondary_identity_provider_users_from_email(email, access_token):
return requests.get(
f"{current_app.config['SECONDARY_IDENTITY_PROVIDER_URI']}/auth/admin/realms/{current_app.config['SECONDARY_IDENTITY_PROVIDER_REALM']}/users",
headers={
"Authorization": f"Bearer {access_token}",
"cache-control": "no-cache",
},
params={"email": email},
)


def get_secondary_identity_provider_id_from_email(email):
try:
token_response = get_secondary_identity_provider_token()
token_response.raise_for_status()
except requests.exceptions.HTTPError as exception:
current_app.logger.warning(
"Get token request error: %s, %s", exception, token_response.text
)
raise exception
access_token = token_response.json()["access_token"]

try:
users_response = get_secondary_identity_provider_users_from_email(
email=email, access_token=access_token
)
users_response.raise_for_status()
except requests.exceptions.HTTPError as exception:
current_app.logger.warning(
"Get user from email request error: %s, %s", exception, users_response.text
)
raise exception
found_users = users_response.json()
if (user_count := len(found_users)) > 1:
raise TooManyUsers(f"There are {user_count} users with the email {email}")
elif user_count < 1:
raise NoUserFound(f"There are no users with the email {email}")

[user] = found_users
return user["username"]


def get_user_nc_credentials(preferred_username="", email=""):
if (
not current_app.config["NC_LOGIN_API_KEY"]
or not current_app.config["NC_LOGIN_API_URL"]
or not current_app.config["FILE_SHARING"]
or not username
or not (preferred_username or email)
):
current_app.logger.info(
"File sharing deactivated or unable to perform, no connection to Nextcloud instance"
)
return {"nctoken": None, "nclocator": None, "nclogin": None}

payload = {"username": username}
nc_username = preferred_username
if current_app.config["SECONDARY_IDENTITY_PROVIDER_ENABLED"] and email:
try:
nc_username = get_secondary_identity_provider_id_from_email(email=email)
except requests.exceptions.HTTPError:
pass
except TooManyUsers as e:
current_app.logger.warning(e.message)
except NoUserFound as e:
current_app.logger.warning(e.message)
payload = {"username": nc_username}
headers = {"X-API-KEY": current_app.config["NC_LOGIN_API_KEY"]}
current_app.logger.info(
"Retrieve NC credentials from NC_LOGIN_API_URL %s "
Expand Down Expand Up @@ -103,12 +191,17 @@ def update_user_nc_credentials(user, user_info):
if current_app.config["FILE_SHARING"]
else None
)
data = get_user_nc_credentials(preferred_username)
if (
preferred_username is None
or data["nclocator"] is None
or data["nctoken"] is None
):

if current_app.config["SECONDARY_IDENTITY_PROVIDER_ENABLED"]:
data = get_user_nc_credentials(
email=(
user_info.get("email") if current_app.config["FILE_SHARING"] else None
)
)
else:
data = get_user_nc_credentials(preferred_username=preferred_username)

if data["nclogin"] is None or data["nclocator"] is None or data["nctoken"] is None:
current_app.logger.info(
"No new Nextcloud enroll needed for user %s with those data %s", user, data
)
Expand All @@ -119,7 +212,7 @@ def update_user_nc_credentials(user, user_info):

user.nc_locator = data["nclocator"]
user.nc_token = data["nctoken"]
user.nc_login = preferred_username
user.nc_login = data["nclogin"]
user.nc_last_auto_enroll = nc_last_auto_enroll
return True

Expand Down
17 changes: 17 additions & 0 deletions web/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def configuration(tmp_path, iam_server, iam_client, smtpd):
"OIDC_CLIENT_AUTH_METHOD": iam_client.token_endpoint_auth_method,
"OIDC_SCOPES": iam_client.scope,
"OIDC_USERINFO_HTTP_METHOD": "GET",
"SECONDARY_IDENTITY_PROVIDER_ENABLED": False,
"UPLOAD_DIR": str(tmp_path),
"TMP_DOWNLOAD_DIR": str(tmp_path),
"RECORDING": True,
Expand Down Expand Up @@ -287,3 +288,19 @@ def cloud_service_response(mocker, webdav_server, request):
@pytest.fixture
def jpg_file_content():
return b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00\xff\xdb\x00C\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xc2\x00\x0b\x08\x00\x01\x00\x01\x01\x01\x11\x00\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x08\x01\x01\x00\x01?\x10"


class ValidToken:
def raise_for_status():
pass

def json():
return {"access_token": "valid_token"}


@pytest.fixture
def valid_secondary_identity_token(mocker):
mocker.patch(
"b3desk.models.users.get_secondary_identity_provider_token",
return_value=ValidToken,
)
76 changes: 76 additions & 0 deletions web/tests/test_user.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from datetime import date

import pytest
import requests
from freezegun import freeze_time

from b3desk.models import db
from b3desk.models.users import NoUserFound
from b3desk.models.users import TooManyUsers
from b3desk.models.users import User
from b3desk.models.users import get_or_create_user
from b3desk.models.users import get_secondary_identity_provider_id_from_email
from b3desk.models.users import make_nextcloud_credentials_request


Expand Down Expand Up @@ -108,3 +112,75 @@ def test_make_nextcloud_credentials_request_force_secure_for_missing_scheme(
headers={"X-API-KEY": app.config["NC_LOGIN_API_KEY"]},
)
assert credentials["nclocator"].startswith("https://")


def test_get_secondary_identity_provider_id_from_email_token_error(
client_app, mocker, caplog
):
class TokenErrorAnswer:
text = "Unable to get token"

def raise_for_status():
raise requests.exceptions.HTTPError

mocker.patch(
"b3desk.models.users.get_secondary_identity_provider_token",
return_value=TokenErrorAnswer,
)
with pytest.raises(requests.exceptions.HTTPError):
get_secondary_identity_provider_id_from_email("[email protected]")
assert "Get token request error:" in caplog.text


def test_get_secondary_identity_provider_id_from_email_request_error(
client_app, mocker, caplog, valid_secondary_identity_token
):
class RequestErrorAnswer:
text = "Unable to ask identity provider"

def raise_for_status():
raise requests.exceptions.HTTPError

mocker.patch(
"b3desk.models.users.get_secondary_identity_provider_users_from_email",
return_value=RequestErrorAnswer,
)
with pytest.raises(requests.exceptions.HTTPError):
get_secondary_identity_provider_id_from_email("[email protected]")
assert "Get user from email request error:" in caplog.text


def test_get_secondary_identity_provider_id_from_email_many_users(
client_app, app, mocker, valid_secondary_identity_token
):
class ManyUsersAnswer:
def raise_for_status():
pass

def json():
return [{"username": "freddy"}, {"username": "fred"}]

mocker.patch(
"b3desk.models.users.get_secondary_identity_provider_users_from_email",
return_value=ManyUsersAnswer,
)
with pytest.raises(TooManyUsers):
get_secondary_identity_provider_id_from_email("[email protected]")


def test_get_secondary_identity_provider_id_from_email_no_user(
client_app, app, mocker, valid_secondary_identity_token
):
class NoUsersAnswer:
def raise_for_status():
pass

def json():
return []

mocker.patch(
"b3desk.models.users.get_secondary_identity_provider_users_from_email",
return_value=NoUsersAnswer,
)
with pytest.raises(NoUserFound):
get_secondary_identity_provider_id_from_email("[email protected]")

0 comments on commit 5e4fa61

Please sign in to comment.