You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

2071 lines
64 KiB

from datetime import datetime, timedelta as td
import email
import json
import os
import re
from secrets import token_urlsafe
from urllib.parse import urlencode
from cron_descriptor import ExpressionDescriptor
from croniter import croniter
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core import signing
from django.core.exceptions import PermissionDenied
from django.db.models import Count, F
from django.http import (
Http404,
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
JsonResponse,
)
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import get_template, render_to_string
from django.urls import reverse
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from hc.accounts.models import Project, Member
from hc.api.models import (
DEFAULT_GRACE,
DEFAULT_TIMEOUT,
MAX_DELTA,
Channel,
Check,
Ping,
Notification,
)
from hc.api.transports import Telegram
from hc.front.decorators import require_setting
from hc.front import forms
from hc.front.schemas import telegram_callback
from hc.front.templatetags.hc_extras import (
num_down_title,
down_title,
sortchecks,
site_hostname,
site_scheme,
)
from hc.lib import jsonschema
from hc.lib.badges import get_badge_url
import pytz
from pytz.exceptions import UnknownTimeZoneError
import requests
# Values accepted for the "sort" query parameter of the checks list view.
VALID_SORT_VALUES = ("name", "-name", "last_ping", "-last_ping", "created")
# Templates are loaded once at import time and reused by the JSON status views.
STATUS_TEXT_TMPL = get_template("front/log_status_text.html")
LAST_PING_TMPL = get_template("front/last_ping_cell.html")
EVENTS_TMPL = get_template("front/details_events.html")
DOWNTIMES_TMPL = get_template("front/details_downtimes.html")
def _tags_statuses(checks):
tags, down, grace, num_down = {}, {}, {}, 0
for check in checks:
status = check.get_status()
if status == "down":
num_down += 1
for tag in check.tags_list():
down[tag] = "down"
elif status == "grace":
for tag in check.tags_list():
grace[tag] = "grace"
else:
for tag in check.tags_list():
tags[tag] = "up"
tags.update(grace)
tags.update(down)
return tags, num_down
def _get_check_for_user(request, code):
    """Look up a check by `code` and work out the current user's access.

    Returns a ``(check, rw)`` tuple. Superusers and the project owner always
    get ``rw=True``; other users need a Member row for the check's project,
    and ``rw`` is taken from that membership. A missing check or a missing
    membership results in a 404 (via get_object_or_404).
    """
    assert request.user.is_authenticated

    user = request.user
    check = get_object_or_404(Check.objects.select_related("project"), code=code)
    if user.is_superuser or user.id == check.project.owner_id:
        return check, True

    membership = get_object_or_404(Member, project=check.project, user=user)
    return check, membership.is_rw
def _get_rw_check_for_user(request, code):
    """ Return the check if the current user has read-write access to it.

    Raises PermissionDenied when the user can only read the check.
    """
    check, can_write = _get_check_for_user(request, code)
    if can_write:
        return check

    raise PermissionDenied
def _get_channel_for_user(request, code):
    """Look up a channel by `code` and work out the current user's access.

    Returns a ``(channel, rw)`` tuple. Superusers and the project owner always
    get ``rw=True``; other users need a Member row for the channel's project,
    and ``rw`` is taken from that membership. A missing channel or a missing
    membership results in a 404 (via get_object_or_404).
    """
    assert request.user.is_authenticated

    user = request.user
    channel = get_object_or_404(Channel.objects.select_related("project"), code=code)
    if user.is_superuser or user.id == channel.project.owner_id:
        return channel, True

    membership = get_object_or_404(Member, project=channel.project, user=user)
    return channel, membership.is_rw
def _get_rw_channel_for_user(request, code):
    """ Return the channel if the current user has read-write access to it.

    Raises PermissionDenied when the user can only read the channel.
    """
    channel, can_write = _get_channel_for_user(request, code)
    if can_write:
        return channel

    raise PermissionDenied
def _get_project_for_user(request, project_code):
    """Look up a project by code and work out the current user's access.

    Returns a ``(project, rw)`` tuple. Superusers and the project owner always
    get ``rw=True``; other users need a Member row for the project, and ``rw``
    is taken from that membership. A missing project or a missing membership
    results in a 404 (via get_object_or_404).
    """
    user = request.user
    project = get_object_or_404(Project, code=project_code)
    if user.is_superuser or user.id == project.owner_id:
        return project, True

    membership = get_object_or_404(Member, project=project, user=user)
    return project, membership.is_rw
def _get_rw_project_for_user(request, project_code):
    """ Return the project if the current user has read-write access to it.

    Unlike `_get_project_for_user`, this returns the project object alone
    (not a tuple). Raises PermissionDenied when the user has read-only access.
    """
    project, rw = _get_project_for_user(request, project_code)
    if not rw:
        raise PermissionDenied

    return project
def _refresh_last_active_date(profile):
    """ Set last_active_date to now if it is unset or at least a day old. """
    now = timezone.now()
    last_active = profile.last_active_date

    is_fresh = last_active is not None and (now - last_active).days <= 0
    if is_fresh:
        return

    profile.last_active_date = now
    profile.save()
@login_required
def my_checks(request, code):
    """Render the list of checks for the project identified by `code`.

    Query parameters handled here:

    * ``sort`` -- saved to the user's profile if it is one of
      VALID_SORT_VALUES.
    * ``urls`` -- "uuid" or "slug"; saved to ``project.show_slugs`` when the
      user has read-write access.
    * ``tag`` (repeatable) and ``search`` -- checks that do not match are
      collected in ``hidden_checks``; all checks are still passed to the
      template.
    """
    _refresh_last_active_date(request.profile)
    project, rw = _get_project_for_user(request, code)

    if request.GET.get("sort") in VALID_SORT_VALUES:
        request.profile.sort = request.GET["sort"]
        request.profile.save()

    if request.GET.get("urls") in ("uuid", "slug") and rw:
        project.show_slugs = request.GET["urls"] == "slug"
        project.save()

    if request.session.get("last_project_id") != project.id:
        request.session["last_project_id"] = project.id

    q = Check.objects.filter(project=project)
    q = q.select_related("project")
    checks = list(q.prefetch_related("channel_set"))
    sortchecks(checks, request.profile.sort)

    tags_statuses, num_down = _tags_statuses(checks)
    pairs = sorted(tags_statuses.items(), key=lambda pair: pair[0].lower())

    channels = Channel.objects.filter(project=project)
    channels = list(channels.order_by("created"))

    hidden_checks = set()

    # Hide checks that don't match selected tags:
    selected_tags = set(request.GET.getlist("tag", []))
    if selected_tags:
        for check in checks:
            if not selected_tags.issubset(check.tags_list()):
                hidden_checks.add(check)

    # Hide checks that don't match the search string. Both sides are
    # lowercased: previously only the name was, so a search term containing
    # an uppercase letter could never match a check name.
    search = request.GET.get("search", "")
    if search:
        needle = search.lower()
        for check in checks:
            search_key = ("%s\n%s" % (check.name, check.code)).lower()
            if needle not in search_key:
                hidden_checks.add(check)

    # Do we need to show the "Last Duration" header?
    show_last_duration = any(check.clamped_last_duration() for check in checks)

    ctx = {
        "page": "checks",
        "rw": rw,
        "checks": checks,
        "channels": channels,
        "num_down": num_down,
        "tags": pairs,
        "ping_endpoint": settings.PING_ENDPOINT,
        "timezones": pytz.all_timezones,
        "project": project,
        "num_available": project.num_checks_available(),
        "sort": request.profile.sort,
        "selected_tags": selected_tags,
        # The search term is passed through exactly as the user typed it.
        "search": search,
        "hidden_checks": hidden_checks,
        "show_last_duration": show_last_duration,
    }

    return render(request, "front/my_checks.html", ctx)
@login_required
def status(request, code):
    """Return the live status of every check in a project as JSON.

    The response has three keys: "details" (one entry per check), "tags"
    (tag -> status mapping) and "title" (built from the number of down checks).
    """
    # Called only for its access check; the return value is not needed.
    _get_project_for_user(request, code)

    checks = list(Check.objects.filter(project__code=code))
    details = [
        {
            "code": str(check.code),
            "status": check.get_status(),
            "last_ping": LAST_PING_TMPL.render({"check": check}).strip(),
            "started": check.last_start is not None,
        }
        for check in checks
    ]

    tags_statuses, num_down = _tags_statuses(checks)
    payload = {
        "details": details,
        "tags": tags_statuses,
        "title": num_down_title(num_down),
    }
    return JsonResponse(payload)
@login_required
@require_POST
def switch_channel(request, code, channel_code):
    """Attach a channel to a check, or detach it, based on POST "state".

    "state" == "on" attaches; any other value detaches. Responds with 400
    if the channel and the check belong to different projects.
    """
    check = _get_rw_check_for_user(request, code)
    channel = get_object_or_404(Channel, code=channel_code)

    same_project = channel.project_id == check.project_id
    if not same_project:
        return HttpResponseBadRequest()

    turn_on = request.POST.get("state") == "on"
    apply_change = channel.checks.add if turn_on else channel.checks.remove
    apply_change(check)

    return HttpResponse()
def index(request):
    """Render the project list for signed-in users, else the welcome page."""
    if request.user.is_authenticated:
        project_ids = request.profile.projects().values("id")
        q = (
            Project.objects.filter(id__in=project_ids)
            .annotate(n_checks=Count("check", distinct=True))
            .annotate(n_channels=Count("channel", distinct=True))
            .annotate(owner_email=F("owner__email"))
        )

        # Primary sort key: projects with overall_status=down go first
        # Secondary sort key: project's name
        projects = sorted(q, key=lambda p: (p.overall_status() != "down", p.name))

        return render(
            request,
            "front/projects.html",
            {
                "page": "projects",
                "projects": projects,
                "last_project_id": request.session.get("last_project_id"),
            },
        )

    # Anonymous visitor: show the welcome page with an unsaved sample check.
    check = Check()
    ctx = {"page": "welcome", "check": check, "ping_url": check.url()}

    # Integrations enabled when the setting is exactly True.
    enabled_when_true = (
        ("enable_apprise", "APPRISE_ENABLED"),
        ("enable_mattermost", "MATTERMOST_ENABLED"),
        ("enable_msteams", "MSTEAMS_ENABLED"),
        ("enable_opsgenie", "OPSGENIE_ENABLED"),
        ("enable_pagertree", "PAGERTREE_ENABLED"),
        ("enable_pd", "PD_ENABLED"),
        ("enable_prometheus", "PROMETHEUS_ENABLED"),
        ("enable_shell", "SHELL_ENABLED"),
        ("enable_signal", "SIGNAL_CLI_ENABLED"),
        ("enable_slack", "SLACK_ENABLED"),
        ("enable_spike", "SPIKE_ENABLED"),
        ("enable_victorops", "VICTOROPS_ENABLED"),
        ("enable_webhooks", "WEBHOOKS_ENABLED"),
        ("enable_zulip", "ZULIP_ENABLED"),
    )
    for key, setting_name in enabled_when_true:
        ctx[key] = getattr(settings, setting_name) is True

    # Integrations enabled when the setting is anything other than None.
    enabled_when_set = (
        ("enable_call", "TWILIO_AUTH"),
        ("enable_discord", "DISCORD_CLIENT_ID"),
        ("enable_linenotify", "LINENOTIFY_CLIENT_ID"),
        ("enable_matrix", "MATRIX_ACCESS_TOKEN"),
        ("enable_pd_simple", "PD_APP_ID"),
        ("enable_pushbullet", "PUSHBULLET_CLIENT_ID"),
        ("enable_pushover", "PUSHOVER_API_TOKEN"),
        ("enable_slack_btn", "SLACK_CLIENT_ID"),
        ("enable_sms", "TWILIO_AUTH"),
        ("enable_telegram", "TELEGRAM_TOKEN"),
        ("enable_trello", "TRELLO_APP_KEY"),
    )
    for key, setting_name in enabled_when_set:
        ctx[key] = getattr(settings, setting_name) is not None

    # These two settings are passed to the template unmodified.
    ctx["enable_whatsapp"] = settings.TWILIO_USE_WHATSAPP
    ctx["registration_open"] = settings.REGISTRATION_OPEN

    return render(request, "front/welcome.html", ctx)
def dashboard(request):
    """ Render the dashboard template with an empty context. """
    ctx = {}
    return render(request, "front/dashboard.html", ctx)
def serve_doc(request, doc="introduction"):
    """Render one documentation page from templates/docs/<doc>.html.

    Raises Http404 if `doc` contains anything other than lowercase letters
    and underscores, or if the corresponding file does not exist. For pages
    whose name does not start with "self_hosted", a fixed set of placeholders
    in the file is replaced with values from settings before rendering.
    """
    # Filenames in /templates/docs/ consist of lowercase letters and underscores,
    # -- make sure we don't accept anything else
    if not re.match(r"^[a-z_]+$", doc):
        raise Http404("not found")

    path = os.path.join(settings.BASE_DIR, "templates/docs", doc + ".html")
    if not os.path.exists(path):
        raise Http404("not found")

    # Use a context manager so the file handle is closed deterministically.
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()

    if not doc.startswith("self_hosted"):
        replaces = {
            "{{ default_timeout }}": str(int(DEFAULT_TIMEOUT.total_seconds())),
            "{{ default_grace }}": str(int(DEFAULT_GRACE.total_seconds())),
            "SITE_NAME": settings.SITE_NAME,
            "SITE_ROOT": settings.SITE_ROOT,
            "SITE_HOSTNAME": site_hostname(),
            "SITE_SCHEME": site_scheme(),
            "PING_ENDPOINT": settings.PING_ENDPOINT,
            "PING_URL": settings.PING_ENDPOINT + "your-uuid-here",
            "IMG_URL": os.path.join(settings.STATIC_URL, "img/docs"),
        }

        for placeholder, value in replaces.items():
            content = content.replace(placeholder, value)

    ctx = {
        "page": "docs",
        "section": doc,
        "content": content,
        "first_line": content.split("\n")[0],
    }

    return render(request, "front/docs_single.html", ctx)
def docs_cron(request):
    """ Render the cron documentation template with an empty context. """
    ctx = {}
    return render(request, "front/docs_cron.html", ctx)
@require_POST
@login_required
def add_check(request, code):
    """Create a new check in the project and redirect to its details page.

    Responds with 400 if the project has no checks available. The new check
    is assigned all channels, and the redirect URL carries a "?new" marker.
    """
    project = _get_rw_project_for_user(request, code)
    if project.num_checks_available() <= 0:
        return HttpResponseBadRequest()

    check = Check(project=project)
    check.save()
    check.assign_all_channels()

    details_url = reverse("hc-details", args=[check.code])
    return redirect("%s?new" % details_url)
@require_POST
@login_required
def update_name(request, code):
    """Update a check's name, tags and description from the posted form.

    Invalid form data is silently ignored. Redirects back to the details page
    if the referer contains "/details/", otherwise to the project's checks list.
    """
    check = _get_rw_check_for_user(request, code)

    form = forms.NameTagsForm(request.POST)
    if form.is_valid():
        data = form.cleaned_data
        check.set_name_slug(data["name"])
        check.tags = data["tags"]
        check.desc = data["desc"]
        check.save()

    came_from_details = "/details/" in request.META.get("HTTP_REFERER", "")
    if came_from_details:
        return redirect("hc-details", code)

    return redirect("hc-checks", check.project.code)
@require_POST
@login_required
def filtering_rules(request, code):
    """Update a check's filtering settings from the posted form.

    Copies subject, subject_fail, methods and manual_resume from the form
    onto the check. Invalid form data is silently ignored. Always redirects
    to the check's details page.
    """
    check = _get_rw_check_for_user(request, code)

    form = forms.FilteringRulesForm(request.POST)
    if form.is_valid():
        for field in ("subject", "subject_fail", "methods", "manual_resume"):
            setattr(check, field, form.cleaned_data[field])
        check.save()

    return redirect("hc-details", code)
@require_POST
@login_required
def update_timeout(request, code):
    """Update a check's schedule ("simple" timeout or "cron" expression).

    Responds with 400 if the form for the posted "kind" is invalid. After
    saving, redirects back to the details page if the referer contains
    "/details/", otherwise to the project's checks list.
    """
    check = _get_rw_check_for_user(request, code)

    kind = request.POST.get("kind")
    if kind == "simple":
        form = forms.TimeoutForm(request.POST)
        if not form.is_valid():
            return HttpResponseBadRequest()

        data = form.cleaned_data
        check.kind = "simple"
        check.timeout = data["timeout"]
        check.grace = data["grace"]
    elif kind == "cron":
        form = forms.CronForm(request.POST)
        if not form.is_valid():
            return HttpResponseBadRequest()

        data = form.cleaned_data
        check.kind = "cron"
        check.schedule = data["schedule"]
        check.tz = data["tz"]
        # The cron form supplies grace as a number of minutes.
        check.grace = td(minutes=data["grace"])

    check.alert_after = check.going_down_after()
    already_overdue = check.status == "up" and check.alert_after < timezone.now()
    if already_overdue:
        # Changing the schedule can flip a check from "up" to "down".
        # Interactive edits in the web UI should not trigger notifications,
        # so set `alert_after` and `status` here the same way `sendalerts`
        # would, but without sending an actual alert:
        check.alert_after = None
        check.status = "down"

    check.save()

    referer = request.META.get("HTTP_REFERER", "")
    if "/details/" in referer:
        return redirect("hc-details", code)

    return redirect("hc-checks", check.project.code)
@require_POST
def cron_preview(request):
    """Render a preview of the next run times for a posted cron schedule.

    Reads "schedule" and "tz" from POST. On success the context contains six
    upcoming datetimes in "dates" and, when cron-descriptor can produce one,
    a human-readable description in "desc". On failure the context carries
    "bad_tz" (unknown timezone) or "bad_schedule" (anything else).
    """
    schedule = request.POST.get("schedule", "")
    tz = request.POST.get("tz")
    ctx = {"tz": tz, "dates": []}

    try:
        zone = pytz.timezone(tz)
        now_local = timezone.localtime(timezone.now(), zone)

        # Only accept standard five-field expressions.
        if len(schedule.split()) != 5:
            raise ValueError()

        it = croniter(schedule, now_local)
        for _ in range(6):
            ctx["dates"].append(it.get_next(datetime))
    except UnknownTimeZoneError:
        ctx["bad_tz"] = True
    except Exception:
        # Deliberately broad: any failure to parse or iterate the schedule is
        # reported as a bad schedule. `Exception` (rather than a bare except)
        # lets KeyboardInterrupt and SystemExit propagate.
        ctx["bad_schedule"] = True

    if ctx["dates"]:
        try:
            descriptor = ExpressionDescriptor(schedule, use_24hour_time_format=True)
            ctx["desc"] = descriptor.get_description()
        except Exception:
            # We assume the schedule is valid if croniter accepts it.
            # If cron-descriptor throws an exception, don't show the description
            # to the user.
            pass

    return render(request, "front/cron_preview.html", ctx)
@login_required
def ping_details(request, code, n=None):
    """Render the details of a single ping of a check.

    With a truthy `n`, shows the ping whose ``n`` field equals it; otherwise
    shows the most recently created ping. Renders a "not found" template if
    no matching ping exists. For pings received by email, the body is parsed
    and the subject, plain-text part and HTML part are added to the context.
    """
    check, rw = _get_check_for_user(request, code)
    q = Ping.objects.filter(owner=check)
    if n:
        q = q.filter(n=n)

    try:
        ping = q.latest("created")
    except Ping.DoesNotExist:
        return render(request, "front/ping_details_not_found.html")

    ctx = {"check": check, "ping": ping, "plain": None, "html": None}

    if ping.scheme == "email":
        # `import email` alone does not load the `email.policy` submodule;
        # import it explicitly instead of relying on some other module
        # having imported it first.
        import email.policy

        parsed = email.message_from_string(ping.body, policy=email.policy.SMTP)
        ctx["subject"] = parsed.get("subject", "")

        plain_mime_part = parsed.get_body(("plain",))
        if plain_mime_part:
            ctx["plain"] = plain_mime_part.get_content()

        html_mime_part = parsed.get_body(("html",))
        if html_mime_part:
            ctx["html"] = html_mime_part.get_content()

    return render(request, "front/ping_details.html", ctx)
@require_POST
@login_required
def pause(request, code):
    """Pause a check.

    Returns an empty response for AJAX requests (X-Requested-With header set
    to XMLHttpRequest), otherwise redirects to the check's details page.
    """
    check = _get_rw_check_for_user(request, code)

    check.status = "paused"
    check.last_start = None
    check.alert_after = None
    check.save()

    # After pausing a check we must check if all checks are up,
    # and Profile.next_nag_date needs to be cleared out:
    check.project.update_next_nag_dates()

    # Don't redirect after an AJAX request:
    is_ajax = request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest"
    return HttpResponse() if is_ajax else redirect("hc-details", code)
@require_POST
@login_required
def resume(request, code):
    """ Reset a check to the "new" state and redirect to its details page. """
    check = _get_rw_check_for_user(request, code)

    check.status = "new"
    # Clear the timing fields.
    check.last_start = check.last_ping = check.alert_after = None
    check.save()

    return redirect("hc-details", code)
@require_POST
@login_required
def remove_check(request, code):
    """ Delete a check and redirect to its project's checks list. """
    check = _get_rw_check_for_user(request, code)

    # Grab the project before deleting; it is needed for the redirect.
    owning_project = check.project
    check.delete()

    return redirect("hc-checks", owning_project.code)
def _get_events(check, limit):
    """Return the check's recent pings and "down" notifications, newest first.

    Loads up to `limit` most recent pings. A non-"start" ping that directly
    follows a "start" ping gets a ``delta`` attribute holding the time between
    the two, provided it is below MAX_DELTA. Notifications are included only
    if they were created after the oldest loaded ping.
    """
    pings = list(Ping.objects.filter(owner=check).order_by("-id")[:limit])

    # Walk consecutive (older, newer) pairs in chronological order.
    chronological = pings[::-1]
    for prev, ping in zip(chronological, chronological[1:]):
        if prev.kind == "start" and ping.kind != "start":
            delta = ping.created - prev.created
            if delta < MAX_DELTA:
                ping.delta = delta

    alerts = []
    if pings:
        cutoff = pings[-1].created
        alerts = Notification.objects.select_related("channel").filter(
            owner=check, check_status="down", created__gt=cutoff
        )

    return sorted(pings + list(alerts), key=lambda ev: ev.created, reverse=True)
@login_required
def log(request, code):
    """Render the event log of a check.

    The number of pings shown is capped by the project owner's
    ``ping_log_limit``.
    """
    check, _ = _get_check_for_user(request, code)
    project = check.project
    limit = project.owner_profile.ping_log_limit

    events = _get_events(check, limit)
    show_limit_notice = check.n_pings > limit and settings.USE_PAYMENTS

    ctx = {
        "project": project,
        "check": check,
        "events": events,
        "limit": limit,
        "show_limit_notice": show_limit_notice,
    }
    return render(request, "front/log.html", ctx)
@login_required
def details(request, code):
    """Render the details page of a single check.

    Besides the check itself, the context carries the project's channels, the
    channels enabled for this check, three months of downtimes, and a
    space-separated, sorted list of all tags used in the project.
    """
    _refresh_last_active_date(request.profile)
    check, rw = _get_check_for_user(request, code)
    project = check.project

    channels = list(Channel.objects.filter(project=project).order_by("created"))

    # Collect the tags of every check in the project that has any.
    tagged = Check.objects.filter(project=project).exclude(tags="")
    all_tags = {
        tag
        for tags in tagged.values_list("tags", flat=True)
        for tag in tags.split(" ")
    }

    ctx = {
        "page": "details",
        "project": project,
        "check": check,
        "rw": rw,
        "channels": channels,
        "enabled_channels": list(check.channel_set.all()),
        "timezones": pytz.all_timezones,
        "downtimes": check.downtimes(months=3),
        "is_new": "new" in request.GET,
        "is_copied": "copied" in request.GET,
        "all_tags": " ".join(sorted(all_tags)),
    }

    return render(request, "front/details.html", ctx)
@login_required
def uncloak(request, unique_key):
    """Redirect to the details page of the check with the given unique key.

    Only checks from the current user's projects are searched. Raises Http404
    if none of them has a matching ``unique_key``.
    """
    candidates = request.profile.checks_from_all_projects().only("code")
    match = next((c for c in candidates if c.unique_key == unique_key), None)
    if match is None:
        raise Http404("not found")

    return redirect("hc-details", match.code)
@login_required
def transfer(request, code):
    """Move a check to another project, or render the transfer dialog.

    Non-POST requests render the transfer modal. A POST moves the check to
    the project named in POST "project" (the user needs read-write access to
    it), assigns all channels to the check, and redirects to the check's
    details page. Responds with 400 if the target project has no checks
    available.
    """
    check = _get_rw_check_for_user(request, code)

    if request.method != "POST":
        return render(request, "front/transfer_modal.html", {"check": check})

    destination = _get_rw_project_for_user(request, request.POST["project"])
    if destination.num_checks_available() <= 0:
        return HttpResponseBadRequest()

    check.project = destination
    check.save()
    check.assign_all_channels()

    messages.success(request, "Check transferred successfully!")
    return redirect("hc-details", code)
@require_POST
@login_required
def copy(request, code):
    """Create a copy of a check in the same project.

    The copy gets the original's name with " (copy)" appended, the same
    configuration fields and the same channels. Responds with 400 if the
    project has no checks available; otherwise redirects to the copy's details
    page with a "?copied" marker.
    """
    original = _get_rw_check_for_user(request, code)

    if original.project.num_checks_available() <= 0:
        return HttpResponseBadRequest()

    new_name = original.name + " (copy)"
    # Make sure we don't exceed the 100 character db field limit:
    if len(new_name) > 100:
        new_name = original.name[:90] + "... (copy)"

    copied = Check(project=original.project)
    copied.set_name_slug(new_name)

    # Carry over the configuration fields one-to-one.
    cloned_fields = (
        "desc",
        "tags",
        "subject",
        "subject_fail",
        "methods",
        "manual_resume",
        "kind",
        "timeout",
        "grace",
        "schedule",
        "tz",
    )
    for field in cloned_fields:
        setattr(copied, field, getattr(original, field))

    copied.save()
    copied.channel_set.add(*original.channel_set.all())

    details_url = reverse("hc-details", args=[copied.code])
    return redirect("%s?copied" % details_url)
@login_required
def status_single(request, code):
    """Return the live status of one check as JSON.

    "updated" is the timestamp of the newest of the last 20 events ("1" when
    there are none). The rendered "events" and "downtimes" fragments are only
    included when "updated" differs from the client's ``u`` query parameter.
    """
    check, rw = _get_check_for_user(request, code)

    current_status = check.get_status()
    events = _get_events(check, 20)
    updated = str(events[0].created.timestamp()) if events else "1"

    doc = {
        "status": current_status,
        "status_text": STATUS_TEXT_TMPL.render({"check": check, "rw": rw}),
        "title": down_title(check),
        "updated": updated,
    }

    client_has_latest = updated == request.GET.get("u")
    if not client_has_latest:
        doc["events"] = EVENTS_TMPL.render({"check": check, "events": events})
        doc["downtimes"] = DOWNTIMES_TMPL.render({"downtimes": check.downtimes(3)})

    return JsonResponse(doc)
@login_required
def badges(request, code):
    """Render the badges page of a project.

    Builds one set of badge URLs for every tag used by the project's checks
    (sorted case-insensitively), followed by a "*" entry.
    """
    project, rw = _get_project_for_user(request, code)

    tags = set()
    for check in Check.objects.filter(project=project):
        tags.update(check.tags_list())

    # "*" goes last; it is for the "overall status" badge.
    badge_tags = sorted(tags, key=str.lower) + ["*"]

    key = project.badge_key
    urls = [
        {
            "tag": tag,
            "svg": get_badge_url(key, tag),
            "svg3": get_badge_url(key, tag, with_late=True),
            "json": get_badge_url(key, tag, fmt="json"),
            "json3": get_badge_url(key, tag, fmt="json", with_late=True),
            "shields": get_badge_url(key, tag, fmt="shields"),
            "shields3": get_badge_url(key, tag, fmt="shields", with_late=True),
        }
        for tag in badge_tags
    ]

    ctx = {
        # More than one entry means there is at least one real tag besides "*".
        "have_tags": len(urls) > 1,
        "page": "badges",
        "project": project,
        "badges": urls,
    }

    return render(request, "front/badges.html", ctx)
@login_required
def channels(request, code):
    """Render the project's integrations page, or update a channel's checks.

    On POST, the channel named in POST "channel" gets its set of checks
    replaced by the checks whose codes appear as "check-<code>" keys. This
    needs read-write access, and the channel and every check must belong to
    the project (403 otherwise; 400 if a code does not exist).
    """
    project, rw = _get_project_for_user(request, code)

    if request.method == "POST":
        if not rw:
            return HttpResponseForbidden()

        channel_code = request.POST["channel"]
        try:
            channel = Channel.objects.get(code=channel_code)
        except Channel.DoesNotExist:
            return HttpResponseBadRequest()
        if channel.project_id != project.id:
            return HttpResponseForbidden()

        prefix = "check-"
        new_checks = []
        for key in request.POST:
            if not key.startswith(prefix):
                continue

            check_code = key[len(prefix):]
            try:
                check = Check.objects.get(code=check_code)
            except Check.DoesNotExist:
                return HttpResponseBadRequest()
            if check.project_id != project.id:
                return HttpResponseForbidden()

            new_checks.append(check)

        channel.checks.set(new_checks)
        return redirect("hc-channels", project.code)

    channel_qs = (
        Channel.objects.filter(project=project)
        .order_by("created")
        .annotate(n_checks=Count("checks"))
    )

    ctx = {
        "page": "channels",
        "rw": rw,
        "project": project,
        "profile": project.owner_profile,
        "channels": channel_qs,
    }

    # Integrations enabled when the setting is exactly True.
    enabled_when_true = (
        ("enable_apprise", "APPRISE_ENABLED"),
        ("enable_mattermost", "MATTERMOST_ENABLED"),
        ("enable_msteams", "MSTEAMS_ENABLED"),
        ("enable_opsgenie", "OPSGENIE_ENABLED"),
        ("enable_pagertree", "PAGERTREE_ENABLED"),
        ("enable_pd", "PD_ENABLED"),
        ("enable_prometheus", "PROMETHEUS_ENABLED"),
        ("enable_shell", "SHELL_ENABLED"),
        ("enable_signal", "SIGNAL_CLI_ENABLED"),
        ("enable_slack", "SLACK_ENABLED"),
        ("enable_spike", "SPIKE_ENABLED"),
        ("enable_victorops", "VICTOROPS_ENABLED"),
        ("enable_webhooks", "WEBHOOKS_ENABLED"),
        ("enable_zulip", "ZULIP_ENABLED"),
    )
    for key, setting_name in enabled_when_true:
        ctx[key] = getattr(settings, setting_name) is True

    # Integrations enabled when the setting is anything other than None.
    enabled_when_set = (
        ("enable_call", "TWILIO_AUTH"),
        ("enable_discord", "DISCORD_CLIENT_ID"),
        ("enable_linenotify", "LINENOTIFY_CLIENT_ID"),
        ("enable_matrix", "MATRIX_ACCESS_TOKEN"),
        ("enable_pushbullet", "PUSHBULLET_CLIENT_ID"),
        ("enable_pushover", "PUSHOVER_API_TOKEN"),
        ("enable_slack_btn", "SLACK_CLIENT_ID"),
        ("enable_sms", "TWILIO_AUTH"),
        ("enable_telegram", "TELEGRAM_TOKEN"),
        ("enable_trello", "TRELLO_APP_KEY"),
    )
    for key, setting_name in enabled_when_set:
        ctx[key] = getattr(settings, setting_name) is not None

    # These two settings are passed to the template unmodified.
    ctx["enable_whatsapp"] = settings.TWILIO_USE_WHATSAPP
    ctx["use_payments"] = settings.USE_PAYMENTS

    return render(request, "front/channels.html", ctx)
@login_required
def channel_checks(request, code):
    """Render the list of the project's checks for one channel.

    The context carries the project's checks (oldest first) and the set of
    check codes currently assigned to the channel.
    """
    channel = _get_rw_channel_for_user(request, code)

    assigned_codes = channel.checks.values_list("code", flat=True).distinct()
    project_checks = Check.objects.filter(project=channel.project).order_by("created")

    ctx = {
        "checks": project_checks,
        "assigned": set(assigned_codes),
        "channel": channel,
    }
    return render(request, "front/channel_checks.html", ctx)
@require_POST
@login_required
def update_channel_name(request, code):
    """Rename a channel from the posted form.

    Invalid form data is silently ignored. Always redirects to the project's
    integrations page.
    """
    channel = _get_rw_channel_for_user(request, code)

    form = forms.ChannelNameForm(request.POST)
    if form.is_valid():
        new_name = form.cleaned_data["name"]
        channel.name = new_name
        channel.save()

    return redirect("hc-channels", channel.project.code)
def verify_email(request, code, token):
    """Mark a channel's email as verified if `token` matches the channel's token.

    Renders the "bad link" template when the token does not match.
    """
    channel = get_object_or_404(Channel, code=code)

    if channel.make_token() != token:
        return render(request, "bad_link.html")

    channel.email_verified = True
    channel.save()
    return render(request, "front/verify_email_success.html")
@csrf_exempt
def unsubscribe_email(request, code, signed_token):
    """Delete an email channel via a signed unsubscribe link.

    Renders the "bad link" template if the signature is invalid or the signed
    value does not match the channel's token. Non-POST requests get a
    confirmation form; only a POST actually deletes the channel.
    """
    # Some email servers open links in emails to check for malicious content.
    # To work around this, GET requests are served a confirmation form. When
    # the signature is older than 5 minutes, "autosubmit" is set in the context
    # so the form can be submitted automatically.
    signer = signing.TimestampSigner(salt="alerts")

    # First, check the signature without looking at the timestamp:
    try:
        token = signer.unsign(signed_token)
    except signing.BadSignature:
        return render(request, "bad_link.html")

    # Then, check if timestamp is older than 5 minutes:
    ctx = {}
    try:
        signer.unsign(signed_token, max_age=300)
    except signing.SignatureExpired:
        ctx["autosubmit"] = True

    channel = get_object_or_404(Channel, code=code, kind="email")
    if channel.make_token() != token:
        return render(request, "bad_link.html")

    if request.method == "POST":
        channel.delete()
        return render(request, "front/unsubscribe_success.html")

    return render(request, "accounts/unsubscribe_submit.html", ctx)
@require_POST
@login_required
def send_test_notification(request, code):
    """Send a test notification through a channel.

    Uses an unsaved check named "TEST" as the notification's subject, reports
    the outcome through the messages framework, and redirects to the project's
    integrations page.
    """
    channel, rw = _get_channel_for_user(request, code)

    dummy = Check(name="TEST", status="down", project=channel.project)
    dummy.last_ping = timezone.now() - td(days=1)
    dummy.n_pings = 42

    # If a webhook has no url_down but does have url_up, send a
    # "TEST is UP" notification instead:
    only_has_url_up = (
        channel.kind == "webhook" and not channel.url_down and channel.url_up
    )
    if only_has_url_up:
        dummy.status = "up"

    # Delete all older test notifications for this channel
    Notification.objects.filter(channel=channel, owner=None).delete()

    # Send the test notification
    error = channel.notify(dummy, is_test=True)

    if not error:
        messages.success(request, "Test notification sent!")
    else:
        messages.warning(request, "Could not send a test notification. %s" % error)

    return redirect("hc-channels", channel.project.code)
@require_POST
@login_required
def remove_channel(request, code):
    """ Delete a channel and redirect to its project's integrations page. """
    channel = _get_rw_channel_for_user(request, code)

    # Grab the project before deleting; it is needed for the redirect.
    owning_project = channel.project
    channel.delete()

    return redirect("hc-channels", owning_project.code)
@login_required
def email_form(request, channel=None, code=None):
    """Add an email integration, or edit an existing one.

    When `channel` is None a new email channel is created in the project
    identified by `code`; otherwise the given channel is edited. After a
    successful POST, a verification link is sent if the channel's address is
    not verified, and the user is redirected to the integrations page.
    """
    is_new = channel is None
    if is_new:
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="email")

    if request.method == "POST":
        form = forms.EmailForm(request.POST)
        if form.is_valid():
            address = form.cleaned_data["value"]
            if address != channel.email_value:
                # The address changed. It counts as verified right away when
                # verification is switched off (self-hosted administrators can
                # set EMAIL_USE_VERIFICATION=False), or when users add *their
                # own* address. Otherwise it has to be verified again.
                channel.email_verified = (
                    not settings.EMAIL_USE_VERIFICATION
                    or address == request.user.email
                )

            channel.value = form.get_value()
            channel.save()

            if is_new:
                channel.assign_all_checks()

            if not channel.email_verified:
                channel.send_verify_link()

            return redirect("hc-channels", channel.project.code)
    elif is_new:
        form = forms.EmailForm()
    else:
        # Pre-fill the form with the channel's current settings.
        initial = {
            "value": channel.email_value,
            "up": channel.email_notify_up,
            "down": channel.email_notify_down,
        }
        form = forms.EmailForm(initial)

    ctx = {
        "page": "channels",
        "project": channel.project,
        "use_verification": settings.EMAIL_USE_VERIFICATION,
        "form": form,
        "is_new": is_new,
    }
    return render(request, "integrations/email_form.html", ctx)
@login_required
def edit_channel(request, code):
    """Dispatch to the edit form that matches the channel's kind.

    Supports email, webhook, sms, signal and whatsapp channels; responds with
    400 for any other kind.
    """
    channel = _get_rw_channel_for_user(request, code)

    kind = channel.kind
    if kind == "email":
        handler = email_form
    elif kind == "webhook":
        handler = webhook_form
    elif kind == "sms":
        handler = sms_form
    elif kind == "signal":
        handler = signal_form
    elif kind == "whatsapp":
        handler = whatsapp_form
    else:
        return HttpResponseBadRequest()

    return handler(request, channel=channel)
@require_setting("WEBHOOKS_ENABLED")
@login_required
def webhook_form(request, channel=None, code=None):
    """Add a webhook integration, or edit an existing one.

    When `channel` is None a new webhook channel is created in the project
    identified by `code`; otherwise the given channel is edited.
    """
    is_new = channel is None
    if is_new:
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="webhook")

    if request.method == "POST":
        form = forms.WebhookForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["name"]
            channel.value = form.get_value()
            channel.save()

            if is_new:
                channel.assign_all_checks()
            return redirect("hc-channels", channel.project.code)
    elif is_new:
        form = forms.WebhookForm()
    else:
        # Pre-fill the form from the stored JSON. Header dicts are turned
        # into "Name: value" lines, one header per line.
        doc = json.loads(channel.value)
        for headers_key in ("headers_down", "headers_up"):
            lines = ("%s: %s" % pair for pair in doc[headers_key].items())
            doc[headers_key] = "\n".join(lines)

        doc["name"] = channel.name
        form = forms.WebhookForm(doc)

    ctx = {
        "page": "channels",
        "project": channel.project,
        "form": form,
        "is_new": is_new,
    }
    return render(request, "integrations/webhook_form.html", ctx)
@require_setting("SHELL_ENABLED")
@login_required
def add_shell(request, code):
    """Add a shell command integration to the project.

    A valid POST creates the channel, assigns it to all checks and redirects
    to the integrations page; otherwise the form is rendered.
    """
    project = _get_rw_project_for_user(request, code)

    is_post = request.method == "POST"
    form = forms.AddShellForm(request.POST) if is_post else forms.AddShellForm()

    if is_post and form.is_valid():
        channel = Channel(project=project, kind="shell")
        channel.value = form.get_value()
        channel.save()

        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_shell.html", ctx)
@require_setting("PD_ENABLED")
@login_required
def add_pd(request, code):
    """Add a PagerDuty integration to the project.

    When settings.PD_APP_ID is set, renders the "simple install" page with an
    install URL and stores a random state token plus the project code in the
    session. Otherwise handles a plain form: a valid POST creates the channel
    and assigns it to all checks.
    """
    project = _get_rw_project_for_user(request, code)

    # Simple Install Flow
    if settings.PD_APP_ID:
        state = token_urlsafe()

        callback = settings.SITE_ROOT + reverse("hc-add-pd-complete")
        callback = callback + "?" + urlencode({"state": state})

        install_params = {
            "app_id": settings.PD_APP_ID,
            "redirect_url": callback,
            "version": "2",
        }
        install_url = "https://app.pagerduty.com/install/integration?" + urlencode(
            install_params
        )

        ctx = {"page": "channels", "project": project, "install_url": install_url}
        # Stored in the session along with the project code for later use.
        request.session["pagerduty"] = (state, str(project.code))
        return render(request, "integrations/add_pd_simple.html", ctx)

    is_post = request.method == "POST"
    form = forms.AddPdForm(request.POST) if is_post else forms.AddPdForm()

    if is_post and form.is_valid():
        channel = Channel(project=project, kind="pd")
        channel.value = form.cleaned_data["value"]
        channel.save()

        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_pd.html", ctx)
@require_setting("PD_ENABLED")
@require_setting("PD_APP_ID")
@login_required
def add_pd_complete(request):
    """Finish the PagerDuty install flow.

    Expects a (state, project code) pair under "pagerduty" in the session
    (400 if absent) and a matching "state" query parameter (403 on mismatch).
    Creates one "pd" channel per entry in the "integration_keys" list of the
    JSON document passed in the "config" query parameter.
    """
    if "pagerduty" not in request.session:
        return HttpResponseBadRequest()

    # pop() removes the entry, so a stored state can only be used once.
    expected_state, project_code = request.session.pop("pagerduty")
    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    project = _get_rw_project_for_user(request, project_code)

    doc = json.loads(request.GET["config"])
    for item in doc["integration_keys"]:
        channel = Channel(kind="pd", project=project)
        channel.name = item["name"]

        value = {
            "service_key": item["integration_key"],
            "account": doc["account"]["name"],
        }
        channel.value = json.dumps(value)
        channel.save()
        channel.assign_all_checks()

    messages.success(request, "The PagerDuty integration has been added!")
    return redirect("hc-channels", project.code)
@require_setting("PD_ENABLED")
@require_setting("PD_APP_ID")
def pd_help(request):
    """ Render the PagerDuty simple-install page without a project. """
    return render(request, "integrations/add_pd_simple.html", {"page": "channels"})
@require_setting("PAGERTREE_ENABLED")
@login_required
def add_pagertree(request, code):
    """Add a PagerTree integration to the project.

    A valid POST creates the channel, assigns it to all checks and redirects
    to the integrations page; otherwise the form is rendered.
    """
    project = _get_rw_project_for_user(request, code)

    is_post = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if is_post else forms.AddUrlForm()

    if is_post and form.is_valid():
        channel = Channel(project=project, kind="pagertree")
        channel.value = form.cleaned_data["value"]
        channel.save()

        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_pagertree.html", ctx)
@require_setting("SLACK_ENABLED")
@login_required
def add_slack(request, code):
    """Add a Slack integration (incoming webhook URL) to the project.

    A valid POST creates the channel, assigns it to all checks and redirects
    to the integrations page; otherwise the form is rendered.
    """
    project = _get_rw_project_for_user(request, code)

    if request.method == "POST":
        form = forms.AddUrlForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="slack")
            channel.value = form.cleaned_data["value"]
            channel.save()

            channel.assign_all_checks()
            return redirect("hc-channels", project.code)
    else:
        form = forms.AddUrlForm()

    ctx = {
        "page": "channels",
        # Pass the project to the template, as the other add_* views do.
        "project": project,
        "form": form,
    }
    return render(request, "integrations/add_slack.html", ctx)
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
def slack_help(request):
    """Render "integrations/add_slack_btn.html" outside of any project.

    Carries no `login_required` decorator; the context only contains the
    "page" marker (no project, no authorize URL).
    """
    return render(request, "integrations/add_slack_btn.html", {"page": "channels"})
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
@login_required
def add_slack_btn(request, code):
    """Render the "Add to Slack" page with a freshly generated authorize URL.

    A random `state` token is embedded in the Slack authorize URL and also
    stored, together with the project code, in the session under
    "add_slack" so that `add_slack_complete` can compare it later.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    query = urlencode(
        {
            "scope": "incoming-webhook",
            "client_id": settings.SLACK_CLIENT_ID,
            "state": state,
        }
    )

    request.session["add_slack"] = (state, str(project.code))
    return render(
        request,
        "integrations/add_slack_btn.html",
        {
            "project": project,
            "page": "channels",
            "authorize_url": "https://slack.com/oauth/v2/authorize?" + query,
        },
    )
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
@login_required
def add_slack_complete(request):
    """Handle the redirect back from Slack and create the channel.

    Responds with 403 when the session has no "add_slack" entry or when the
    "state" query parameter does not match the stored one. An
    "access_denied" error, checked before the state comparison, produces a
    warning message. Otherwise the "code" parameter is posted to Slack's
    `oauth.v2.access` endpoint and, if the reply has a truthy "ok", the raw
    response text is saved as the value of a new "slack" channel.
    """
    if "add_slack" not in request.session:
        return HttpResponseForbidden()

    expected_state, project_code = request.session.pop("add_slack")
    project = _get_rw_project_for_user(request, project_code)

    params = request.GET
    if params.get("error") == "access_denied":
        messages.warning(request, "Slack setup was cancelled.")
        return redirect("hc-channels", project.code)

    if params.get("state") != expected_state:
        return HttpResponseForbidden()

    payload = {
        "client_id": settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET,
        "code": params.get("code"),
    }
    response = requests.post("https://slack.com/api/oauth.v2.access", payload)

    doc = response.json()
    if not doc.get("ok"):
        messages.warning(request, "Error message from slack: %s" % doc.get("error"))
        return redirect("hc-channels", project.code)

    channel = Channel(kind="slack", project=project)
    # The entire JSON reply is stored, not just a single field of it.
    channel.value = response.text
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Slack integration has been added!")
    return redirect("hc-channels", project.code)
@require_setting("MATTERMOST_ENABLED")
@login_required
def add_mattermost(request, code):
    """Show the Mattermost URL form and, on a valid POST, save a channel.

    A valid submission stores the form's "value" in a Channel of kind
    "mattermost", calls `assign_all_checks()` and redirects to
    "hc-channels"; anything else renders the form.
    """
    project = _get_rw_project_for_user(request, code)

    is_post = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if is_post else forms.AddUrlForm()

    if is_post and form.is_valid():
        channel = Channel(project=project, kind="mattermost")
        channel.value = form.cleaned_data["value"]
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    template_ctx = {"page": "channels", "form": form, "project": project}
    return render(request, "integrations/add_mattermost.html", template_ctx)
@require_setting("PUSHBULLET_CLIENT_ID")
@login_required
def add_pushbullet(request, code):
    """Render the Pushbullet page with a freshly generated authorize URL.

    The random `state` token goes into the authorize URL and, paired with
    the project code, into the session under "add_pushbullet" for
    `add_pushbullet_complete` to check.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    callback_url = settings.SITE_ROOT + reverse(add_pushbullet_complete)
    query = urlencode(
        {
            "client_id": settings.PUSHBULLET_CLIENT_ID,
            "redirect_uri": callback_url,
            "response_type": "code",
            "state": state,
        }
    )

    request.session["add_pushbullet"] = (state, str(project.code))
    return render(
        request,
        "integrations/add_pushbullet.html",
        {
            "page": "channels",
            "project": project,
            "authorize_url": "https://www.pushbullet.com/authorize?" + query,
        },
    )
@require_setting("PUSHBULLET_CLIENT_ID")
@login_required
def add_pushbullet_complete(request):
    """Handle the redirect back from Pushbullet and create the channel.

    Responds with 403 when the session has no "add_pushbullet" entry or the
    "state" parameter does not match. An "access_denied" error (checked
    before the state comparison) yields a warning. Otherwise the "code"
    parameter is exchanged at Pushbullet's token endpoint and the returned
    "access_token" becomes the value of a new "pushbullet" channel.
    """
    if "add_pushbullet" not in request.session:
        return HttpResponseForbidden()

    expected_state, project_code = request.session.pop("add_pushbullet")
    project = _get_rw_project_for_user(request, project_code)

    params = request.GET
    if params.get("error") == "access_denied":
        messages.warning(request, "Pushbullet setup was cancelled.")
        return redirect("hc-channels", project.code)

    if params.get("state") != expected_state:
        return HttpResponseForbidden()

    payload = {
        "client_id": settings.PUSHBULLET_CLIENT_ID,
        "client_secret": settings.PUSHBULLET_CLIENT_SECRET,
        "code": params.get("code"),
        "grant_type": "authorization_code",
    }
    doc = requests.post("https://api.pushbullet.com/oauth2/token", payload).json()

    if "access_token" not in doc:
        messages.warning(request, "Something went wrong")
        return redirect("hc-channels", project.code)

    channel = Channel(kind="pushbullet", project=project)
    channel.value = doc["access_token"]
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Pushbullet integration has been added!")
    return redirect("hc-channels", project.code)
@require_setting("DISCORD_CLIENT_ID")
@login_required
def add_discord(request, code):
    """Render the Discord page with a freshly generated authorize URL.

    The random `state` token goes into the authorize URL and, paired with
    the project code, into the session under "add_discord" for
    `add_discord_complete` to check.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    query = urlencode(
        {
            "client_id": settings.DISCORD_CLIENT_ID,
            "scope": "webhook.incoming",
            "redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
            "response_type": "code",
            "state": state,
        }
    )
    authorize_url = "https://discordapp.com/api/oauth2/authorize?" + query

    request.session["add_discord"] = (state, str(project.code))
    return render(
        request,
        "integrations/add_discord.html",
        {"page": "channels", "project": project, "authorize_url": authorize_url},
    )
@require_setting("DISCORD_CLIENT_ID")
@login_required
def add_discord_complete(request):
    """Handle the redirect back from Discord and create the channel.

    Responds with 403 when the session has no "add_discord" entry or the
    "state" parameter does not match. An "access_denied" error (checked
    before the state comparison) yields a warning. Otherwise the "code"
    parameter is exchanged at Discord's token endpoint; if the reply
    contains "access_token", the raw response text is stored as the value
    of a new "discord" channel.
    """
    if "add_discord" not in request.session:
        return HttpResponseForbidden()

    expected_state, project_code = request.session.pop("add_discord")
    project = _get_rw_project_for_user(request, project_code)

    params = request.GET
    if params.get("error") == "access_denied":
        messages.warning(request, "Discord setup was cancelled.")
        return redirect("hc-channels", project.code)

    if params.get("state") != expected_state:
        return HttpResponseForbidden()

    payload = {
        "client_id": settings.DISCORD_CLIENT_ID,
        "client_secret": settings.DISCORD_CLIENT_SECRET,
        "code": params.get("code"),
        "grant_type": "authorization_code",
        "redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
    }
    response = requests.post("https://discordapp.com/api/oauth2/token", payload)

    if "access_token" not in response.json():
        messages.warning(request, "Something went wrong.")
        return redirect("hc-channels", project.code)

    channel = Channel(kind="discord", project=project)
    # The entire JSON reply is stored, not just the access token.
    channel.value = response.text
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Discord integration has been added!")
    return redirect("hc-channels", project.code)
@require_setting("PUSHOVER_API_TOKEN")
def pushover_help(request):
    """Render "integrations/add_pushover_help.html" outside of any project.

    Carries no `login_required` decorator; the context only contains the
    "page" marker.
    """
    return render(
        request, "integrations/add_pushover_help.html", {"page": "channels"}
    )
@require_setting("PUSHOVER_API_TOKEN")
@login_required
def add_pushover(request, code):
    """Drive the three stages of the Pushover subscription flow.

    * POST: generate a `state` token, remember it in the session under
      "pushover" and redirect to `settings.PUSHOVER_SUBSCRIPTION_URL` with
      success/failure return URLs.
    * GET carrying "pushover_user_key": verify `state` against the session
      (403 on mismatch or missing entry), then either delete the project's
      "po" channels (when "pushover_unsubscribed" is "1") or create a new
      one from `forms.AddPushoverForm` (400 if the form is invalid).
    * Any other GET: render the integration settings form.
    """
    project = _get_rw_project_for_user(request, code)

    if request.method == "POST":
        state = token_urlsafe()

        callback_query = urlencode(
            {
                "state": state,
                "prio": request.POST.get("po_priority", "0"),
                "prio_up": request.POST.get("po_priority_up", "0"),
            }
        )
        failure_url = settings.SITE_ROOT + reverse("hc-channels", args=[project.code])
        success_path = reverse("hc-add-pushover", args=[project.code])
        success_url = settings.SITE_ROOT + success_path + "?" + callback_query

        return_urls = urlencode({"success": success_url, "failure": failure_url})
        subscription_url = settings.PUSHOVER_SUBSCRIPTION_URL + "?" + return_urls

        request.session["pushover"] = state
        return redirect(subscription_url)

    if "pushover_user_key" not in request.GET:
        # Plain GET: show the integration settings form.
        return render(
            request,
            "integrations/add_pushover.html",
            {
                "page": "channels",
                "project": project,
                "po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY),
                "po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION),
            },
        )

    # From here on we are handling the return trip from Pushover.
    if "pushover" not in request.session:
        return HttpResponseForbidden()

    expected_state = request.session.pop("pushover")
    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    if request.GET.get("pushover_unsubscribed") == "1":
        # Unsubscription: remove every Pushover channel of this project.
        Channel.objects.filter(project=project, kind="po").delete()
        return redirect("hc-channels", project.code)

    form = forms.AddPushoverForm(request.GET)
    if not form.is_valid():
        return HttpResponseBadRequest()

    channel = Channel(project=project, kind="po")
    channel.value = form.get_value()
    channel.save()
    channel.assign_all_checks()

    messages.success(request, "The Pushover integration has been added!")
    return redirect("hc-channels", project.code)
@require_setting("OPSGENIE_ENABLED")
@login_required
def add_opsgenie(request, code):
    """Show the Opsgenie form and, on a valid POST, save a channel.

    The channel value is a JSON object with the form's "region" and "key".
    After saving, `assign_all_checks()` is called and the user is
    redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddOpsgenieForm()
    else:
        form = forms.AddOpsgenieForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            channel = Channel(project=project, kind="opsgenie")
            channel.value = json.dumps(
                {"region": cleaned["region"], "key": cleaned["key"]}
            )
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_opsgenie.html",
        {"page": "channels", "project": project, "form": form},
    )
@require_setting("VICTOROPS_ENABLED")
@login_required
def add_victorops(request, code):
    """Show the VictorOps URL form and, on a valid POST, save a channel.

    A valid submission stores the form's "value" in a Channel of kind
    "victorops", calls `assign_all_checks()` and redirects to
    "hc-channels"; anything else renders the form.
    """
    project = _get_rw_project_for_user(request, code)

    submitted = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if submitted else forms.AddUrlForm()

    if submitted and form.is_valid():
        channel = Channel(project=project, kind="victorops")
        channel.value = form.cleaned_data["value"]
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    template_ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_victorops.html", template_ctx)
@require_setting("ZULIP_ENABLED")
@login_required
def add_zulip(request, code):
    """Show the Zulip form and, on a valid POST, save a channel.

    The channel value is whatever `form.get_value()` returns. After saving,
    `assign_all_checks()` is called and the user is redirected to
    "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddZulipForm()
    else:
        form = forms.AddZulipForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="zulip")
            channel.value = form.get_value()
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_zulip.html",
        {"page": "channels", "project": project, "form": form},
    )
@csrf_exempt
@require_POST
def telegram_bot(request):
    """Receive a Telegram callback and answer "/start" with an invite link.

    Returns 400 when the body cannot be decoded or parsed as JSON. Returns
    an empty 200 when the document fails schema validation or the message
    text does not contain "/start". Otherwise a signed
    (chat id, chat type, name) tuple is rendered into the invite template
    and sent back to the chat via `Telegram.send`.
    """
    try:
        update = json.loads(request.body.decode())
        jsonschema.validate(update, telegram_callback)
    except ValueError:
        return HttpResponseBadRequest()
    except jsonschema.ValidationError:
        # Unrecognized message format. Reply 200 OK anyway so Telegram
        # does not keep retrying the same delivery.
        return HttpResponse()

    message = update["message"]
    if "/start" not in message["text"]:
        return HttpResponse()

    chat = message["chat"]
    title = chat.get("title", "")
    username = chat.get("username", "")
    # String comparison: an absent field defaults to "" and so loses to
    # any non-empty counterpart.
    name = max(title, username)

    signed = signing.dumps((chat["id"], chat["type"], name))
    invite = render_to_string("integrations/telegram_invite.html", {"qs": signed})

    Telegram.send(chat["id"], invite)
    return HttpResponse()
@require_setting("TELEGRAM_TOKEN")
def telegram_help(request):
    """Render "integrations/add_telegram.html" without any chat details.

    Carries no `login_required` decorator; the context holds the "page"
    marker and the configured bot name.
    """
    template_ctx = {"page": "channels", "bot_name": settings.TELEGRAM_BOT_NAME}
    return render(request, "integrations/add_telegram.html", template_ctx)
@require_setting("TELEGRAM_TOKEN")
@login_required
def add_telegram(request):
    """Confirm a Telegram chat and create the matching channel.

    The raw query string is expected to be the signed
    (chat id, chat type, chat name) tuple produced by `telegram_bot`; it is
    verified with a 600-second maximum age.

    * Bad or expired signature: renders "bad_link.html".
    * GET: renders the confirmation page (chat fields are None when no
      query string was supplied).
    * POST: creates a "telegram" channel in the project named by the
      "project" POST field. A POST without signed chat details is rejected
      with 400, since it would otherwise store a channel with a null chat
      id.
    """
    chat_id, chat_type, chat_name = None, None, None
    qs = request.META["QUERY_STRING"]
    if qs:
        try:
            chat_id, chat_type, chat_name = signing.loads(qs, max_age=600)
        except signing.BadSignature:
            return render(request, "bad_link.html")

    if request.method == "POST":
        if chat_id is None:
            # No signed chat details accompanied the submission, so there
            # is nothing meaningful to save.
            return HttpResponseBadRequest()

        project = _get_rw_project_for_user(request, request.POST.get("project"))
        channel = Channel(project=project, kind="telegram")
        channel.value = json.dumps(
            {"id": chat_id, "type": chat_type, "name": chat_name}
        )
        channel.save()

        channel.assign_all_checks()
        messages.success(request, "The Telegram integration has been added!")
        return redirect("hc-channels", project.code)

    ctx = {
        "page": "channels",
        "projects": request.profile.projects(),
        "chat_id": chat_id,
        "chat_type": chat_type,
        "chat_name": chat_name,
        "bot_name": settings.TELEGRAM_BOT_NAME,
    }

    return render(request, "integrations/add_telegram.html", ctx)
@require_setting("TWILIO_AUTH")
@login_required
def sms_form(request, channel=None, code=None):
    """Create a new SMS channel or edit an existing one.

    When `channel` is None a fresh "sms" Channel is built for the project
    identified by `code`; otherwise the given channel is edited. A valid
    POST stores the form's label and JSON value, calls
    `assign_all_checks()` for new channels only, and redirects to
    "hc-channels". On other requests the form is rendered: with "up"
    initially False for new channels, or filled from the channel's current
    settings when editing.
    """
    if channel is None:
        is_new = True
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="sms")
    else:
        is_new = False

    if request.method != "POST":
        if is_new:
            form = forms.PhoneUpDownForm(initial={"up": False})
        else:
            current = {
                "label": channel.name,
                "phone": channel.phone_number,
                "up": channel.sms_notify_up,
                "down": channel.sms_notify_down,
            }
            # Handed over as the first positional argument, not `initial=`.
            form = forms.PhoneUpDownForm(current)
    else:
        form = forms.PhoneUpDownForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()

            if is_new:
                channel.assign_all_checks()
            return redirect("hc-channels", channel.project.code)

    return render(
        request,
        "integrations/sms_form.html",
        {
            "page": "channels",
            "project": channel.project,
            "form": form,
            "profile": channel.project.owner_profile,
            "is_new": is_new,
        },
    )
@require_setting("TWILIO_AUTH")
@login_required
def add_call(request, code):
    """Show the phone-call form and, on a valid POST, save a channel.

    A valid submission stores the form's label as the channel name and
    `form.get_json()` as its value in a Channel of kind "call", calls
    `assign_all_checks()` and redirects to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.PhoneNumberForm()
    else:
        form = forms.PhoneNumberForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="call")
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_call.html",
        {
            "page": "channels",
            "project": project,
            "form": form,
            "profile": project.owner_profile,
        },
    )
@require_setting("TWILIO_USE_WHATSAPP")
@login_required
def whatsapp_form(request, channel=None, code=None):
    """Create a new WhatsApp channel or edit an existing one.

    When `channel` is None a fresh "whatsapp" Channel is built for the
    project identified by `code`; otherwise the given channel is edited. A
    valid POST stores the form's label and JSON value, calls
    `assign_all_checks()` for new channels only, and redirects to
    "hc-channels". On other requests the form is rendered, filled from the
    channel's current settings when editing.
    """
    if channel is None:
        is_new = True
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="whatsapp")
    else:
        is_new = False

    if request.method != "POST":
        if is_new:
            form = forms.PhoneUpDownForm()
        else:
            current = {
                "label": channel.name,
                "phone": channel.phone_number,
                "up": channel.whatsapp_notify_up,
                "down": channel.whatsapp_notify_down,
            }
            # Handed over as the first positional argument, not `initial=`.
            form = forms.PhoneUpDownForm(current)
    else:
        form = forms.PhoneUpDownForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()

            if is_new:
                channel.assign_all_checks()
            return redirect("hc-channels", channel.project.code)

    return render(
        request,
        "integrations/whatsapp_form.html",
        {
            "page": "channels",
            "project": channel.project,
            "form": form,
            "profile": channel.project.owner_profile,
            "is_new": is_new,
        },
    )
@require_setting("SIGNAL_CLI_ENABLED")
@login_required
def signal_form(request, channel=None, code=None):
    """Create a new Signal channel or edit an existing one.

    When `channel` is None a fresh "signal" Channel is built for the
    project identified by `code`; otherwise the given channel is edited. A
    valid POST stores the form's label and JSON value, calls
    `assign_all_checks()` for new channels only, and redirects to
    "hc-channels". On other requests the form is rendered, filled from the
    channel's current settings when editing. Unlike `sms_form` and
    `whatsapp_form`, the context carries no "profile" entry.
    """
    if channel is None:
        is_new = True
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="signal")
    else:
        is_new = False

    if request.method != "POST":
        if is_new:
            form = forms.PhoneUpDownForm()
        else:
            current = {
                "label": channel.name,
                "phone": channel.phone_number,
                "up": channel.signal_notify_up,
                "down": channel.signal_notify_down,
            }
            # Handed over as the first positional argument, not `initial=`.
            form = forms.PhoneUpDownForm(current)
    else:
        form = forms.PhoneUpDownForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()

            if is_new:
                channel.assign_all_checks()
            return redirect("hc-channels", channel.project.code)

    return render(
        request,
        "integrations/signal_form.html",
        {
            "page": "channels",
            "project": channel.project,
            "form": form,
            "is_new": is_new,
        },
    )
@require_setting("TRELLO_APP_KEY")
@login_required
def add_trello(request, code):
    """Render the Trello authorization page, or save a channel on POST.

    POST: validates `forms.AddTrelloForm` (400 when invalid), stores
    `form.get_value()` in a new "trello" channel, calls
    `assign_all_checks()` and redirects to "hc-channels".
    GET: renders the page with a Trello authorize URL whose return URL
    points back at this view.
    """
    project = _get_rw_project_for_user(request, code)

    if request.method == "POST":
        form = forms.AddTrelloForm(request.POST)
        if not form.is_valid():
            return HttpResponseBadRequest()

        channel = Channel(project=project, kind="trello")
        channel.value = form.get_value()
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    query = urlencode(
        {
            "expiration": "never",
            "name": settings.SITE_NAME,
            "scope": "read,write",
            "response_type": "token",
            "key": settings.TRELLO_APP_KEY,
            "return_url": settings.SITE_ROOT
            + reverse("hc-add-trello", args=[project.code]),
        }
    )

    return render(
        request,
        "integrations/add_trello.html",
        {
            "page": "channels",
            "project": project,
            "authorize_url": "https://trello.com/1/authorize?" + query,
        },
    )
@require_setting("MATRIX_ACCESS_TOKEN")
@login_required
def add_matrix(request, code):
    """Show the Matrix form and, on a valid POST, save a channel.

    The channel value is the form's "room_id". When the submitted "alias"
    does not begin with "!", it is also used as the channel name. After
    saving, `assign_all_checks()` is called, a success message is queued
    and the user is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddMatrixForm()
    else:
        form = forms.AddMatrixForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            channel = Channel(project=project, kind="matrix")
            channel.value = cleaned["room_id"]

            # A room alias (as opposed to an ID) doubles as the channel name.
            alias = cleaned["alias"]
            if not alias.startswith("!"):
                channel.name = alias

            channel.save()
            channel.assign_all_checks()
            messages.success(request, "The Matrix integration has been added!")
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_matrix.html",
        {
            "page": "channels",
            "project": project,
            "form": form,
            "matrix_user_id": settings.MATRIX_USER_ID,
        },
    )
@require_setting("APPRISE_ENABLED")
@login_required
def add_apprise(request, code):
    """Show the Apprise form and, on a valid POST, save a channel.

    A valid submission stores the form's "url" in a Channel of kind
    "apprise", calls `assign_all_checks()`, queues a success message and
    redirects to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    submitted = request.method == "POST"
    form = forms.AddAppriseForm(request.POST) if submitted else forms.AddAppriseForm()

    if submitted and form.is_valid():
        channel = Channel(project=project, kind="apprise")
        channel.value = form.cleaned_data["url"]
        channel.save()
        channel.assign_all_checks()
        messages.success(request, "The Apprise integration has been added!")
        return redirect("hc-channels", project.code)

    template_ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_apprise.html", template_ctx)
@require_setting("TRELLO_APP_KEY")
@login_required
@require_POST
def trello_settings(request):
    """Fetch the user's open Trello boards and render the settings fragment.

    Uses the "token" POST field to query Trello's `members/me/boards`
    endpoint, counts the lists across all returned boards and renders
    "integrations/trello_settings.html" with the token, the boards and the
    list count.
    """
    token = request.POST.get("token")

    query = urlencode(
        {
            "key": settings.TRELLO_APP_KEY,
            "token": token,
            "filter": "open",
            "fields": "id,name",
            "lists": "open",
            "list_fields": "id,name",
        }
    )
    response = requests.get("https://api.trello.com/1/members/me/boards?" + query)
    boards = response.json()

    num_lists = 0
    for board in boards:
        num_lists += len(board["lists"])

    return render(
        request,
        "integrations/trello_settings.html",
        {"token": token, "boards": boards, "num_lists": num_lists},
    )
@require_setting("MSTEAMS_ENABLED")
@login_required
def add_msteams(request, code):
    """Show the MS Teams URL form and, on a valid POST, save a channel.

    A valid submission stores the form's "value" in a Channel of kind
    "msteams", calls `assign_all_checks()` and redirects to "hc-channels";
    anything else renders the form.
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddUrlForm()
    else:
        form = forms.AddUrlForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="msteams")
            channel.value = form.cleaned_data["value"]
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_msteams.html",
        {"page": "channels", "project": project, "form": form},
    )
@require_setting("PROMETHEUS_ENABLED")
@login_required
def add_prometheus(request, code):
    """Render the Prometheus integration page for a project.

    Uses `_get_project_for_user` (not the `_rw_` variant used by the other
    integration views); the second element of its result is ignored.
    """
    project, _ = _get_project_for_user(request, code)
    return render(
        request,
        "integrations/add_prometheus.html",
        {"page": "channels", "project": project},
    )
@require_setting("PROMETHEUS_ENABLED")
def metrics(request, code, key):
    """Serve a project's check statuses in Prometheus text format.

    Returns 400 when `key` is not exactly 32 characters long and 403 when
    no project matches both `code` and the read-only API key. Otherwise
    the plain-text response lists one `hc_check_up` sample per check, one
    `hc_tag_up` sample per tag, and the `hc_checks_total` /
    `hc_checks_down_total` gauges.
    """
    if len(key) != 32:
        return HttpResponseBadRequest()

    matching = Project.objects.filter(code=code, api_key_readonly=key)
    try:
        project = matching.get()
    except Project.DoesNotExist:
        return HttpResponseForbidden()

    checks = Check.objects.filter(project_id=project.id).order_by("id")

    def esc(s):
        # Label value escaping: backslash first, so the backslashes added
        # for quotes and newlines are not escaped a second time.
        for raw, escaped in (("\\", "\\\\"), ('"', '\\"'), ("\n", "\\n")):
            s = s.replace(raw, escaped)
        return s

    def output(checks):
        yield "# HELP hc_check_up Whether the check is currently up (1 for yes, 0 for no).\n"
        yield "# TYPE hc_check_up gauge\n"

        check_line = """hc_check_up{name="%s", tags="%s", unique_key="%s"} %d\n"""
        for check in checks:
            is_up = int(check.get_status() != "down")
            yield check_line % (
                esc(check.name),
                esc(check.tags),
                check.unique_key,
                is_up,
            )

        tags_statuses, num_down = _tags_statuses(checks)

        yield "\n"
        yield "# HELP hc_tag_up Whether all checks with this tag are up (1 for yes, 0 for no).\n"
        yield "# TYPE hc_tag_up gauge\n"

        tag_line = """hc_tag_up{tag="%s"} %d\n"""
        for tag in sorted(tags_statuses):
            is_up = int(tags_statuses[tag] != "down")
            yield tag_line % (esc(tag), is_up)

        yield "\n"
        yield "# HELP hc_checks_total The total number of checks.\n"
        yield "# TYPE hc_checks_total gauge\n"
        yield "hc_checks_total %d\n" % len(checks)
        yield "\n"

        yield "# HELP hc_checks_down_total The number of checks currently down.\n"
        yield "# TYPE hc_checks_down_total gauge\n"
        yield "hc_checks_down_total %d\n" % num_down

    return HttpResponse(output(checks), content_type="text/plain")
@require_setting("SPIKE_ENABLED")
@login_required
def add_spike(request, code):
    """Show the Spike URL form and, on a valid POST, save a channel.

    A valid submission stores the form's "value" in a Channel of kind
    "spike", calls `assign_all_checks()` and redirects to "hc-channels";
    anything else renders the form.
    """
    project = _get_rw_project_for_user(request, code)

    submitted = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if submitted else forms.AddUrlForm()

    if submitted and form.is_valid():
        channel = Channel(project=project, kind="spike")
        channel.value = form.cleaned_data["value"]
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    template_ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_spike.html", template_ctx)
@require_setting("LINENOTIFY_CLIENT_ID")
@login_required
def add_linenotify(request, code):
    """Render the LINE Notify page with a freshly generated authorize URL.

    The random `state` token goes into the authorize URL and, paired with
    the project code, into the session under "add_linenotify" for
    `add_linenotify_complete` to check.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    # The URL literal previously began with a stray space, which made the
    # generated link an invalid absolute URL.
    authorize_url = "https://notify-bot.line.me/oauth/authorize?" + urlencode(
        {
            "client_id": settings.LINENOTIFY_CLIENT_ID,
            "redirect_uri": settings.SITE_ROOT + reverse(add_linenotify_complete),
            "response_type": "code",
            "state": state,
            "scope": "notify",
        }
    )

    ctx = {
        "page": "channels",
        "project": project,
        "authorize_url": authorize_url,
    }

    request.session["add_linenotify"] = (state, str(project.code))
    return render(request, "integrations/add_linenotify.html", ctx)
@require_setting("LINENOTIFY_CLIENT_ID")
@login_required
def add_linenotify_complete(request):
    """Handle the redirect back from LINE Notify and create the channel.

    Responds with 403 when the session has no "add_linenotify" entry or the
    "state" parameter does not match (here the state is compared before
    the project lookup and the "access_denied" check). The "code"
    parameter is exchanged for an access token; a reply whose "status" is
    not 200 yields a warning. On success the token is stored as the value
    of a new "linenotify" channel, named after the "target" reported by
    the LINE Notify status endpoint.
    """
    if "add_linenotify" not in request.session:
        return HttpResponseForbidden()

    expected_state, project_code = request.session.pop("add_linenotify")
    params = request.GET
    if params.get("state") != expected_state:
        return HttpResponseForbidden()

    project = _get_rw_project_for_user(request, project_code)
    if params.get("error") == "access_denied":
        messages.warning(request, "LINE Notify setup was cancelled.")
        return redirect("hc-channels", project.code)

    # Trade the authorization code for an access token.
    payload = {
        "grant_type": "authorization_code",
        "code": params.get("code"),
        "redirect_uri": settings.SITE_ROOT + reverse(add_linenotify_complete),
        "client_id": settings.LINENOTIFY_CLIENT_ID,
        "client_secret": settings.LINENOTIFY_CLIENT_SECRET,
    }
    token_doc = requests.post("https://notify-bot.line.me/oauth/token", payload).json()

    if token_doc.get("status") != 200:
        messages.warning(request, "Something went wrong.")
        return redirect("hc-channels", project.code)

    # Ask for the notification target's name; it becomes the channel name.
    token = token_doc["access_token"]
    status_doc = requests.get(
        "https://notify-api.line.me/api/status",
        headers={"Authorization": "Bearer %s" % token},
    ).json()

    channel = Channel(kind="linenotify", project=project)
    channel.name = status_doc.get("target")
    channel.value = token
    channel.save()
    channel.assign_all_checks()

    messages.success(request, "The LINE Notify integration has been added!")
    return redirect("hc-channels", project.code)
# Forks: add custom views after this line