You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

545 lines
17 KiB

from datetime import timedelta as td
from urllib.parse import urlparse
import uuid
from django.db import transaction
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import login as auth_login
from django.contrib.auth import logout as auth_logout
from django.contrib.auth import authenticate
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core import signing
from django.http import (
HttpResponseForbidden,
HttpResponseBadRequest,
HttpResponseNotFound,
)
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.timezone import now
from django.urls import resolve, Resolver404
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from hc.accounts import forms
from hc.accounts.models import Profile, Project, Member
from hc.api.models import Channel, Check, TokenBucket
from hc.lib.date import choose_next_report_date
from hc.payments.models import Subscription
NEXT_WHITELIST = (
"hc-checks",
"hc-details",
"hc-log",
"hc-p-channels",
"hc-add-slack",
"hc-add-pushover",
"hc-add-telegram",
"hc-project-settings",
)
def _is_whitelisted(redirect_url):
if not redirect_url:
return False
parsed = urlparse(redirect_url)
try:
match = resolve(parsed.path)
except Resolver404:
return False
return match.url_name in NEXT_WHITELIST
def _make_user(email, with_project=True):
username = str(uuid.uuid4())[:30]
user = User(username=username, email=email)
user.set_unusable_password()
user.save()
project = None
if with_project:
project = Project(owner=user)
project.badge_key = user.username
project.save()
check = Check(project=project)
check.name = "My First Check"
check.save()
channel = Channel(project=project)
channel.kind = "email"
channel.value = email
channel.email_verified = True
channel.save()
channel.checks.add(check)
# Ensure a profile gets created
Profile.objects.for_user(user)
return user
def _redirect_after_login(request):
""" Redirect to the URL indicated in ?next= query parameter. """
redirect_url = request.GET.get("next")
if _is_whitelisted(redirect_url):
return redirect(redirect_url)
if request.user.project_set.count() == 1:
project = request.user.project_set.first()
return redirect("hc-checks", project.code)
return redirect("hc-index")
def login(request):
form = forms.PasswordLoginForm()
magic_form = forms.EmailLoginForm()
if request.method == "POST":
if request.POST.get("action") == "login":
form = forms.PasswordLoginForm(request.POST)
if form.is_valid():
auth_login(request, form.user)
return _redirect_after_login(request)
else:
magic_form = forms.EmailLoginForm(request.POST)
if magic_form.is_valid():
redirect_url = request.GET.get("next")
if not _is_whitelisted(redirect_url):
redirect_url = None
profile = Profile.objects.for_user(magic_form.user)
profile.send_instant_login_link(redirect_url=redirect_url)
response = redirect("hc-login-link-sent")
# check_token_submit looks for this cookie to decide if
# it needs to do the extra POST step.
response.set_cookie("auto-login", "1", max_age=300, httponly=True)
return response
bad_link = request.session.pop("bad_link", None)
ctx = {
"page": "login",
"form": form,
"magic_form": magic_form,
"bad_link": bad_link,
"registration_open": settings.REGISTRATION_OPEN,
}
return render(request, "accounts/login.html", ctx)
def logout(request):
auth_logout(request)
return redirect("hc-index")
@require_POST
@csrf_exempt
def signup(request):
if not settings.REGISTRATION_OPEN:
return HttpResponseForbidden()
ctx = {}
form = forms.AvailableEmailForm(request.POST)
if form.is_valid():
email = form.cleaned_data["identity"]
user = _make_user(email)
profile = Profile.objects.for_user(user)
profile.send_instant_login_link()
ctx["created"] = True
else:
ctx = {"form": form}
response = render(request, "accounts/signup_result.html", ctx)
if ctx.get("created"):
response.set_cookie("auto-login", "1", max_age=300, httponly=True)
return response
def login_link_sent(request):
return render(request, "accounts/login_link_sent.html")
def link_sent(request):
return render(request, "accounts/link_sent.html")
def check_token(request, username, token):
if request.user.is_authenticated and request.user.username == username:
# User is already logged in
return _redirect_after_login(request)
# Some email servers open links in emails to check for malicious content.
# To work around this, we sign user in if the method is POST
# *or* if the browser presents a cookie we had set when sending the login link.
#
# If the method is GET, we instead serve a HTML form and a piece
# of Javascript to automatically submit it.
if request.method == "POST" or "auto-login" in request.COOKIES:
user = authenticate(username=username, token=token)
if user is not None and user.is_active:
user.profile.token = ""
user.profile.save()
auth_login(request, user)
return _redirect_after_login(request)
request.session["bad_link"] = True
return redirect("hc-login")
return render(request, "accounts/check_token_submit.html")
@login_required
def profile(request):
profile = request.profile
ctx = {"page": "profile", "profile": profile, "my_projects_status": "default"}
if request.method == "POST":
if "change_email" in request.POST:
profile.send_change_email_link()
return redirect("hc-link-sent")
elif "set_password" in request.POST:
profile.send_set_password_link()
return redirect("hc-link-sent")
elif "leave_project" in request.POST:
code = request.POST["code"]
try:
project = Project.objects.get(code=code, member__user=request.user)
except Project.DoesNotExist:
return HttpResponseBadRequest()
Member.objects.filter(project=project, user=request.user).delete()
ctx["left_project"] = project
ctx["my_projects_status"] = "info"
# Retrieve projects right before rendering the template--
# The list of the projects might have *just* changed
ctx["projects"] = list(profile.projects())
return render(request, "accounts/profile.html", ctx)
@login_required
@require_POST
def add_project(request):
form = forms.ProjectNameForm(request.POST)
if not form.is_valid():
return HttpResponseBadRequest()
project = Project(owner=request.user)
project.code = project.badge_key = str(uuid.uuid4())
project.name = form.cleaned_data["name"]
project.save()
return redirect("hc-checks", project.code)
@login_required
def project(request, code):
if request.user.is_superuser:
q = Project.objects
else:
q = request.profile.projects()
try:
project = q.get(code=code)
except Project.DoesNotExist:
return HttpResponseNotFound()
is_owner = project.owner_id == request.user.id
ctx = {
"page": "project",
"project": project,
"is_owner": is_owner,
"show_api_keys": "show_api_keys" in request.GET,
}
if request.method == "POST":
if "create_api_keys" in request.POST:
project.set_api_keys()
project.save()
ctx["show_api_keys"] = True
ctx["api_keys_created"] = True
ctx["api_status"] = "success"
elif "revoke_api_keys" in request.POST:
project.api_key = ""
project.api_key_readonly = ""
project.save()
ctx["api_keys_revoked"] = True
ctx["api_status"] = "info"
elif "show_api_keys" in request.POST:
ctx["show_api_keys"] = True
elif "invite_team_member" in request.POST:
if not is_owner:
return HttpResponseForbidden()
form = forms.InviteTeamMemberForm(request.POST)
if form.is_valid():
email = form.cleaned_data["email"]
invite_suggestions = project.invite_suggestions()
if not invite_suggestions.filter(email=email).exists():
# We're inviting a new user. Are we within team size limit?
if not project.can_invite_new_users():
return HttpResponseForbidden()
# And are we not hitting a rate limit?
if not TokenBucket.authorize_invite(request.user):
return render(request, "try_later.html")
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
user = _make_user(email, with_project=False)
project.invite(user)
ctx["team_member_invited"] = email
ctx["team_status"] = "success"
elif "remove_team_member" in request.POST:
if not is_owner:
return HttpResponseForbidden()
form = forms.RemoveTeamMemberForm(request.POST)
if form.is_valid():
q = User.objects
q = q.filter(email=form.cleaned_data["email"])
q = q.filter(memberships__project=project)
farewell_user = q.first()
if farewell_user is None:
return HttpResponseBadRequest()
Member.objects.filter(project=project, user=farewell_user).delete()
ctx["team_member_removed"] = form.cleaned_data["email"]
ctx["team_status"] = "info"
elif "set_project_name" in request.POST:
form = forms.ProjectNameForm(request.POST)
if form.is_valid():
project.name = form.cleaned_data["name"]
project.save()
ctx["project_name_updated"] = True
ctx["project_name_status"] = "success"
elif "transfer_project" in request.POST:
if not is_owner:
return HttpResponseForbidden()
form = forms.TransferForm(request.POST)
if form.is_valid():
# Look up the proposed new owner
email = form.cleaned_data["email"]
try:
membership = project.member_set.filter(user__email=email).get()
except Member.DoesNotExist:
return HttpResponseBadRequest()
# Revoke any previous transfer requests
project.member_set.update(transfer_request_date=None)
# Initiate the new request
membership.transfer_request_date = now()
membership.save()
# Send an email notification
profile = Profile.objects.for_user(membership.user)
profile.send_transfer_request(project)
ctx["transfer_initiated"] = True
ctx["transfer_status"] = "success"
elif "cancel_transfer" in request.POST:
if not is_owner:
return HttpResponseForbidden()
project.member_set.update(transfer_request_date=None)
ctx["transfer_cancelled"] = True
ctx["transfer_status"] = "success"
elif "accept_transfer" in request.POST:
tr = project.transfer_request()
if not tr or tr.user != request.user:
return HttpResponseForbidden()
if not tr.can_accept():
return HttpResponseBadRequest()
with transaction.atomic():
# 1. Reuse the existing membership, and change its user
tr.user = project.owner
tr.transfer_request_date = None
tr.save()
# 2. Change project's owner
project.owner = request.user
project.save()
ctx["is_owner"] = True
messages.success(request, "You are now the owner of this project!")
elif "reject_transfer" in request.POST:
tr = project.transfer_request()
if not tr or tr.user != request.user:
return HttpResponseForbidden()
tr.transfer_request_date = None
tr.save()
return render(request, "accounts/project.html", ctx)
@login_required
def notifications(request):
profile = request.profile
ctx = {"status": "default", "page": "profile", "profile": profile}
if request.method == "POST":
form = forms.ReportSettingsForm(request.POST)
if form.is_valid():
if profile.reports_allowed != form.cleaned_data["reports_allowed"]:
profile.reports_allowed = form.cleaned_data["reports_allowed"]
if profile.reports_allowed:
profile.next_report_date = choose_next_report_date()
else:
profile.next_report_date = None
if profile.nag_period != form.cleaned_data["nag_period"]:
# Set the new nag period
profile.nag_period = form.cleaned_data["nag_period"]
# and schedule next_nag_date:
if profile.nag_period:
profile.next_nag_date = now() + profile.nag_period
else:
profile.next_nag_date = None
profile.save()
ctx["status"] = "info"
return render(request, "accounts/notifications.html", ctx)
@login_required
def set_password(request, token):
if not request.profile.check_token(token, "set-password"):
return HttpResponseBadRequest()
if request.method == "POST":
form = forms.SetPasswordForm(request.POST)
if form.is_valid():
password = form.cleaned_data["password"]
request.user.set_password(password)
request.user.save()
request.profile.token = ""
request.profile.save()
# Setting a password logs the user out, so here we
# log them back in.
u = authenticate(username=request.user.email, password=password)
auth_login(request, u)
messages.success(request, "Your password has been set!")
return redirect("hc-profile")
return render(request, "accounts/set_password.html", {})
@login_required
def change_email(request, token):
if not request.profile.check_token(token, "change-email"):
return HttpResponseBadRequest()
if request.method == "POST":
form = forms.ChangeEmailForm(request.POST)
if form.is_valid():
request.user.email = form.cleaned_data["email"]
request.user.set_unusable_password()
request.user.save()
request.profile.token = ""
request.profile.save()
return redirect("hc-change-email-done")
else:
form = forms.ChangeEmailForm()
return render(request, "accounts/change_email.html", {"form": form})
def change_email_done(request):
return render(request, "accounts/change_email_done.html")
@csrf_exempt
def unsubscribe_reports(request, signed_username):
# Some email servers open links in emails to check for malicious content.
# To work around this, for GET requests we serve a confirmation form.
# If the signature is more than 5 minutes old, we also include JS code to
# auto-submit the form.
ctx = {}
signer = signing.TimestampSigner(salt="reports")
# First, check the signature without looking at the timestamp:
try:
username = signer.unsign(signed_username)
except signing.BadSignature:
return render(request, "bad_link.html")
# Check if timestamp is older than 5 minutes:
try:
username = signer.unsign(signed_username, max_age=300)
except signing.SignatureExpired:
ctx["autosubmit"] = True
if request.method != "POST":
return render(request, "accounts/unsubscribe_submit.html", ctx)
user = User.objects.get(username=username)
profile = Profile.objects.for_user(user)
profile.reports_allowed = False
profile.next_report_date = None
profile.nag_period = td()
profile.next_nag_date = None
profile.save()
return render(request, "accounts/unsubscribed.html")
@require_POST
@login_required
def close(request):
user = request.user
# Cancel their subscription:
sub = Subscription.objects.filter(user=user).first()
if sub:
sub.cancel()
user.delete()
# Deleting user also deletes its profile, checks, channels etc.
request.session.flush()
return redirect("hc-index")
@require_POST
@login_required
def remove_project(request, code):
project = get_object_or_404(Project, code=code, owner=request.user)
project.delete()
return redirect("hc-index")