Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Login by code required #4060

Merged
merged 6 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ChangeLog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
- Verifying email addresses by means of a code (instead of a link) is now supported.
See ``settings.ACCOUNT_EMAIL_VERIFICATION_BY_CODE_ENABLED``.

- Added support for requiring logging in by code, so that every user logging in
is required to input a login confirmation code sent by email. See
``settings.ACCOUNT_LOGIN_BY_CODE_REQUIRED``.


64.1.0 (2024-08-15)
*******************
Expand Down
20 changes: 20 additions & 0 deletions allauth/account/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,26 @@ def _generate_code(self):
allowed_chars = allowed_chars.replace(ch, "")
return get_random_string(length=6, allowed_chars=allowed_chars)

def is_login_by_code_required(self, login) -> bool:
"""
Returns whether or not login-by-code is required for the given
login.
"""
from allauth.account import authentication

method = None
records = authentication.get_authentication_records(self.request)
if records:
method = records[-1]["method"]
if method == "code":
return False
value = app_settings.LOGIN_BY_CODE_REQUIRED
if isinstance(value, bool):
return value
if not value:
return False
return method is None or method in value


def get_adapter(request=None):
return import_attribute(app_settings.ADAPTER)(request)
15 changes: 15 additions & 0 deletions allauth/account/app_settings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import warnings
from enum import Enum
from typing import Set, Union

from django.core.exceptions import ImproperlyConfigured

Expand Down Expand Up @@ -450,6 +451,20 @@ def LOGIN_TIMEOUT(self):
"""
return self._setting("LOGIN_TIMEOUT", 15 * 60)

@property
def LOGIN_BY_CODE_REQUIRED(self) -> Union[bool, Set[str]]:
"""
When enabled (in case of ``True``), every user logging in is
required to input a login confirmation code sent by email.
Alternatively, you can specify a set of authentication methods
(``"password"``, ``"mfa"``, or ``"socialaccount"``) for which login
codes are required.
"""
value = self._setting("LOGIN_BY_CODE_REQUIRED", False)
if isinstance(value, bool):
return value
return set(value)


_app_settings = AppSettings("ACCOUNT_")

Expand Down
43 changes: 3 additions & 40 deletions allauth/account/authentication.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,6 @@
import time


AUTHENTICATION_METHODS_SESSION_KEY = "account_authentication_methods"


def record_authentication(request, method, **extra_data):
"""Here we keep a log of all authentication methods used within the current
session. Important to note is that having entries here does not imply that
a user is fully signed in. For example, consider a case where a user
authenticates using a password, but fails to complete the 2FA challenge.
Or, a user successfully signs in into an inactive account or one that still
needs verification. In such cases, ``request.user`` is still anonymous, yet,
we do have an entry here.

Example data::

{'method': 'password',
'at': 1701423602.7184925,
'username': 'john.doe'}

{'method': 'socialaccount',
'at': 1701423567.6368647,
'provider': 'amazon',
'uid': 'amzn1.account.K2LI23KL2LK2'}

{'method': 'mfa',
'at': 1701423602.6392953,
'id': 1,
'type': 'totp'}

"""
methods = request.session.get(AUTHENTICATION_METHODS_SESSION_KEY, [])
data = {
"method": method,
"at": time.time(),
**extra_data,
}
methods.append(data)
request.session[AUTHENTICATION_METHODS_SESSION_KEY] = methods
from allauth.account.internal.flows.login import (
AUTHENTICATION_METHODS_SESSION_KEY,
)


def get_authentication_records(request):
Expand Down
11 changes: 11 additions & 0 deletions allauth/account/internal/flows/email_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@
from allauth.utils import build_absolute_uri


def verify_email_indirectly(request: HttpRequest, user, email: str) -> bool:
try:
email_address = EmailAddress.objects.get_for_user(user, email)
except EmailAddress.DoesNotExist:
return False
else:
if not email_address.verified:
return verify_email(request, email_address)
return True


def verify_email(request: HttpRequest, email_address: EmailAddress) -> bool:
"""
Marks the email address as confirmed on the db
Expand Down
39 changes: 38 additions & 1 deletion allauth/account/internal/flows/login.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,51 @@
import time
from typing import Any, Dict

from django.http import HttpRequest, HttpResponse

from allauth.account.adapter import get_adapter
from allauth.account.authentication import record_authentication
from allauth.account.models import Login
from allauth.core.exceptions import ImmediateHttpResponse


LOGIN_SESSION_KEY = "account_login"
AUTHENTICATION_METHODS_SESSION_KEY = "account_authentication_methods"


def record_authentication(request, method, **extra_data):
"""Here we keep a log of all authentication methods used within the current
session. Important to note is that having entries here does not imply that
a user is fully signed in. For example, consider a case where a user
authenticates using a password, but fails to complete the 2FA challenge.
Or, a user successfully signs in into an inactive account or one that still
needs verification. In such cases, ``request.user`` is still anonymous, yet,
we do have an entry here.

Example data::

{'method': 'password',
'at': 1701423602.7184925,
'username': 'john.doe'}

{'method': 'socialaccount',
'at': 1701423567.6368647,
'provider': 'amazon',
'uid': 'amzn1.account.K2LI23KL2LK2'}

{'method': 'mfa',
'at': 1701423602.6392953,
'id': 1,
'type': 'totp'}

"""
methods = request.session.get(AUTHENTICATION_METHODS_SESSION_KEY, [])
data = {
"method": method,
"at": time.time(),
**extra_data,
}
methods.append(data)
request.session[AUTHENTICATION_METHODS_SESSION_KEY] = methods


def _get_login_hook_kwargs(login: Login) -> Dict[str, Any]:
Expand Down
51 changes: 34 additions & 17 deletions allauth/account/internal/flows/login_by_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,33 @@

from allauth.account import app_settings
from allauth.account.adapter import get_adapter
from allauth.account.authentication import record_authentication
from allauth.account.internal.flows.login import perform_login
from allauth.account.internal.flows.email_verification import (
verify_email_indirectly,
)
from allauth.account.internal.flows.login import (
perform_login,
record_authentication,
)
from allauth.account.internal.flows.signup import send_unknown_account_mail
from allauth.account.models import Login


LOGIN_CODE_STATE_KEY = "login_code"


def request_login_code(request: HttpRequest, email: str) -> None:
def request_login_code(
request: HttpRequest, email: str, login: Optional[Login] = None
) -> None:
from allauth.account.utils import filter_users_by_email, stash_login

initiated_by_user = login is None
adapter = get_adapter()
users = filter_users_by_email(email, is_active=True, prefer_verified=True)
pending_login = {
"at": time.time(),
"email": email,
"failed_attempts": 0,
"initiated_by_user": initiated_by_user,
}
if not users:
user = None
Expand All @@ -41,16 +50,19 @@ def request_login_code(request: HttpRequest, email: str) -> None:
pending_login.update(
{"code": code, "user_id": user._meta.pk.value_to_string(user)}
)
login = Login(user=user, email=email)
login.state[LOGIN_CODE_STATE_KEY] = pending_login
login.state["stages"] = {"current": "login_by_code"}
adapter.add_message(
request,
messages.SUCCESS,
"account/messages/login_code_sent.txt",
{"email": email},
)
stash_login(request, login)
if initiated_by_user:
login = Login(user=user, email=email)
login.state["stages"] = {"current": "login_by_code"}
assert login
login.state[LOGIN_CODE_STATE_KEY] = pending_login
if initiated_by_user:
stash_login(request, login)


def get_pending_login(
Expand Down Expand Up @@ -94,18 +106,23 @@ def perform_login_by_code(
stage,
redirect_url: Optional[str],
):
state = stage.login.state.pop(LOGIN_CODE_STATE_KEY)
state = stage.login.state[LOGIN_CODE_STATE_KEY]
email = state["email"]
user = stage.login.user
record_authentication(request, method="code", email=email)
# Just requesting a login code does is not considered to be a real login,
# yet, is needed in order to make the stage machinery work. Now that we've
# completed the code, let's start a real login.
login = Login(
user=stage.login.user,
redirect_url=redirect_url,
email=email,
)
return perform_login(request, login)
verify_email_indirectly(request, user, email)
if state["initiated_by_user"]:
# Just requesting a login code does is not considered to be a real login,
# yet, is needed in order to make the stage machinery work. Now that we've
# completed the code, let's start a real login.
login = Login(
user=user,
redirect_url=redirect_url,
email=email,
)
return perform_login(request, login)
else:
return stage.exit()


def compare_code(*, actual, expected) -> bool:
Expand Down
6 changes: 2 additions & 4 deletions allauth/account/internal/flows/reauthentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

from allauth import app_settings as allauth_settings
from allauth.account import app_settings
from allauth.account.authentication import (
get_authentication_records,
record_authentication,
)
from allauth.account.authentication import get_authentication_records
from allauth.account.internal.flows.login import record_authentication
from allauth.core.exceptions import ReauthenticationRequired
from allauth.core.internal.httpkit import (
deserialize_request,
Expand Down
3 changes: 2 additions & 1 deletion allauth/account/managers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import timedelta
from typing import Optional

from django.db import models
from django.db.models import Q
Expand Down Expand Up @@ -69,7 +70,7 @@ def get_primary(self, user):
except self.model.DoesNotExist:
return None

def get_primary_email(self, user):
def get_primary_email(self, user) -> Optional[str]:
from allauth.account.utils import user_email

primary = self.get_primary(user)
Expand Down
15 changes: 13 additions & 2 deletions allauth/account/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from allauth.account.adapter import get_adapter
from allauth.account.app_settings import EmailVerificationMethod
from allauth.account.models import EmailAddress
from allauth.account.utils import resume_login, stash_login, unstash_login
from allauth.core.internal.httpkit import headed_redirect_response
from allauth.utils import import_callable


Expand Down Expand Up @@ -141,8 +143,17 @@ def handle(self):
from allauth.account.internal.flows import login_by_code

user, data = login_by_code.get_pending_login(self.login, peek=True)
if data is None:
login_by_code_required = get_adapter().is_login_by_code_required(self.login)
if data is None and not login_by_code_required:
# No pending login, just continue.
return None, True
response = HttpResponseRedirect(reverse("account_confirm_login_code"))
elif data is None and login_by_code_required:
email = EmailAddress.objects.get_primary_email(self.login.user)
if not email:
# No way of contacting the user.. cannot meet the
# requirements. Abort.
return headed_redirect_response("account_login"), False
login_by_code.request_login_code(self.request, email, login=self.login)

response = headed_redirect_response("account_confirm_login_code")
return response, True
36 changes: 36 additions & 0 deletions allauth/account/tests/test_login_by_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from allauth.account.authentication import AUTHENTICATION_METHODS_SESSION_KEY
from allauth.account.internal.flows.login import LOGIN_SESSION_KEY
from allauth.account.internal.flows.login_by_code import LOGIN_CODE_STATE_KEY
from allauth.account.models import EmailAddress


@pytest.fixture
Expand Down Expand Up @@ -73,3 +74,38 @@ def test_login_by_code_unknown_user(mailoutbox, client, db):
assert resp.status_code == 302
assert resp["location"] == reverse("account_confirm_login_code")
resp = client.post(reverse("account_confirm_login_code"), data={"code": "123456"})


@pytest.mark.parametrize(
"setting,code_required",
[
(True, True),
({"password"}, True),
({"socialaccount"}, False),
],
)
def test_login_by_code_required(
client, settings, user_factory, password_factory, setting, code_required
):
password = password_factory()
user = user_factory(password=password, email_verified=False)
email_address = EmailAddress.objects.get(email=user.email)
assert not email_address.verified
settings.ACCOUNT_LOGIN_BY_CODE_REQUIRED = setting
resp = client.post(
reverse("account_login"),
data={"login": user.username, "password": password},
)
assert resp.status_code == 302
if code_required:
assert resp["location"] == reverse("account_confirm_login_code")
code = client.session[LOGIN_SESSION_KEY]["state"][LOGIN_CODE_STATE_KEY]["code"]
resp = client.get(
reverse("account_confirm_login_code"),
data={"login": user.username, "password": password},
)
assert resp.status_code == 200
resp = client.post(reverse("account_confirm_login_code"), data={"code": code})
email_address.refresh_from_db()
assert email_address.verified
assert resp["location"] == settings.LOGIN_REDIRECT_URL
Loading