You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

450 lines
14 KiB

from datetime import timedelta as td
import uuid
import re
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import login as auth_login
from django.contrib.auth import logout as auth_logout
from django.contrib.auth import authenticate
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core import signing
from django.http import HttpResponseForbidden, HttpResponseBadRequest
from django.shortcuts import redirect, render
from django.utils.timezone import now
from django.urls import resolve, Resolver404
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from hc.accounts.forms import (ChangeEmailForm, EmailPasswordForm,
InviteTeamMemberForm, RemoveTeamMemberForm,
ReportSettingsForm, SetPasswordForm,
TeamNameForm, AvailableEmailForm,
ExistingEmailForm)
from hc.accounts.models import Profile, Member
from hc.api.models import Channel, Check
from hc.lib.badges import get_badge_url
from hc.payments.models import Subscription
# URL names that a "?next=" redirect is allowed to point at.
NEXT_WHITELIST = (
    "hc-checks",
    "hc-details",
    "hc-log",
    "hc-channels",
    "hc-add-slack",
    "hc-add-pushover",
)
def _is_whitelisted(path):
    """Return True if `path` resolves to a URL named in NEXT_WHITELIST.

    A path that does not resolve to any URL pattern is not whitelisted.
    """
    try:
        url_name = resolve(path).url_name
    except Resolver404:
        return False

    return url_name in NEXT_WHITELIST
def _make_user(email):
    """Create, save and return a new user for `email` with starter data.

    The user gets a username derived from a random UUID and an unusable
    password. A profile is ensured for the user, and a first check plus
    an email channel pointing at `email` are created and linked together.
    """
    new_user = User(username=str(uuid.uuid4())[:30], email=email)
    new_user.set_unusable_password()
    new_user.save()

    # Ensure a profile gets created
    Profile.objects.for_user(new_user)

    first_check = Check(user=new_user)
    first_check.name = "My First Check"
    first_check.save()

    email_channel = Channel(user=new_user)
    email_channel.kind = "email"
    email_channel.value = email
    email_channel.email_verified = True
    email_channel.save()

    # Route the starter check to the new email channel.
    email_channel.checks.add(first_check)

    return new_user
def _ensure_own_team(request):
""" Make sure user is switched to their own team. """
if request.team != request.profile:
request.team = request.profile
request.profile.current_team = request.profile
request.profile.save()
def _redirect_after_login(request):
    """ Redirect to the whitelisted ?next= URL, or to "hc-checks". """
    target = request.GET.get("next")
    return redirect(target if _is_whitelisted(target) else "hc-checks")
def login(request):
    """Show the login page and handle its two POST actions.

    A POST with action=login is treated as an email + password login.
    Any other POST is treated as a request for an instant login link.
    In every other case the login page is rendered.
    """
    form = EmailPasswordForm()
    magic_form = ExistingEmailForm()

    if request.method == 'POST':
        wants_password_login = request.POST.get("action") == "login"
        if wants_password_login:
            form = EmailPasswordForm(request.POST)
            if form.is_valid():
                auth_login(request, form.user)
                return _redirect_after_login(request)
        else:
            magic_form = ExistingEmailForm(request.POST)
            if magic_form.is_valid():
                profile = Profile.objects.for_user(magic_form.user)

                # Only carry the ?next= target into the emailed link
                # when it points at a whitelisted view.
                link_kwargs = {}
                next_url = request.GET.get("next")
                if _is_whitelisted(next_url):
                    link_kwargs["redirect_url"] = next_url

                profile.send_instant_login_link(**link_kwargs)
                return redirect("hc-login-link-sent")

    # The "bad_link" flag is consumed here so it is shown only once.
    bad_link = request.session.pop("bad_link", None)
    return render(request, "accounts/login.html", {
        "page": "login",
        "form": form,
        "magic_form": magic_form,
        "bad_link": bad_link,
    })
def logout(request):
    """Log the user out and redirect to "hc-index"."""
    auth_logout(request)
    landing = "hc-index"
    return redirect(landing)
@require_POST
def signup(request):
    """Create an account for the submitted email and send a login link.

    Responds with 403 when settings.REGISTRATION_OPEN is false. On an
    invalid form the result template is rendered with the form;
    on success it is rendered with created=True.
    """
    if not settings.REGISTRATION_OPEN:
        return HttpResponseForbidden()

    template = "accounts/signup_result.html"

    form = AvailableEmailForm(request.POST)
    if not form.is_valid():
        return render(request, template, {"form": form})

    new_user = _make_user(form.cleaned_data["identity"])
    Profile.objects.for_user(new_user).send_instant_login_link()
    return render(request, template, {"created": True})
def login_link_sent(request):
    """Render the "login link sent" page."""
    template = "accounts/login_link_sent.html"
    return render(request, template)
def link_sent(request):
    """Render the "link sent" page."""
    template = "accounts/link_sent.html"
    return render(request, template)
def check_token(request, username, token):
    """Log a user in via the username + token from an emailed link.

    Some email servers open links in emails to check for malicious
    content. To work around this, the user is only signed in when the
    method is POST; for any other method a HTML form is served together
    with a piece of Javascript that submits it automatically.
    """
    visitor = request.user
    if visitor.is_authenticated and visitor.username == username:
        # User is already logged in
        return _redirect_after_login(request)

    if request.method != "POST":
        return render(request, "accounts/check_token_submit.html")

    user = authenticate(username=username, token=token)
    if user is None or not user.is_active:
        # Flag the failure so the login page can report it.
        request.session["bad_link"] = True
        return redirect("hc-login")

    # Clear the stored token before logging the user in.
    user.profile.token = ""
    user.profile.save()
    auth_login(request, user)

    return _redirect_after_login(request)
@login_required
def profile(request):
    """Render the account settings page and handle its form actions.

    The request is first switched to the user's own team. For POST
    requests, the action is selected by which of these keys is present
    in request.POST (first match wins):

    * change_email / set_password: send the corresponding link and
      redirect to "hc-link-sent".
    * create_api_keys / revoke_api_keys / show_api_keys: manage or
      reveal the profile's API keys.
    * invite_team_member: invite a user by email, creating the user
      if needed. Responds with 403 when profile.can_invite() is false.
    * remove_team_member: remove a member of this team by email.
      Responds with 400 when no member of this team has that email.
    * set_team_name: update the team name.

    Invalid forms are silently ignored and the page is re-rendered.
    """
    _ensure_own_team(request)
    profile = request.profile

    ctx = {
        "page": "profile",
        "profile": profile,
        "show_api_keys": False,
        "api_status": "default",
        "team_status": "default"
    }

    if request.method == "POST":
        if "change_email" in request.POST:
            profile.send_change_email_link()
            return redirect("hc-link-sent")
        elif "set_password" in request.POST:
            profile.send_set_password_link()
            return redirect("hc-link-sent")
        elif "create_api_keys" in request.POST:
            profile.set_api_keys()
            ctx["show_api_keys"] = True
            ctx["api_keys_created"] = True
            ctx["api_status"] = "success"
        elif "revoke_api_keys" in request.POST:
            profile.api_key_id = ""
            profile.api_key = ""
            profile.api_key_readonly = ""
            profile.save()
            ctx["api_keys_revoked"] = True
            ctx["api_status"] = "info"
        elif "show_api_keys" in request.POST:
            ctx["show_api_keys"] = True
        elif "invite_team_member" in request.POST:
            if not profile.can_invite():
                return HttpResponseForbidden()

            form = InviteTeamMemberForm(request.POST)
            if form.is_valid():
                email = form.cleaned_data["email"]
                try:
                    user = User.objects.get(email=email)
                except User.DoesNotExist:
                    user = _make_user(email)

                profile.invite(user)
                ctx["team_member_invited"] = email
                ctx["team_status"] = "success"
        elif "remove_team_member" in request.POST:
            form = RemoveTeamMemberForm(request.POST)
            if form.is_valid():
                email = form.cleaned_data["email"]

                # Look the user up among this team's members only. An
                # unknown email must not crash the view, and a user who
                # is not on this team must not have their current team
                # reset by somebody else.
                farewell_user = User.objects.filter(
                    email=email, memberships__team=profile).first()
                if farewell_user is None:
                    return HttpResponseBadRequest()

                farewell_user.profile.current_team = None
                farewell_user.profile.save()

                Member.objects.filter(team=profile,
                                      user=farewell_user).delete()

                ctx["team_member_removed"] = email
                ctx["team_status"] = "info"
        elif "set_team_name" in request.POST:
            form = TeamNameForm(request.POST)
            if form.is_valid():
                profile.team_name = form.cleaned_data["team_name"]
                profile.save()
                ctx["team_name_updated"] = True
                ctx["team_status"] = "success"

    return render(request, "accounts/profile.html", ctx)
@login_required
def notifications(request):
    """Render the notification settings page and save changes on POST.

    When the "reports allowed" flag or the nag period changes, the
    matching next-date field is rescheduled, or cleared if the feature
    was switched off. The profile is saved whenever the form is valid.
    """
    _ensure_own_team(request)
    profile = request.profile
    status = "default"

    if request.method == "POST":
        form = ReportSettingsForm(request.POST)
        if form.is_valid():
            wanted_reports = form.cleaned_data["reports_allowed"]
            if profile.reports_allowed != wanted_reports:
                profile.reports_allowed = wanted_reports
                if not profile.reports_allowed:
                    profile.next_report_date = None
                else:
                    profile.next_report_date = now() + td(days=30)

            wanted_nag_period = form.cleaned_data["nag_period"]
            if profile.nag_period != wanted_nag_period:
                # Set the new nag period
                profile.nag_period = wanted_nag_period
                # and schedule next_nag_date:
                if not profile.nag_period:
                    profile.next_nag_date = None
                else:
                    profile.next_nag_date = now() + profile.nag_period

            profile.save()
            status = "info"

    return render(request, "accounts/notifications.html", {
        "status": status,
        "page": "profile",
        "profile": profile,
    })
@login_required
def badges(request):
    """Render the badges page for the user's own team and joined teams.

    For each team, the tags of all checks owned by the team's user are
    collected, sorted case-insensitively and followed by "*" (the
    "overall status" badge). Each tag gets an SVG and a JSON badge URL.
    Tags containing anything other than word characters and dashes are
    skipped.
    """
    _ensure_own_team(request)

    teams = [request.profile]
    teams.extend(m.team for m in request.user.memberships.all())

    badge_sets = []
    for team in teams:
        tags = set()
        for check in Check.objects.filter(user=team.user):
            tags.update(check.tags_list())

        sorted_tags = sorted(tags, key=lambda s: s.lower())
        sorted_tags.append("*")  # For the "overall status" badge

        urls = []
        username = team.user.username
        for tag in sorted_tags:
            # Raw string: "\w" is not a valid escape sequence in a
            # regular string literal.
            if not re.match(r"^[\w-]+$", tag) and tag != "*":
                continue

            urls.append({
                "svg": get_badge_url(username, tag),
                "json": get_badge_url(username, tag, format="json"),
            })

        badge_sets.append({"team": team, "urls": urls})

    ctx = {
        "page": "profile",
        "badges": badge_sets
    }

    return render(request, "accounts/badges.html", ctx)
@login_required
def set_password(request, token):
    """Let the user set a password after following a "set-password" link.

    Responds with 400 when the token does not check out. On a valid POST
    the password is stored, the token is cleared, the user is logged in
    again and redirected to "hc-profile". Otherwise the form page is
    rendered.
    """
    if not request.profile.check_token(token, "set-password"):
        return HttpResponseBadRequest()

    form = SetPasswordForm(request.POST) if request.method == "POST" else None
    if form is not None and form.is_valid():
        new_password = form.cleaned_data["password"]

        account = request.user
        account.set_password(new_password)
        account.save()

        # Clear the token now that it has been used.
        request.profile.token = ""
        request.profile.save()

        # Setting a password logs the user out, so here we
        # log them back in.
        fresh_user = authenticate(username=account.email,
                                  password=new_password)
        auth_login(request, fresh_user)

        messages.success(request, "Your password has been set!")
        return redirect("hc-profile")

    return render(request, "accounts/set_password.html", {})
@login_required
def change_email(request, token):
    """Let the user change their email after following a "change-email" link.

    Responds with 400 when the token does not check out. On a valid POST
    the new email is stored, the password is made unusable, the token is
    cleared and the user is redirected to "hc-change-email-done".
    Otherwise the form page is rendered.
    """
    if not request.profile.check_token(token, "change-email"):
        return HttpResponseBadRequest()

    if request.method != "POST":
        form = ChangeEmailForm()
    else:
        form = ChangeEmailForm(request.POST)
        if form.is_valid():
            account = request.user
            account.email = form.cleaned_data["email"]
            account.set_unusable_password()
            account.save()

            # Clear the token now that it has been used.
            request.profile.token = ""
            request.profile.save()

            return redirect("hc-change-email-done")

    return render(request, "accounts/change_email.html", {"form": form})
def change_email_done(request):
    """Render the "email changed" confirmation page."""
    template = "accounts/change_email_done.html"
    return render(request, template)
@csrf_exempt
def unsubscribe_reports(request, username):
    """Turn off reports and nags for the user named in a signed link.

    `username` is a value signed with TimestampSigner(salt="reports").
    The bad-link page is rendered when the signature does not verify,
    or when no user with the unsigned username exists. Otherwise the
    profile's reports are disabled, its nag period is set to zero, both
    next-dates are cleared and a confirmation page is rendered.
    """
    signer = signing.TimestampSigner(salt="reports")
    try:
        username = signer.unsign(username)
    except signing.BadSignature:
        return render(request, "bad_link.html")

    # Some email servers open links in emails to check for malicious content.
    # To work around this, we serve a form that auto-submits with JS.
    if "ask" in request.GET and request.method != "POST":
        return render(request, "accounts/unsubscribe_submit.html")

    # A correctly signed link can outlive the account it refers to.
    # Treat that as a bad link instead of failing with a server error.
    try:
        user = User.objects.get(username=username)
    except User.DoesNotExist:
        return render(request, "bad_link.html")

    profile = Profile.objects.for_user(user)
    profile.reports_allowed = False
    profile.next_report_date = None
    profile.nag_period = td()
    profile.next_nag_date = None
    profile.save()

    return render(request, "accounts/unsubscribed.html")
@login_required
def switch_team(request, target_username):
    """Switch the user's current team to the one owned by `target_username`.

    Responds with 403 when the target team does not exist or the user
    may not access it. On success, redirects to "hc-checks".
    """
    try:
        target_team = Profile.objects.get(user__username=target_username)
    except Profile.DoesNotExist:
        return HttpResponseForbidden()

    # The rules, checked in this order:
    #  - Superuser can switch to any team.
    #  - Users can switch to their own teams.
    #  - Users can switch to teams they are members of.
    user = request.user
    allowed = (
        user.is_superuser
        or target_team == request.profile
        or user.memberships.filter(team=target_team).exists()
    )
    if not allowed:
        return HttpResponseForbidden()

    own_profile = request.profile
    own_profile.current_team = target_team
    own_profile.save()

    return redirect("hc-checks")
@require_POST
@login_required
def close(request):
    """Close the logged-in user's account and redirect to "hc-index".

    The user's subscription, if there is one, is canceled first. The
    user is then deleted and the session is flushed.
    """
    account = request.user

    # Subscription needs to be canceled before it is deleted:
    subscription = Subscription.objects.filter(user=account).first()
    if subscription:
        subscription.cancel()

    # Deleting user also deletes its profile, checks, channels etc.
    account.delete()

    request.session.flush()
    return redirect("hc-index")