diff --git a/hc/accounts/forms.py b/hc/accounts/forms.py index 232a0fd3..aa68bb95 100644 --- a/hc/accounts/forms.py +++ b/hc/accounts/forms.py @@ -175,3 +175,5 @@ class TotpForm(forms.Form): def clean_code(self): if not self.totp.verify(self.cleaned_data["code"], valid_window=1): raise forms.ValidationError("The code you entered was incorrect.") + + return self.cleaned_data["code"] diff --git a/hc/accounts/tests/test_login_totp.py b/hc/accounts/tests/test_login_totp.py index ed0ed767..afaa4521 100644 --- a/hc/accounts/tests/test_login_totp.py +++ b/hc/accounts/tests/test_login_totp.py @@ -84,3 +84,14 @@ class LoginTotpTestCase(BaseTestCase): r = self.client.post(self.url, {"code": "000000"}) self.assertContains(r, "Too Many Requests") + + @patch("hc.accounts.views.pyotp.totp.TOTP") + def test_it_rejects_used_code(self, mock_TOTP): + mock_TOTP.return_value.verify.return_value = True + + obj = TokenBucket(value=f"totpc-{self.alice.id}-000000") + obj.tokens = 0 + obj.save() + + r = self.client.post(self.url, {"code": "000000"}) + self.assertContains(r, "Too Many Requests") diff --git a/hc/accounts/views.py b/hc/accounts/views.py index d173ce42..db22627d 100644 --- a/hc/accounts/views.py +++ b/hc/accounts/views.py @@ -848,11 +848,14 @@ def login_totp(request): totp = pyotp.totp.TOTP(user.profile.totp) if request.method == "POST": - if not TokenBucket.authorize_totp(user): + if not TokenBucket.authorize_totp_attempt(user): return render(request, "try_later.html") form = forms.TotpForm(totp, request.POST) if form.is_valid(): + if not TokenBucket.authorize_totp_code(user, form.cleaned_data["code"]): + return render(request, "try_later.html") + request.session.pop("2fa_user") auth_login(request, user, "hc.accounts.backends.EmailBackend") return _redirect_after_login(request) diff --git a/hc/api/models.py b/hc/api/models.py index bfbf3b8f..fb4e3535 100644 --- a/hc/api/models.py +++ b/hc/api/models.py @@ -985,9 +985,18 @@ class TokenBucket(models.Model): return TokenBucket.authorize(value, 10, 3600 * 24) @staticmethod - def authorize_totp(user): + def authorize_totp_attempt(user): value = "totp-%d" % user.id - # 96 attempts per 24 hours + # 96 attempts per user per 24 hours # (or, on average, one attempt per 15 minutes) return TokenBucket.authorize(value, 96, 3600 * 24) + + @staticmethod + def authorize_totp_code(user, code): + value = "totpc-%d-%s" % (user.id, code) + + # A code has a validity period of 3 * 30 = 90 seconds. + # During that period, allow the code to only be used once, + # so an eavesdropping attacker cannot reuse a code. + return TokenBucket.authorize(value, 1, 90)