Skip to content

Commit

Permalink
fix(mfa): Prevent add when email unverified
Browse files Browse the repository at this point in the history
  • Loading branch information
pennersr committed Sep 5, 2024
1 parent 6b731eb commit 2d5b2ff
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 62 deletions.
9 changes: 9 additions & 0 deletions ChangeLog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ Fixes
in case of ``ACCOUNT_LOGIN_ON_EMAIL_CONFIRMATION = True``.


Security notice
---------------

- It was already the case that you could not enable TOTP 2FA if your account had
unverified email addresses. This is necessary to stop a user from claiming
email addresses and locking other users out. This safety check is now added to
WebAuthn security keys as well.


64.2.0 (2024-08-30)
*******************

Expand Down
9 changes: 7 additions & 2 deletions allauth/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ def user_password(password_factory):


@pytest.fixture
def user_factory(email_factory, db, user_password):
def email_verified():
return True


@pytest.fixture
def user_factory(email_factory, db, user_password, email_verified):
def factory(
email=None,
username=None,
commit=True,
with_email=True,
email_verified=True,
email_verified=email_verified,
password=None,
with_emailaddress=True,
with_totp=False,
Expand Down
50 changes: 33 additions & 17 deletions allauth/headless/mfa/tests/test_totp.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
import pytest

from allauth.mfa.models import Authenticator


def test_get_totp_not_active(
auth_client,
user,
headless_reverse,
):
@pytest.mark.parametrize("email_verified", [False, True])
def test_get_totp_not_active(auth_client, user, headless_reverse, email_verified):
resp = auth_client.get(headless_reverse("headless:mfa:manage_totp"))
assert resp.status_code == 404
data = resp.json()
assert len(data["meta"]["secret"]) == 32
assert len(data["meta"]["totp_url"]) == 145
if email_verified:
assert resp.status_code == 404
data = resp.json()
assert len(data["meta"]["secret"]) == 32
assert len(data["meta"]["totp_url"]) == 145
else:
assert resp.status_code == 409
assert resp.json() == {
"status": 409,
"errors": [
{
"message": "You cannot activate two-factor authentication until you have verified your email address.",
"code": "unverified_email",
}
],
}


def test_get_totp(
Expand All @@ -37,13 +48,15 @@ def test_deactivate_totp(
assert not Authenticator.objects.filter(user=user_with_totp).exists()


@pytest.mark.parametrize("email_verified", [False, True])
def test_activate_totp(
auth_client,
user,
headless_reverse,
reauthentication_bypass,
settings,
totp_validation_bypass,
email_verified,
):
with reauthentication_bypass():
with totp_validation_bypass():
Expand All @@ -52,11 +65,14 @@ def test_activate_totp(
data={"code": "42"},
content_type="application/json",
)
assert resp.status_code == 200
assert Authenticator.objects.filter(
user=user, type=Authenticator.Type.TOTP
).exists()
data = resp.json()
assert data["data"]["type"] == "totp"
assert isinstance(data["data"]["created_at"], float)
assert data["data"]["last_used_at"] is None
if email_verified:
assert resp.status_code == 200
assert Authenticator.objects.filter(
user=user, type=Authenticator.Type.TOTP
).exists()
data = resp.json()
assert data["data"]["type"] == "totp"
assert isinstance(data["data"]["created_at"], float)
assert data["data"]["last_used_at"] is None
else:
assert resp.status_code == 400
30 changes: 20 additions & 10 deletions allauth/headless/mfa/tests/test_webauthn.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from unittest.mock import ANY

import pytest

from allauth.headless.constants import Flow
from allauth.mfa.models import Authenticator

Expand Down Expand Up @@ -110,35 +112,43 @@ def test_delete_authenticator(
assert not Authenticator.objects.filter(pk=passkey.pk).exists()


@pytest.mark.parametrize("email_verified", [False, True])
def test_add_authenticator(
user,
auth_client,
headless_reverse,
webauthn_registration_bypass,
reauthentication_bypass,
email_verified,
):
resp = auth_client.get(headless_reverse("headless:mfa:manage_webauthn"))
# Reauthentication required
assert resp.status_code == 401
assert resp.status_code == 401 if email_verified else 409

with reauthentication_bypass():
resp = auth_client.get(headless_reverse("headless:mfa:manage_webauthn"))
data = resp.json()
assert data["data"]["creation_options"] == ANY
if email_verified:
assert resp.status_code == 200
data = resp.json()
assert data["data"]["creation_options"] == ANY
else:
assert resp.status_code == 409

with webauthn_registration_bypass(user, False) as credential:
resp = auth_client.post(
headless_reverse("headless:mfa:manage_webauthn"),
data={"credential": credential},
content_type="application/json",
)
assert resp.status_code == 200
assert (
Authenticator.objects.filter(
type=Authenticator.Type.WEBAUTHN, user=user
).count()
== 1
)
webauthn_count = Authenticator.objects.filter(
type=Authenticator.Type.WEBAUTHN, user=user
).count()
if email_verified:
assert resp.status_code == 200
assert webauthn_count == 1
else:
assert resp.status_code == 409
assert webauthn_count == 0


def test_2fa_login(
Expand Down
21 changes: 21 additions & 0 deletions allauth/headless/mfa/views.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from django.core.exceptions import ValidationError

from allauth.account.models import Login
from allauth.headless.base.response import APIResponse, AuthenticationResponse
from allauth.headless.base.views import (
APIView,
AuthenticatedAPIView,
AuthenticationStageAPIView,
)
from allauth.headless.internal.restkit.response import ErrorResponse
from allauth.headless.mfa import response
from allauth.headless.mfa.inputs import (
ActivateTOTPInput,
Expand All @@ -18,6 +21,7 @@
UpdateWebAuthnInput,
)
from allauth.mfa.adapter import DefaultMFAAdapter, get_adapter
from allauth.mfa.internal.flows import add
from allauth.mfa.models import Authenticator
from allauth.mfa.recovery_codes.internal import flows as recovery_codes_flows
from allauth.mfa.stages import AuthenticateStage
Expand All @@ -28,6 +32,13 @@
)


def _validate_can_add_authenticator(request):
try:
add.validate_can_add_authenticator(request.user)
except ValidationError as e:
return ErrorResponse(request, status=409, exception=e)


class AuthenticateView(AuthenticationStageAPIView):
input_class = AuthenticateInput
stage_class = AuthenticateStage
Expand Down Expand Up @@ -63,6 +74,9 @@ class ManageTOTPView(AuthenticatedAPIView):
def get(self, request, *args, **kwargs) -> APIResponse:
authenticator = self._get_authenticator()
if not authenticator:
err = _validate_can_add_authenticator(request)
if err:
return err
adapter: DefaultMFAAdapter = get_adapter()
secret = totp_auth.get_totp_secret(regenerate=True)
totp_url: str = adapter.build_totp_url(request.user, secret)
Expand Down Expand Up @@ -112,6 +126,13 @@ class ManageWebAuthnView(AuthenticatedAPIView):
"DELETE": DeleteWebAuthnInput,
}

def handle(self, request, *args, **kwargs):
if request.method in ["GET", "POST"]:
err = _validate_can_add_authenticator(request)
if err:
return err
return super().handle(request, *args, **kwargs)

def get(self, request, *args, **kwargs):
passwordless = "passwordless" in request.GET
creation_options = webauthn_flows.begin_registration(
Expand Down
Empty file.
42 changes: 42 additions & 0 deletions allauth/mfa/internal/flows/add.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from functools import wraps

from django.contrib import messages
from django.core.exceptions import ValidationError
from django.http import HttpResponseRedirect
from django.urls import reverse

from allauth.account.adapter import get_adapter as get_account_adapter
from allauth.account.models import EmailAddress
from allauth.mfa.adapter import get_adapter


def validate_can_add_authenticator(user):
"""
If we would allow users to enable 2FA with unverified email address,
that would allow for an attacker to signup, not verify and prevent the real
owner of the account from ever regaining access.
"""
email_verified = not EmailAddress.objects.filter(user=user, verified=False).exists()
if not email_verified:
raise get_adapter().validation_error("unverified_email")


def redirect_if_add_not_allowed(function=None):
def decorator(view_func):
@wraps(view_func)
def _wrapper_view(request, *args, **kwargs):
if request.user.is_authenticated: # allow for this to go before reauth
try:
validate_can_add_authenticator(request.user)
except ValidationError as e:
for message in e.messages:
adapter = get_account_adapter()
adapter.add_message(request, messages.ERROR, message=message)
return HttpResponseRedirect(reverse("mfa_index"))
return view_func(request, *args, **kwargs)

return _wrapper_view

if function:
return decorator(function)
return decorator
19 changes: 6 additions & 13 deletions allauth/mfa/totp/forms.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from django import forms
from django.utils.translation import gettext_lazy as _

from allauth.account.models import EmailAddress
from allauth.mfa.adapter import get_adapter
from allauth.mfa.internal.flows.add import validate_can_add_authenticator
from allauth.mfa.totp.internal import auth


Expand All @@ -16,22 +16,15 @@ class ActivateTOTPForm(forms.Form):

def __init__(self, *args, **kwargs):
self.user = kwargs.pop("user")
self.email_verified = not EmailAddress.objects.filter(
user=self.user, verified=False
).exists()
super().__init__(*args, **kwargs)
self.secret = auth.get_totp_secret(regenerate=not self.is_bound)

def clean_code(self):
try:
code = self.cleaned_data["code"]
if not self.email_verified:
raise get_adapter().validation_error("unverified_email")
if not auth.validate_totp_code(self.secret, code):
raise get_adapter().validation_error("incorrect_code")
return code
except forms.ValidationError as e:
raise e
validate_can_add_authenticator(self.user)
code = self.cleaned_data["code"]
if not auth.validate_totp_code(self.secret, code):
raise get_adapter().validation_error("incorrect_code")
return code


class DeactivateTOTPForm(forms.Form):
Expand Down
23 changes: 9 additions & 14 deletions allauth/mfa/totp/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from django.test import Client
from django.urls import reverse

import pytest
from pytest_django.asserts import assertTemplateUsed

from allauth.account import app_settings
from allauth.account.authentication import AUTHENTICATION_METHODS_SESSION_KEY
from allauth.account.models import EmailAddress
from allauth.mfa.adapter import get_adapter
from allauth.mfa.models import Authenticator

Expand All @@ -29,22 +29,17 @@ def test_activate_totp_with_incorrect_code(auth_client, reauthentication_bypass)
}


@pytest.mark.parametrize("email_verified", [False])
@pytest.mark.parametrize("method", ["get", "post"])
def test_activate_totp_with_unverified_email(
auth_client, user, totp_validation_bypass, reauthentication_bypass
auth_client, user, totp_validation_bypass, reauthentication_bypass, method
):
EmailAddress.objects.filter(user=user).update(verified=False)
with reauthentication_bypass():
resp = auth_client.get(reverse("mfa_activate_totp"))
with totp_validation_bypass():
resp = auth_client.post(
reverse("mfa_activate_totp"),
{
"code": "123",
},
)
assert resp.context["form"].errors == {
"code": [get_adapter().error_messages["unverified_email"]],
}
if method == "get":
resp = auth_client.get(reverse("mfa_activate_totp"))
else:
resp = auth_client.post(reverse("mfa_activate_totp"), {"code": "123"})
assert resp["location"] == reverse("mfa_index")


def test_activate_totp_success(
Expand Down
2 changes: 2 additions & 0 deletions allauth/mfa/totp/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
from allauth.account.decorators import reauthentication_required
from allauth.mfa import app_settings
from allauth.mfa.adapter import get_adapter
from allauth.mfa.internal.flows.add import redirect_if_add_not_allowed
from allauth.mfa.models import Authenticator
from allauth.mfa.totp.forms import ActivateTOTPForm, DeactivateTOTPForm
from allauth.mfa.totp.internal import flows
from allauth.mfa.utils import is_mfa_enabled
from allauth.utils import get_form_class


@method_decorator(redirect_if_add_not_allowed, name="dispatch")
@method_decorator(reauthentication_required, name="dispatch")
class ActivateTOTPView(FormView):
form_class = ActivateTOTPForm
Expand Down
15 changes: 15 additions & 0 deletions allauth/mfa/webauthn/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,18 @@ def test_add_key(
def test_list_keys(auth_client):
resp = auth_client.get(reverse("mfa_list_webauthn"))
assertTemplateUsed(resp, "mfa/webauthn/authenticator_list.html")


@pytest.mark.parametrize("email_verified", [False])
@pytest.mark.parametrize("method", ["get", "post"])
def test_add_with_unverified_email(
auth_client, user, webauthn_registration_bypass, reauthentication_bypass, method
):
with webauthn_registration_bypass(user, False) as credential:
if method == "get":
resp = auth_client.get(reverse("mfa_add_webauthn"))
else:
resp = auth_client.post(
reverse("mfa_add_webauthn"), data={"credential": credential}
)
assert resp["location"] == reverse("mfa_index")
Loading

0 comments on commit 2d5b2ff

Please sign in to comment.