You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

929 lines
30 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. import base64
  2. from datetime import timedelta as td
  3. from secrets import token_bytes, token_urlsafe
  4. from urllib.parse import urlparse
  5. import time
  6. import uuid
  7. from django.db import transaction
  8. from django.conf import settings
  9. from django.contrib import messages
  10. from django.contrib.auth import login as auth_login
  11. from django.contrib.auth import logout as auth_logout
  12. from django.contrib.auth import authenticate, update_session_auth_hash
  13. from django.contrib.auth.decorators import login_required
  14. from django.contrib.auth.models import User
  15. from django.core import signing
  16. from django.http import HttpResponse, HttpResponseForbidden, HttpResponseBadRequest
  17. from django.shortcuts import get_object_or_404, redirect, render
  18. from django.utils.timezone import now
  19. from django.urls import resolve, reverse, Resolver404
  20. from django.views.decorators.csrf import csrf_exempt
  21. from django.views.decorators.http import require_POST
  22. from fido2.ctap2 import AttestationObject, AuthenticatorData
  23. from fido2.client import ClientData
  24. from fido2.server import Fido2Server
  25. from fido2.webauthn import PublicKeyCredentialRpEntity
  26. from fido2 import cbor
  27. from hc.accounts import forms
  28. from hc.accounts.decorators import require_sudo_mode
  29. from hc.accounts.models import Credential, Profile, Project, Member
  30. from hc.api.models import Channel, Check, TokenBucket
  31. from hc.payments.models import Subscription
  32. import pyotp
  33. import segno
  34. POST_LOGIN_ROUTES = (
  35. "hc-checks",
  36. "hc-details",
  37. "hc-log",
  38. "hc-channels",
  39. "hc-add-slack",
  40. "hc-add-pushover",
  41. "hc-add-telegram",
  42. "hc-project-settings",
  43. "hc-uncloak",
  44. )
  45. FIDO2_SERVER = Fido2Server(PublicKeyCredentialRpEntity(settings.RP_ID, "healthchecks"))
  46. def _allow_redirect(redirect_url):
  47. if not redirect_url:
  48. return False
  49. parsed = urlparse(redirect_url)
  50. if parsed.netloc:
  51. # Allow redirects only to relative URLs
  52. return False
  53. try:
  54. match = resolve(parsed.path)
  55. except Resolver404:
  56. return False
  57. return match.url_name in POST_LOGIN_ROUTES
  58. def _make_user(email, tz=None, with_project=True):
  59. username = str(uuid.uuid4())[:30]
  60. user = User(username=username, email=email)
  61. user.set_unusable_password()
  62. user.save()
  63. project = None
  64. if with_project:
  65. project = Project(owner=user)
  66. project.badge_key = user.username
  67. project.save()
  68. check = Check(project=project)
  69. check.set_name_slug("My First Check")
  70. check.save()
  71. channel = Channel(project=project)
  72. channel.kind = "email"
  73. channel.value = email
  74. channel.email_verified = True
  75. channel.save()
  76. channel.checks.add(check)
  77. # Ensure a profile gets created
  78. profile = Profile.objects.for_user(user)
  79. if tz:
  80. profile.tz = tz
  81. profile.save()
  82. return user
  83. def _redirect_after_login(request):
  84. """ Redirect to the URL indicated in ?next= query parameter. """
  85. redirect_url = request.GET.get("next")
  86. if _allow_redirect(redirect_url):
  87. return redirect(redirect_url)
  88. if request.user.project_set.count() == 1:
  89. project = request.user.project_set.first()
  90. return redirect("hc-checks", project.code)
  91. return redirect("hc-index")
  92. def _check_2fa(request, user):
  93. have_keys = user.credentials.exists()
  94. profile = Profile.objects.for_user(user)
  95. if have_keys or profile.totp:
  96. # We have verified user's password or token, and now must
  97. # verify their security key. We store the following in user's session:
  98. # - user.id, to look up the user in the login_webauthn view
  99. # - user.email, to make sure email was not changed between the auth steps
  100. # - timestamp, to limit the max time between the auth steps
  101. request.session["2fa_user"] = [user.id, user.email, int(time.time())]
  102. if have_keys:
  103. path = reverse("hc-login-webauthn")
  104. else:
  105. path = reverse("hc-login-totp")
  106. redirect_url = request.GET.get("next")
  107. if _allow_redirect(redirect_url):
  108. path += "?next=%s" % redirect_url
  109. return redirect(path)
  110. auth_login(request, user)
  111. return _redirect_after_login(request)
  112. def _new_key(nbytes=24):
  113. while True:
  114. candidate = token_urlsafe(nbytes)
  115. if candidate[0] not in "-_" and candidate[-1] not in "-_":
  116. return candidate
  117. def login(request):
  118. form = forms.PasswordLoginForm()
  119. magic_form = forms.EmailLoginForm()
  120. if request.method == "POST":
  121. if request.POST.get("action") == "login":
  122. form = forms.PasswordLoginForm(request.POST)
  123. if form.is_valid():
  124. return _check_2fa(request, form.user)
  125. else:
  126. magic_form = forms.EmailLoginForm(request.POST)
  127. if magic_form.is_valid():
  128. redirect_url = request.GET.get("next")
  129. if not _allow_redirect(redirect_url):
  130. redirect_url = None
  131. profile = Profile.objects.for_user(magic_form.user)
  132. profile.send_instant_login_link(redirect_url=redirect_url)
  133. response = redirect("hc-login-link-sent")
  134. # check_token_submit looks for this cookie to decide if
  135. # it needs to do the extra POST step.
  136. response.set_cookie("auto-login", "1", max_age=300, httponly=True)
  137. return response
  138. if request.user.is_authenticated:
  139. return _redirect_after_login(request)
  140. bad_link = request.session.pop("bad_link", None)
  141. ctx = {
  142. "page": "login",
  143. "form": form,
  144. "magic_form": magic_form,
  145. "bad_link": bad_link,
  146. "registration_open": settings.REGISTRATION_OPEN,
  147. "support_email": settings.SUPPORT_EMAIL,
  148. }
  149. return render(request, "accounts/login.html", ctx)
  150. def logout(request):
  151. auth_logout(request)
  152. return redirect("hc-index")
  153. @require_POST
  154. @csrf_exempt
  155. def signup(request):
  156. if not settings.REGISTRATION_OPEN:
  157. return HttpResponseForbidden()
  158. ctx = {}
  159. form = forms.SignupForm(request.POST)
  160. if form.is_valid():
  161. email = form.cleaned_data["identity"]
  162. tz = form.cleaned_data["tz"]
  163. user = _make_user(email, tz)
  164. profile = Profile.objects.for_user(user)
  165. profile.send_instant_login_link()
  166. ctx["created"] = True
  167. else:
  168. ctx = {"form": form}
  169. response = render(request, "accounts/signup_result.html", ctx)
  170. if ctx.get("created"):
  171. response.set_cookie("auto-login", "1", max_age=300, httponly=True)
  172. return response
  173. def login_link_sent(request):
  174. return render(request, "accounts/login_link_sent.html")
  175. def check_token(request, username, token):
  176. if request.user.is_authenticated and request.user.username == username:
  177. # User is already logged in
  178. return _redirect_after_login(request)
  179. # Some email servers open links in emails to check for malicious content.
  180. # To work around this, we sign user in if the method is POST
  181. # *or* if the browser presents a cookie we had set when sending the login link.
  182. #
  183. # If the method is GET and the auto-login cookie isn't present, we serve
  184. # a HTML form with a submit button.
  185. if request.method == "POST" or "auto-login" in request.COOKIES:
  186. user = authenticate(username=username, token=token)
  187. if user is not None and user.is_active:
  188. user.profile.token = ""
  189. user.profile.save()
  190. return _check_2fa(request, user)
  191. request.session["bad_link"] = True
  192. return redirect("hc-login")
  193. return render(request, "accounts/check_token_submit.html")
  194. @login_required
  195. def profile(request):
  196. profile = request.profile
  197. ctx = {
  198. "page": "profile",
  199. "profile": profile,
  200. "my_projects_status": "default",
  201. "2fa_status": "default",
  202. "added_credential_name": request.session.pop("added_credential_name", ""),
  203. "removed_credential_name": request.session.pop("removed_credential_name", ""),
  204. "enabled_totp": request.session.pop("enabled_totp", False),
  205. "disabled_totp": request.session.pop("disabled_totp", False),
  206. "credentials": list(request.user.credentials.order_by("id")),
  207. "use_webauthn": settings.RP_ID,
  208. }
  209. if ctx["added_credential_name"] or ctx["enabled_totp"]:
  210. ctx["2fa_status"] = "success"
  211. if ctx["removed_credential_name"] or ctx["disabled_totp"]:
  212. ctx["2fa_status"] = "info"
  213. if request.session.pop("changed_password", False):
  214. ctx["changed_password"] = True
  215. ctx["email_password_status"] = "success"
  216. if request.method == "POST" and "leave_project" in request.POST:
  217. code = request.POST["code"]
  218. try:
  219. project = Project.objects.get(code=code, member__user=request.user)
  220. except Project.DoesNotExist:
  221. return HttpResponseBadRequest()
  222. Member.objects.filter(project=project, user=request.user).delete()
  223. ctx["left_project"] = project
  224. ctx["my_projects_status"] = "info"
  225. return render(request, "accounts/profile.html", ctx)
  226. @login_required
  227. @require_POST
  228. def add_project(request):
  229. form = forms.ProjectNameForm(request.POST)
  230. if not form.is_valid():
  231. return HttpResponseBadRequest()
  232. project = Project(owner=request.user)
  233. project.code = project.badge_key = str(uuid.uuid4())
  234. project.name = form.cleaned_data["name"]
  235. project.save()
  236. return redirect("hc-checks", project.code)
  237. @login_required
  238. def project(request, code):
  239. project = get_object_or_404(Project, code=code)
  240. is_owner = project.owner_id == request.user.id
  241. if request.user.is_superuser or is_owner:
  242. is_manager = True
  243. rw = True
  244. else:
  245. membership = get_object_or_404(Member, project=project, user=request.user)
  246. is_manager = membership.role == Member.Role.MANAGER
  247. rw = membership.is_rw
  248. ctx = {
  249. "page": "project",
  250. "rw": rw,
  251. "project": project,
  252. "is_owner": is_owner,
  253. "is_manager": is_manager,
  254. "show_api_keys": "show_api_keys" in request.GET,
  255. "enable_prometheus": settings.PROMETHEUS_ENABLED is True,
  256. }
  257. if request.method == "POST":
  258. if "create_key" in request.POST:
  259. if not rw:
  260. return HttpResponseForbidden()
  261. if request.POST["create_key"] == "api_key":
  262. project.api_key = _new_key(24)
  263. elif request.POST["create_key"] == "api_key_readonly":
  264. project.api_key_readonly = _new_key(24)
  265. elif request.POST["create_key"] == "ping_key":
  266. project.ping_key = _new_key(16)
  267. project.save()
  268. ctx["key_created"] = True
  269. ctx["api_status"] = "success"
  270. ctx["show_keys"] = True
  271. elif "revoke_key" in request.POST:
  272. if not rw:
  273. return HttpResponseForbidden()
  274. if request.POST["revoke_key"] == "api_key":
  275. project.api_key = ""
  276. elif request.POST["revoke_key"] == "api_key_readonly":
  277. project.api_key_readonly = ""
  278. elif request.POST["revoke_key"] == "ping_key":
  279. project.ping_key = None
  280. project.save()
  281. ctx["key_revoked"] = True
  282. ctx["api_status"] = "info"
  283. elif "show_keys" in request.POST:
  284. if not rw:
  285. return HttpResponseForbidden()
  286. ctx["show_keys"] = True
  287. elif "invite_team_member" in request.POST:
  288. if not is_manager:
  289. return HttpResponseForbidden()
  290. form = forms.InviteTeamMemberForm(request.POST)
  291. if form.is_valid():
  292. email = form.cleaned_data["email"]
  293. invite_suggestions = project.invite_suggestions()
  294. if not invite_suggestions.filter(email=email).exists():
  295. # We're inviting a new user. Are we within team size limit?
  296. if not project.can_invite_new_users():
  297. return HttpResponseForbidden()
  298. # And are we not hitting a rate limit?
  299. if not TokenBucket.authorize_invite(request.user):
  300. return render(request, "try_later.html")
  301. try:
  302. user = User.objects.get(email=email)
  303. except User.DoesNotExist:
  304. user = _make_user(email, with_project=False)
  305. if project.invite(user, role=form.cleaned_data["role"]):
  306. ctx["team_member_invited"] = email
  307. ctx["team_status"] = "success"
  308. else:
  309. ctx["team_member_duplicate"] = email
  310. ctx["team_status"] = "info"
  311. elif "remove_team_member" in request.POST:
  312. if not is_manager:
  313. return HttpResponseForbidden()
  314. form = forms.RemoveTeamMemberForm(request.POST)
  315. if form.is_valid():
  316. q = User.objects
  317. q = q.filter(email=form.cleaned_data["email"])
  318. q = q.filter(memberships__project=project)
  319. farewell_user = q.first()
  320. if farewell_user is None:
  321. return HttpResponseBadRequest()
  322. if farewell_user == request.user:
  323. return HttpResponseBadRequest()
  324. Member.objects.filter(project=project, user=farewell_user).delete()
  325. ctx["team_member_removed"] = form.cleaned_data["email"]
  326. ctx["team_status"] = "info"
  327. elif "set_project_name" in request.POST:
  328. if not rw:
  329. return HttpResponseForbidden()
  330. form = forms.ProjectNameForm(request.POST)
  331. if form.is_valid():
  332. project.name = form.cleaned_data["name"]
  333. project.save()
  334. ctx["project_name_updated"] = True
  335. ctx["project_name_status"] = "success"
  336. elif "transfer_project" in request.POST:
  337. if not is_owner:
  338. return HttpResponseForbidden()
  339. form = forms.TransferForm(request.POST)
  340. if form.is_valid():
  341. # Look up the proposed new owner
  342. email = form.cleaned_data["email"]
  343. try:
  344. membership = project.member_set.filter(user__email=email).get()
  345. except Member.DoesNotExist:
  346. return HttpResponseBadRequest()
  347. # Revoke any previous transfer requests
  348. project.member_set.update(transfer_request_date=None)
  349. # Initiate the new request
  350. membership.transfer_request_date = now()
  351. membership.save()
  352. # Send an email notification
  353. profile = Profile.objects.for_user(membership.user)
  354. profile.send_transfer_request(project)
  355. ctx["transfer_initiated"] = True
  356. ctx["transfer_status"] = "success"
  357. elif "cancel_transfer" in request.POST:
  358. if not is_owner:
  359. return HttpResponseForbidden()
  360. project.member_set.update(transfer_request_date=None)
  361. ctx["transfer_cancelled"] = True
  362. ctx["transfer_status"] = "success"
  363. elif "accept_transfer" in request.POST:
  364. tr = project.transfer_request()
  365. if not tr or tr.user != request.user:
  366. return HttpResponseForbidden()
  367. if not tr.can_accept():
  368. return HttpResponseBadRequest()
  369. with transaction.atomic():
  370. # 1. Reuse the existing membership, and change its user
  371. tr.user = project.owner
  372. tr.transfer_request_date = None
  373. # The previous owner becomes a regular member
  374. # (not readonly, not manager):
  375. tr.role = Member.Role.REGULAR
  376. tr.save()
  377. # 2. Change project's owner
  378. project.owner = request.user
  379. project.save()
  380. ctx["is_owner"] = True
  381. ctx["is_manager"] = True
  382. messages.success(request, "You are now the owner of this project!")
  383. elif "reject_transfer" in request.POST:
  384. tr = project.transfer_request()
  385. if not tr or tr.user != request.user:
  386. return HttpResponseForbidden()
  387. tr.transfer_request_date = None
  388. tr.save()
  389. q = project.member_set.select_related("user").order_by("user__email")
  390. ctx["memberships"] = list(q)
  391. return render(request, "accounts/project.html", ctx)
  392. @login_required
  393. def notifications(request):
  394. profile = request.profile
  395. ctx = {"status": "default", "page": "profile", "profile": profile}
  396. if request.method == "POST":
  397. form = forms.ReportSettingsForm(request.POST)
  398. if form.is_valid():
  399. if form.cleaned_data["tz"]:
  400. profile.tz = form.cleaned_data["tz"]
  401. profile.reports = form.cleaned_data["reports"]
  402. profile.next_report_date = profile.choose_next_report_date()
  403. if profile.nag_period != form.cleaned_data["nag_period"]:
  404. # Set the new nag period
  405. profile.nag_period = form.cleaned_data["nag_period"]
  406. # and update next_nag_date:
  407. if profile.nag_period:
  408. profile.update_next_nag_date()
  409. else:
  410. profile.next_nag_date = None
  411. profile.save()
  412. ctx["status"] = "info"
  413. return render(request, "accounts/notifications.html", ctx)
  414. @login_required
  415. @require_sudo_mode
  416. def set_password(request):
  417. if request.method == "POST":
  418. form = forms.SetPasswordForm(request.POST)
  419. if form.is_valid():
  420. password = form.cleaned_data["password"]
  421. request.user.set_password(password)
  422. request.user.save()
  423. request.profile.token = ""
  424. request.profile.save()
  425. # update the session with the new password hash so that
  426. # the user doesn't get logged out
  427. update_session_auth_hash(request, request.user)
  428. request.session["changed_password"] = True
  429. return redirect("hc-profile")
  430. return render(request, "accounts/set_password.html", {})
  431. @login_required
  432. @require_sudo_mode
  433. def change_email(request):
  434. if request.method == "POST":
  435. form = forms.ChangeEmailForm(request.POST)
  436. if form.is_valid():
  437. request.user.email = form.cleaned_data["email"]
  438. request.user.set_unusable_password()
  439. request.user.save()
  440. request.profile.token = ""
  441. request.profile.save()
  442. return redirect("hc-change-email-done")
  443. else:
  444. form = forms.ChangeEmailForm()
  445. return render(request, "accounts/change_email.html", {"form": form})
  446. def change_email_done(request):
  447. return render(request, "accounts/change_email_done.html")
  448. @csrf_exempt
  449. def unsubscribe_reports(request, signed_username):
  450. # Some email servers open links in emails to check for malicious content.
  451. # To work around this, for GET requests we serve a confirmation form.
  452. # If the signature is more than 5 minutes old, we also include JS code to
  453. # auto-submit the form.
  454. signer = signing.TimestampSigner(salt="reports")
  455. # First, check the signature without looking at the timestamp:
  456. try:
  457. username = signer.unsign(signed_username)
  458. except signing.BadSignature:
  459. return render(request, "bad_link.html")
  460. try:
  461. user = User.objects.get(username=username)
  462. except User.DoesNotExist:
  463. # This is likely an old unsubscribe link, and the user account has already
  464. # been deleted. Show the "Unsubscribed!" page nevertheless.
  465. return render(request, "accounts/unsubscribed.html")
  466. if request.method != "POST":
  467. # Unsign again, now with max_age set,
  468. # to see if the timestamp is older than 5 minutes
  469. try:
  470. autosubmit = False
  471. username = signer.unsign(signed_username, max_age=300)
  472. except signing.SignatureExpired:
  473. autosubmit = True
  474. ctx = {"autosubmit": autosubmit}
  475. return render(request, "accounts/unsubscribe_submit.html", ctx)
  476. profile = Profile.objects.for_user(user)
  477. profile.reports = "off"
  478. profile.next_report_date = None
  479. profile.nag_period = td()
  480. profile.next_nag_date = None
  481. profile.save()
  482. return render(request, "accounts/unsubscribed.html")
  483. @login_required
  484. @require_sudo_mode
  485. def close(request):
  486. user = request.user
  487. if request.method == "POST":
  488. if request.POST.get("confirmation") == request.user.email:
  489. # Cancel their subscription:
  490. sub = Subscription.objects.filter(user=user).first()
  491. if sub:
  492. sub.cancel()
  493. # Deleting user also deletes its profile, checks, channels etc.
  494. user.delete()
  495. request.session.flush()
  496. return redirect("hc-index")
  497. ctx = {}
  498. if "confirmation" in request.POST:
  499. ctx["wrong_confirmation"] = True
  500. return render(request, "accounts/close_account.html", ctx)
  501. @require_POST
  502. @login_required
  503. def remove_project(request, code):
  504. project = get_object_or_404(Project, code=code, owner=request.user)
  505. project.delete()
  506. return redirect("hc-index")
  507. def _get_credential_data(request, form):
  508. """ Complete WebAuthn registration, return binary credential data.
  509. This function is an interface to the fido2 library. It is separated
  510. out so that we don't need to mock ClientData, AttestationObject,
  511. register_complete separately in tests.
  512. """
  513. try:
  514. auth_data = FIDO2_SERVER.register_complete(
  515. request.session["state"],
  516. ClientData(form.cleaned_data["client_data_json"]),
  517. AttestationObject(form.cleaned_data["attestation_object"]),
  518. )
  519. except ValueError:
  520. return None
  521. return auth_data.credential_data
  522. @login_required
  523. @require_sudo_mode
  524. def add_webauthn(request):
  525. if not settings.RP_ID:
  526. return HttpResponse(status=404)
  527. if request.method == "POST":
  528. form = forms.AddWebAuthnForm(request.POST)
  529. if not form.is_valid():
  530. return HttpResponseBadRequest()
  531. credential_data = _get_credential_data(request, form)
  532. if not credential_data:
  533. return HttpResponseBadRequest()
  534. c = Credential(user=request.user)
  535. c.name = form.cleaned_data["name"]
  536. c.data = credential_data
  537. c.save()
  538. request.session["added_credential_name"] = c.name
  539. return redirect("hc-profile")
  540. credentials = [c.unpack() for c in request.user.credentials.all()]
  541. # User handle is used in a username-less authentication, to map a credential
  542. # received from browser with an user account in the database.
  543. # Since we only use security keys as a second factor,
  544. # the user handle is not of much use to us.
  545. #
  546. # The user handle:
  547. # - must not be blank,
  548. # - must not be a constant value,
  549. # - must not contain personally identifiable information.
  550. # So we use random bytes, and don't store them on our end:
  551. options, state = FIDO2_SERVER.register_begin(
  552. {
  553. "id": token_bytes(16),
  554. "name": request.user.email,
  555. "displayName": request.user.email,
  556. },
  557. credentials,
  558. )
  559. request.session["state"] = state
  560. ctx = {"options": base64.b64encode(cbor.encode(options)).decode()}
  561. return render(request, "accounts/add_credential.html", ctx)
  562. @login_required
  563. @require_sudo_mode
  564. def add_totp(request):
  565. if request.profile.totp:
  566. # TOTP is already configured, refuse to continue
  567. return HttpResponseBadRequest()
  568. if "totp_secret" not in request.session:
  569. request.session["totp_secret"] = pyotp.random_base32()
  570. totp = pyotp.totp.TOTP(request.session["totp_secret"])
  571. if request.method == "POST":
  572. form = forms.TotpForm(totp, request.POST)
  573. if form.is_valid():
  574. request.profile.totp = request.session["totp_secret"]
  575. request.profile.totp_created = now()
  576. request.profile.save()
  577. request.session["enabled_totp"] = True
  578. request.session.pop("totp_secret")
  579. return redirect("hc-profile")
  580. else:
  581. form = forms.TotpForm(totp)
  582. uri = totp.provisioning_uri(name=request.user.email, issuer_name=settings.SITE_NAME)
  583. qr_data_uri = segno.make(uri).png_data_uri(scale=8)
  584. ctx = {"form": form, "qr_data_uri": qr_data_uri}
  585. return render(request, "accounts/add_totp.html", ctx)
  586. @login_required
  587. @require_sudo_mode
  588. def remove_totp(request):
  589. if request.method == "POST" and "disable_totp" in request.POST:
  590. request.profile.totp = None
  591. request.profile.totp_created = None
  592. request.profile.save()
  593. request.session["disabled_totp"] = True
  594. return redirect("hc-profile")
  595. ctx = {"is_last": not request.user.credentials.exists()}
  596. return render(request, "accounts/remove_totp.html", ctx)
  597. @login_required
  598. @require_sudo_mode
  599. def remove_credential(request, code):
  600. if not settings.RP_ID:
  601. return HttpResponse(status=404)
  602. try:
  603. credential = Credential.objects.get(user=request.user, code=code)
  604. except Credential.DoesNotExist:
  605. return HttpResponseBadRequest()
  606. if request.method == "POST" and "remove_credential" in request.POST:
  607. request.session["removed_credential_name"] = credential.name
  608. credential.delete()
  609. return redirect("hc-profile")
  610. if request.profile.totp:
  611. is_last = False
  612. else:
  613. is_last = request.user.credentials.count() == 1
  614. ctx = {"credential": credential, "is_last": is_last}
  615. return render(request, "accounts/remove_credential.html", ctx)
  616. def _check_credential(request, form, credentials):
  617. """ Complete WebAuthn authentication, return True on success.
  618. This function is an interface to the fido2 library. It is separated
  619. out so that we don't need to mock ClientData, AuthenticatorData,
  620. authenticate_complete separately in tests.
  621. """
  622. try:
  623. FIDO2_SERVER.authenticate_complete(
  624. request.session["state"],
  625. credentials,
  626. form.cleaned_data["credential_id"],
  627. ClientData(form.cleaned_data["client_data_json"]),
  628. AuthenticatorData(form.cleaned_data["authenticator_data"]),
  629. form.cleaned_data["signature"],
  630. )
  631. except ValueError:
  632. return False
  633. return True
  634. def login_webauthn(request):
  635. # We require RP_ID. Fail predicably if it is not set:
  636. if not settings.RP_ID:
  637. return HttpResponse(status=500)
  638. # Expect an unauthenticated user
  639. if request.user.is_authenticated:
  640. return HttpResponseBadRequest()
  641. if "2fa_user" not in request.session:
  642. return HttpResponseBadRequest()
  643. user_id, email, timestamp = request.session["2fa_user"]
  644. if timestamp + 300 < time.time():
  645. return redirect("hc-login")
  646. try:
  647. user = User.objects.get(id=user_id, email=email)
  648. except User.DoesNotExist:
  649. return HttpResponseBadRequest()
  650. credentials = [c.unpack() for c in user.credentials.all()]
  651. if request.method == "POST":
  652. form = forms.WebAuthnForm(request.POST)
  653. if not form.is_valid():
  654. return HttpResponseBadRequest()
  655. if not _check_credential(request, form, credentials):
  656. return HttpResponseBadRequest()
  657. request.session.pop("state")
  658. request.session.pop("2fa_user")
  659. auth_login(request, user, "hc.accounts.backends.EmailBackend")
  660. return _redirect_after_login(request)
  661. options, state = FIDO2_SERVER.authenticate_begin(credentials)
  662. request.session["state"] = state
  663. totp_url = None
  664. if user.profile.totp:
  665. totp_url = reverse("hc-login-totp")
  666. redirect_url = request.GET.get("next")
  667. if _allow_redirect(redirect_url):
  668. totp_url += "?next=%s" % redirect_url
  669. ctx = {
  670. "options": base64.b64encode(cbor.encode(options)).decode(),
  671. "totp_url": totp_url,
  672. }
  673. return render(request, "accounts/login_webauthn.html", ctx)
  674. def login_totp(request):
  675. # Expect an unauthenticated user
  676. if request.user.is_authenticated:
  677. return HttpResponseBadRequest()
  678. if "2fa_user" not in request.session:
  679. return HttpResponseBadRequest()
  680. user_id, email, timestamp = request.session["2fa_user"]
  681. if timestamp + 300 < time.time():
  682. return redirect("hc-login")
  683. try:
  684. user = User.objects.get(id=user_id, email=email)
  685. except User.DoesNotExist:
  686. return HttpResponseBadRequest()
  687. if not user.profile.totp:
  688. return HttpResponseBadRequest()
  689. totp = pyotp.totp.TOTP(user.profile.totp)
  690. if request.method == "POST":
  691. # To guard against brute-forcing TOTP codes, we allow
  692. # 96 attempts per user per 24h.
  693. if not TokenBucket.authorize_totp_attempt(user):
  694. return render(request, "try_later.html")
  695. form = forms.TotpForm(totp, request.POST)
  696. if form.is_valid():
  697. # We blacklist an used TOTP code for 90 seconds,
  698. # so an attacker cannot reuse a stolen code.
  699. if not TokenBucket.authorize_totp_code(user, form.cleaned_data["code"]):
  700. return render(request, "try_later.html")
  701. request.session.pop("2fa_user")
  702. auth_login(request, user, "hc.accounts.backends.EmailBackend")
  703. return _redirect_after_login(request)
  704. else:
  705. form = forms.TotpForm(totp)
  706. return render(request, "accounts/login_totp.html", {"form": form})
  707. @login_required
  708. def appearance(request):
  709. profile = request.profile
  710. ctx = {
  711. "page": "appearance",
  712. "profile": profile,
  713. "status": "default",
  714. }
  715. if request.method == "POST":
  716. theme = request.POST.get("theme", "")
  717. if theme in ("", "dark"):
  718. profile.theme = theme
  719. profile.save()
  720. ctx["status"] = "info"
  721. return render(request, "accounts/appearance.html", ctx)