diff --git a/hc/accounts/tests/test_check_token.py b/hc/accounts/tests/test_check_token.py index 4de17e91..d0122949 100644 --- a/hc/accounts/tests/test_check_token.py +++ b/hc/accounts/tests/test_check_token.py @@ -60,5 +60,7 @@ class CheckTokenTestCase(BaseTestCase): # It should not log the user in yet self.assertNotIn("_auth_user_id", self.client.session) + # Instead, it should set 2fa_user_id in the session - self.assertEqual(self.client.session["2fa_user_id"], self.alice.id) + user_id, email, valid_until = self.client.session["2fa_user"] + self.assertEqual(user_id, self.alice.id) diff --git a/hc/accounts/tests/test_login.py b/hc/accounts/tests/test_login.py index 99cd86e5..80061b0f 100644 --- a/hc/accounts/tests/test_login.py +++ b/hc/accounts/tests/test_login.py @@ -124,5 +124,7 @@ class LoginTestCase(BaseTestCase): # It should not log the user in yet self.assertNotIn("_auth_user_id", self.client.session) + # Instead, it should set 2fa_user_id in the session - self.assertEqual(self.client.session["2fa_user_id"], self.alice.id) + user_id, email, valid_until = self.client.session["2fa_user"] + self.assertEqual(user_id, self.alice.id) diff --git a/hc/accounts/tests/test_login_webauthn.py b/hc/accounts/tests/test_login_webauthn.py index 092c3296..ac725a95 100644 --- a/hc/accounts/tests/test_login_webauthn.py +++ b/hc/accounts/tests/test_login_webauthn.py @@ -1,3 +1,4 @@ +import time from unittest.mock import patch from django.test.utils import override_settings @@ -11,7 +12,7 @@ class LoginWebAuthnTestCase(BaseTestCase): # This is the user we're trying to authenticate session = self.client.session - session["2fa_user_id"] = self.alice.id + session["2fa_user"] = [self.alice.id, self.alice.email, (time.time()) + 300] session.save() self.url = "/accounts/login/two_factor/" @@ -24,6 +25,28 @@ class LoginWebAuthnTestCase(BaseTestCase): # It should put a "state" key in the session: self.assertIn("state", self.client.session) + def test_it_requires_unauthenticated_user(self): + self.client.login(username="alice@example.org", password="password") + + r = self.client.get(self.url) + self.assertEqual(r.status_code, 400) + + def test_it_rejects_changed_email(self): + session = self.client.session + session["2fa_user"] = [self.alice.id, "eve@example.org", int(time.time())] + session.save() + + r = self.client.get(self.url) + self.assertEqual(r.status_code, 400) + + def test_it_rejects_old_timestamp(self): + session = self.client.session + session["2fa_user"] = [self.alice.id, self.alice.email, int(time.time()) - 310] + session.save() + + r = self.client.get(self.url) + self.assertRedirects(r, "/accounts/login/") + @override_settings(RP_ID=None) def test_it_requires_rp_id(self): r = self.client.get(self.url) diff --git a/hc/accounts/views.py b/hc/accounts/views.py index 724275c9..354b72c8 100644 --- a/hc/accounts/views.py +++ b/hc/accounts/views.py @@ -2,6 +2,7 @@ import base64 from datetime import timedelta as td from secrets import token_bytes from urllib.parse import urlparse +import time import uuid from django.db import transaction @@ -104,7 +105,12 @@ def _redirect_after_login(request): def _check_2fa(request, user): if user.credentials.exists(): - request.session["2fa_user_id"] = user.id + # We have verified user's password or token, and now must + # verify their security key. We store the following in user's session: + # - user.id, to look up the user in the login_webauthn view + # - user.email, to make sure email was not changed between the auth steps + # - timestamp, to limit the max time between the auth steps + request.session["2fa_user"] = [user.id, user.email, int(time.time())] path = reverse("hc-login-webauthn") redirect_url = request.GET.get("next") @@ -684,14 +690,26 @@ def _check_credential(request, form, credentials): def login_webauthn(request): - if "2fa_user_id" not in request.session: - return HttpResponseBadRequest() - # We require RP_ID. Fail predicably if it is not set: if not settings.RP_ID: return HttpResponse(status=500) - user = User.objects.get(id=request.session["2fa_user_id"]) + # Expect an unauthenticated user + if request.user.is_authenticated: + return HttpResponseBadRequest() + + if "2fa_user" not in request.session: + return HttpResponseBadRequest() + + user_id, email, timestamp = request.session["2fa_user"] + if timestamp + 300 < time.time(): + return redirect("hc-login") + + try: + user = User.objects.get(id=user_id, email=email) + except User.DoesNotExist: + return HttpResponseBadRequest() + credentials = [c.unpack() for c in user.credentials.all()] if request.method == "POST": @@ -703,7 +721,7 @@ def login_webauthn(request): return HttpResponseBadRequest() request.session.pop("state") - request.session.pop("2fa_user_id") + request.session.pop("2fa_user") auth_login(request, user, "hc.accounts.backends.EmailBackend") return _redirect_after_login(request)