- from datetime import timedelta as td
- import email
- import json
- import os
- import re
- from secrets import token_urlsafe
- from urllib.parse import urlencode
-
- from cron_descriptor import ExpressionDescriptor
- from cronsim.cronsim import CronSim, CronSimError
- from django.conf import settings
- from django.contrib import messages
- from django.contrib.auth.decorators import login_required
- from django.core import signing
- from django.core.exceptions import PermissionDenied
- from django.db.models import Count, F
- from django.http import (
- Http404,
- HttpResponse,
- HttpResponseBadRequest,
- HttpResponseForbidden,
- JsonResponse,
- )
- from django.shortcuts import get_object_or_404, redirect, render
- from django.template.loader import get_template, render_to_string
- from django.urls import reverse
- from django.utils import timezone
- from django.views.decorators.csrf import csrf_exempt
- from django.views.decorators.http import require_POST
- from hc.accounts.models import Project, Member
- from hc.api.models import (
- DEFAULT_GRACE,
- DEFAULT_TIMEOUT,
- MAX_DELTA,
- Channel,
- Check,
- Ping,
- Notification,
- )
- from hc.api.transports import Telegram
- from hc.front.decorators import require_setting
- from hc.front import forms
- from hc.front.schemas import telegram_callback
- from hc.front.templatetags.hc_extras import (
- num_down_title,
- down_title,
- sortchecks,
- site_hostname,
- site_scheme,
- )
- from hc.lib import jsonschema
- from hc.lib.badges import get_badge_url
- import pytz
- from pytz.exceptions import UnknownTimeZoneError
- import requests
-
-
# Values accepted in the "sort" query parameter of the checks list view.
VALID_SORT_VALUES = (
    "name",
    "-name",
    "last_ping",
    "-last_ping",
    "created",
)

# Template objects loaded once at import time and reused by the views below.
STATUS_TEXT_TMPL = get_template("front/log_status_text.html")
LAST_PING_TMPL = get_template("front/last_ping_cell.html")
EVENTS_TMPL = get_template("front/details_events.html")
DOWNTIMES_TMPL = get_template("front/details_downtimes.html")
-
-
def _tags_statuses(checks):
    """Summarize the status of every tag used by the given checks.

    Returns a ``(tags, num_down)`` tuple. ``tags`` maps each tag to "up",
    "grace" or "down"; when checks sharing a tag disagree, "down" wins over
    "grace" and "grace" wins over "up". ``num_down`` is the number of checks
    whose status is "down".
    """

    by_status = {"up": {}, "grace": {}, "down": {}}
    num_down = 0

    for check in checks:
        status = check.get_status()
        if status == "down":
            num_down += 1

        # Any status other than "down" and "grace" is reported as "up".
        label = status if status in ("down", "grace") else "up"
        bucket = by_status[label]
        for tag in check.tags_list():
            bucket[tag] = label

    # Merge in order of increasing severity so the worst status wins.
    tags = by_status["up"]
    tags.update(by_status["grace"])
    tags.update(by_status["down"])
    return tags, num_down
-
-
def _get_check_for_user(request, code):
    """Look up a check by code and verify the current user can access it.

    Returns a ``(check, rw)`` tuple. ``rw`` is True for superusers and for
    the owner of the check's project; for other users it is the ``is_rw``
    flag of their membership in the project.

    The lookups go through ``get_object_or_404``, so a missing check or a
    missing membership results in a 404.
    """

    user = request.user
    assert user.is_authenticated

    checks = Check.objects.select_related("project")
    check = get_object_or_404(checks, code=code)

    if user.is_superuser or user.id == check.project.owner_id:
        return check, True

    member = get_object_or_404(Member, project=check.project, user=user)
    return check, member.is_rw
-
-
def _get_rw_check_for_user(request, code):
    """Return the check with the given code, requiring read-write access.

    Raises PermissionDenied if the current user only has read-only access.
    """

    check, can_write = _get_check_for_user(request, code)
    if can_write:
        return check

    raise PermissionDenied
-
-
def _get_channel_for_user(request, code):
    """Look up a channel by code and verify the current user can access it.

    Returns a ``(channel, rw)`` tuple. ``rw`` is True for superusers and for
    the owner of the channel's project; for other users it is the ``is_rw``
    flag of their membership in the project.

    The lookups go through ``get_object_or_404``, so a missing channel or a
    missing membership results in a 404.
    """

    user = request.user
    assert user.is_authenticated

    channels = Channel.objects.select_related("project")
    channel = get_object_or_404(channels, code=code)

    if user.is_superuser or user.id == channel.project.owner_id:
        return channel, True

    member = get_object_or_404(Member, project=channel.project, user=user)
    return channel, member.is_rw
-
-
def _get_rw_channel_for_user(request, code):
    """Return the channel with the given code, requiring read-write access.

    Raises PermissionDenied if the current user only has read-only access.
    """

    channel, can_write = _get_channel_for_user(request, code)
    if can_write:
        return channel

    raise PermissionDenied
-
-
def _get_project_for_user(request, project_code):
    """Look up a project by code and verify the current user can access it.

    Returns a ``(project, rw)`` tuple. ``rw`` is True for superusers and for
    the project's owner; for other users it is the ``is_rw`` flag of their
    membership in the project.

    The lookups go through ``get_object_or_404``, so a missing project or a
    missing membership results in a 404.
    """

    user = request.user
    project = get_object_or_404(Project, code=project_code)

    if user.is_superuser or user.id == project.owner_id:
        return project, True

    member = get_object_or_404(Member, project=project, user=user)
    return project, member.is_rw
-
-
def _get_rw_project_for_user(request, project_code):
    """Return the project with the given code, requiring read-write access.

    Unlike ``_get_project_for_user``, this returns the project alone rather
    than a ``(project, rw)`` tuple. Raises PermissionDenied if the current
    user only has read-only access.
    """

    project, can_write = _get_project_for_user(request, project_code)
    if can_write:
        return project

    raise PermissionDenied
-
-
def _refresh_last_active_date(profile):
    """Set profile.last_active_date to now if it is unset or at least a day old."""

    now = timezone.now()
    last_active = profile.last_active_date

    # Skip the database write when the stored value is less than a day old.
    if last_active is not None and (now - last_active).days <= 0:
        return

    profile.last_active_date = now
    profile.save()
-
-
@login_required
def my_checks(request, code):
    """Render the list of checks of the project identified by ``code``.

    Query parameters handled here:

    * ``sort``: if it is one of VALID_SORT_VALUES, it is saved in the
      user's profile and used for ordering the checks.
    * ``urls``: "uuid" or "slug"; users with read-write access can use it
      to switch the project's ``show_slugs`` flag.
    * ``tag`` (repeatable): checks that do not carry all selected tags are
      put in ``hidden_checks``.
    * ``search``: checks whose name and code do not contain the search
      string (compared case-insensitively) are put in ``hidden_checks``.
    """

    _refresh_last_active_date(request.profile)
    project, rw = _get_project_for_user(request, code)

    if request.GET.get("sort") in VALID_SORT_VALUES:
        request.profile.sort = request.GET["sort"]
        request.profile.save()

    if request.GET.get("urls") in ("uuid", "slug") and rw:
        project.show_slugs = request.GET["urls"] == "slug"
        project.save()

    # Remember the project in the session (it is read back in `index`).
    if request.session.get("last_project_id") != project.id:
        request.session["last_project_id"] = project.id

    q = Check.objects.filter(project=project)
    q = q.select_related("project")
    checks = list(q.prefetch_related("channel_set"))
    sortchecks(checks, request.profile.sort)

    tags_statuses, num_down = _tags_statuses(checks)
    pairs = sorted(tags_statuses.items(), key=lambda pair: pair[0].lower())

    channels = Channel.objects.filter(project=project)
    channels = list(channels.order_by("created"))

    hidden_checks = set()
    # Hide checks that don't match selected tags:
    selected_tags = set(request.GET.getlist("tag", []))
    if selected_tags:
        for check in checks:
            if not selected_tags.issubset(check.tags_list()):
                hidden_checks.add(check)

    # Hide checks that don't match the search string:
    search = request.GET.get("search", "")
    if search:
        # The search key is built from the lowercased name, so the needle
        # must be lowercased as well. Otherwise a query containing an
        # uppercase letter could never match and would hide every check.
        needle = search.lower()
        for check in checks:
            search_key = "%s\n%s" % (check.name.lower(), check.code)
            if needle not in search_key:
                hidden_checks.add(check)

    # Figure out which checks have ambiguous ping URLs
    # (non-empty slugs used by more than one check)
    seen, ambiguous = set(), set()
    if project.show_slugs:
        for check in checks:
            if check.slug and check.slug in seen:
                ambiguous.add(check.slug)
            else:
                seen.add(check.slug)

    # Do we need to show the "Last Duration" header?
    show_last_duration = any(check.clamped_last_duration() for check in checks)

    ctx = {
        "page": "checks",
        "rw": rw,
        "checks": checks,
        "channels": channels,
        "num_down": num_down,
        "tags": pairs,
        "ping_endpoint": settings.PING_ENDPOINT,
        "timezones": pytz.all_timezones,
        "project": project,
        "num_available": project.num_checks_available(),
        "sort": request.profile.sort,
        "selected_tags": selected_tags,
        # The template receives the search string exactly as the user typed it.
        "search": search,
        "hidden_checks": hidden_checks,
        "ambiguous": ambiguous,
        "show_last_duration": show_last_duration,
    }

    return render(request, "front/my_checks.html", ctx)
-
-
@login_required
def status(request, code):
    """Return the status of every check in a project as JSON.

    The response has three keys: "details" (one entry per check), "tags"
    (tag-to-status mapping) and "title" (built from the number of checks
    that are down).
    """

    # Called for its access check only; the return value is not needed.
    _get_project_for_user(request, code)

    checks = list(Check.objects.filter(project__code=code))

    details = [
        {
            "code": str(check.code),
            "status": check.get_status(),
            "last_ping": LAST_PING_TMPL.render({"check": check}).strip(),
            "started": check.last_start is not None,
        }
        for check in checks
    ]

    tags_statuses, num_down = _tags_statuses(checks)
    doc = {
        "details": details,
        "tags": tags_statuses,
        "title": num_down_title(num_down),
    }
    return JsonResponse(doc)
-
-
@login_required
@require_POST
def switch_channel(request, code, channel_code):
    """Assign a channel to a check, or unassign it.

    The channel is assigned when the POST field "state" is "on" and
    unassigned otherwise. Responds with 400 if the channel and the check
    belong to different projects.
    """

    check = _get_rw_check_for_user(request, code)
    channel = get_object_or_404(Channel, code=channel_code)

    same_project = channel.project_id == check.project_id
    if not same_project:
        return HttpResponseBadRequest()

    enable = request.POST.get("state") == "on"
    apply_change = channel.checks.add if enable else channel.checks.remove
    apply_change(check)

    return HttpResponse()
-
-
def index(request):
    """Render the projects overview for signed-in users, or the welcome page.

    Signed-in users get the list of their projects, with projects whose
    overall status is "down" listed first. Anonymous visitors get the
    welcome page, with a flag per integration telling whether it is enabled
    in settings.
    """

    if request.user.is_authenticated:
        project_ids = request.profile.projects().values("id")

        projects = list(
            Project.objects.filter(id__in=project_ids)
            .annotate(n_checks=Count("check", distinct=True))
            .annotate(n_channels=Count("channel", distinct=True))
            .annotate(owner_email=F("owner__email"))
        )

        def sort_key(project):
            # False sorts before True, so "down" projects come first;
            # projects with the same flag are ordered by name.
            return project.overall_status() != "down", project.name

        projects.sort(key=sort_key)

        return render(
            request,
            "front/projects.html",
            {
                "page": "projects",
                "projects": projects,
                "last_project_id": request.session.get("last_project_id"),
            },
        )

    # An unsaved Check instance, used for the example ping URL.
    check = Check()

    ctx = {
        "page": "welcome",
        "check": check,
        "ping_url": check.url(),
        "enable_apprise": settings.APPRISE_ENABLED is True,
        "enable_call": settings.TWILIO_AUTH is not None,
        "enable_discord": settings.DISCORD_CLIENT_ID is not None,
        "enable_linenotify": settings.LINENOTIFY_CLIENT_ID is not None,
        "enable_matrix": settings.MATRIX_ACCESS_TOKEN is not None,
        "enable_mattermost": settings.MATTERMOST_ENABLED is True,
        "enable_msteams": settings.MSTEAMS_ENABLED is True,
        "enable_opsgenie": settings.OPSGENIE_ENABLED is True,
        "enable_pagertree": settings.PAGERTREE_ENABLED is True,
        "enable_pd": settings.PD_ENABLED is True,
        "enable_pd_simple": settings.PD_APP_ID is not None,
        "enable_prometheus": settings.PROMETHEUS_ENABLED is True,
        "enable_pushbullet": settings.PUSHBULLET_CLIENT_ID is not None,
        "enable_pushover": settings.PUSHOVER_API_TOKEN is not None,
        "enable_shell": settings.SHELL_ENABLED is True,
        "enable_signal": settings.SIGNAL_CLI_ENABLED is True,
        "enable_slack": settings.SLACK_ENABLED is True,
        "enable_slack_btn": settings.SLACK_CLIENT_ID is not None,
        "enable_sms": settings.TWILIO_AUTH is not None,
        "enable_spike": settings.SPIKE_ENABLED is True,
        "enable_telegram": settings.TELEGRAM_TOKEN is not None,
        "enable_trello": settings.TRELLO_APP_KEY is not None,
        "enable_victorops": settings.VICTOROPS_ENABLED is True,
        "enable_webhooks": settings.WEBHOOKS_ENABLED is True,
        "enable_whatsapp": settings.TWILIO_USE_WHATSAPP,
        "enable_zulip": settings.ZULIP_ENABLED is True,
        "registration_open": settings.REGISTRATION_OPEN,
    }

    return render(request, "front/welcome.html", ctx)
-
-
def dashboard(request):
    """Render the dashboard template with an empty context."""

    ctx = {}
    return render(request, "front/dashboard.html", ctx)
-
-
def serve_doc(request, doc="introduction"):
    """Render a documentation page from the templates/docs/ directory.

    ``doc`` is the page's file name without the ".html" extension. Raises
    Http404 if the name contains anything other than lowercase letters and
    underscores, or if the file does not exist.

    For pages whose name does not start with "self_hosted", placeholders
    in the file (SITE_NAME, PING_URL, ...) are replaced with values from
    settings.
    """

    # Filenames in /templates/docs/ consist of lowercase letters and underscores,
    # -- make sure we don't accept anything else.
    # fullmatch is used because "$" with re.match also matches just before
    # a trailing newline.
    if not re.fullmatch(r"[a-z_]+", doc):
        raise Http404("not found")

    path = os.path.join(settings.BASE_DIR, "templates/docs", doc + ".html")
    if not os.path.exists(path):
        raise Http404("not found")

    # Use a context manager so the file handle is closed deterministically.
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()

    if not doc.startswith("self_hosted"):
        replaces = {
            "{{ default_timeout }}": str(int(DEFAULT_TIMEOUT.total_seconds())),
            "{{ default_grace }}": str(int(DEFAULT_GRACE.total_seconds())),
            "SITE_NAME": settings.SITE_NAME,
            "SITE_ROOT": settings.SITE_ROOT,
            "SITE_HOSTNAME": site_hostname(),
            "SITE_SCHEME": site_scheme(),
            "PING_ENDPOINT": settings.PING_ENDPOINT,
            "PING_URL": settings.PING_ENDPOINT + "your-uuid-here",
            "IMG_URL": os.path.join(settings.STATIC_URL, "img/docs"),
        }

        for placeholder, value in replaces.items():
            content = content.replace(placeholder, value)

    ctx = {
        "page": "docs",
        "section": doc,
        "content": content,
        "first_line": content.split("\n")[0],
    }

    return render(request, "front/docs_single.html", ctx)
-
-
def docs_cron(request):
    """Render the cron documentation template with an empty context."""

    ctx = {}
    return render(request, "front/docs_cron.html", ctx)
-
-
@require_POST
@login_required
def add_check(request, code):
    """Create a new check in the project and redirect to its details page.

    Responds with 400 if the project has no checks available. The new
    check is assigned to all channels.
    """

    project = _get_rw_project_for_user(request, code)
    has_capacity = project.num_checks_available() > 0
    if not has_capacity:
        return HttpResponseBadRequest()

    check = Check(project=project)
    check.save()
    check.assign_all_channels()

    details_url = reverse("hc-details", args=[check.code])
    return redirect("%s?new" % details_url)
-
-
@require_POST
@login_required
def update_name(request, code):
    """Update a check's name, tags and description from the submitted form.

    Invalid form data is ignored silently. Redirects back to the details
    page if the request came from there, otherwise to the checks list.
    """

    check = _get_rw_check_for_user(request, code)

    form = forms.NameTagsForm(request.POST)
    if form.is_valid():
        data = form.cleaned_data
        check.set_name_slug(data["name"])
        check.tags = data["tags"]
        check.desc = data["desc"]
        check.save()

    referer = request.META.get("HTTP_REFERER", "")
    if "/details/" not in referer:
        return redirect("hc-checks", check.project.code)

    return redirect("hc-details", code)
-
-
@require_POST
@login_required
def filtering_rules(request, code):
    """Update a check's filtering rules and redirect to its details page.

    Invalid form data is ignored silently.
    """

    check = _get_rw_check_for_user(request, code)

    form = forms.FilteringRulesForm(request.POST)
    if form.is_valid():
        # Each form field maps to a check attribute of the same name.
        for field in ("subject", "subject_fail", "methods", "manual_resume"):
            setattr(check, field, form.cleaned_data[field])

        check.save()

    return redirect("hc-details", code)
-
-
@require_POST
@login_required
def update_timeout(request, code):
    """Update a check's schedule ("simple" timeout or "cron" expression).

    The POST field "kind" selects which form is validated; invalid form
    data results in a 400 response. After applying the new schedule,
    ``alert_after`` is recalculated. Redirects back to the details page if
    the request came from there, otherwise to the checks list.
    """

    check = _get_rw_check_for_user(request, code)

    kind = request.POST.get("kind")
    if kind in ("simple", "cron"):
        form_class = forms.TimeoutForm if kind == "simple" else forms.CronForm
        form = form_class(request.POST)
        if not form.is_valid():
            return HttpResponseBadRequest()

        data = form.cleaned_data
        check.kind = kind
        if kind == "simple":
            check.timeout = data["timeout"]
            check.grace = data["grace"]
        else:
            check.schedule = data["schedule"]
            check.tz = data["tz"]
            # The cron branch wraps the submitted grace value in a
            # timedelta of minutes.
            check.grace = td(minutes=data["grace"])

    check.alert_after = check.going_down_after()
    if check.status == "up" and check.alert_after < timezone.now():
        # A schedule change can flip a check from "up" to "down". Changing
        # the schedule interactively in the web UI should not trigger a
        # notification, so set `alert_after` and `status` here the same
        # way `sendalerts` would, but without sending an actual alert.
        check.alert_after = None
        check.status = "down"

    check.save()

    referer = request.META.get("HTTP_REFERER", "")
    if "/details/" not in referer:
        return redirect("hc-checks", check.project.code)

    return redirect("hc-details", code)
-
-
@require_POST
def cron_preview(request):
    """Render a preview of the next run times of a cron schedule.

    Reads "schedule" and "tz" from POST data. The template context
    contains:

    * "tz": the submitted timezone name
    * "dates": up to 6 upcoming run times produced by CronSim
    * "bad_tz": set to True if the timezone is not recognized
    * "bad_schedule": set to True if CronSim rejects the schedule
    * "desc": description of the schedule from cron-descriptor, if it
      could be generated
    """

    schedule = request.POST.get("schedule", "")
    tz = request.POST.get("tz")
    ctx = {"tz": tz, "dates": []}

    try:
        zone = pytz.timezone(tz)
        now_local = timezone.localtime(timezone.now(), zone)

        it = CronSim(schedule, now_local)
        for _ in range(6):
            ctx["dates"].append(next(it))

    except UnknownTimeZoneError:
        ctx["bad_tz"] = True
    except CronSimError:
        ctx["bad_schedule"] = True

    if ctx["dates"]:
        try:
            descriptor = ExpressionDescriptor(schedule, use_24hour_time_format=True)
            ctx["desc"] = descriptor.get_description()
        except Exception:
            # We assume the schedule is valid if cronsim accepts it.
            # If cron-descriptor throws an exception, don't show the description
            # to the user. Catch Exception rather than using a bare except,
            # so that SystemExit and KeyboardInterrupt still propagate.
            pass

    return render(request, "front/cron_preview.html", ctx)
-
-
@login_required
def ping_details(request, code, n=None):
    """Render the details of a single ping of a check.

    If ``n`` is given, only pings with that ``n`` value are considered.
    Among the candidates, the most recently created ping is shown. If
    there is none, the "not found" template is rendered instead.

    For pings received by email, the body is parsed as an email message,
    and its subject, plain text part and HTML part are added to the
    context.
    """

    # `import email` does not load the `email.policy` submodule, so
    # `email.policy.SMTP` only works if another module happened to import
    # it first. Import the policy explicitly to avoid relying on that.
    from email.policy import SMTP

    check, rw = _get_check_for_user(request, code)
    q = Ping.objects.filter(owner=check)
    if n:
        q = q.filter(n=n)

    try:
        ping = q.latest("created")
    except Ping.DoesNotExist:
        return render(request, "front/ping_details_not_found.html")

    ctx = {"check": check, "ping": ping, "plain": None, "html": None}

    if ping.scheme == "email":
        parsed = email.message_from_string(ping.body, policy=SMTP)
        ctx["subject"] = parsed.get("subject", "")

        plain_mime_part = parsed.get_body(("plain",))
        if plain_mime_part:
            ctx["plain"] = plain_mime_part.get_content()

        html_mime_part = parsed.get_body(("html",))
        if html_mime_part:
            ctx["html"] = html_mime_part.get_content()

    return render(request, "front/ping_details.html", ctx)
-
-
@require_POST
@login_required
def pause(request, code):
    """Pause a check.

    Responds with an empty 200 response to AJAX requests, and with a
    redirect to the check's details page otherwise.
    """

    check = _get_rw_check_for_user(request, code)

    check.status = "paused"
    check.last_start = check.alert_after = None
    check.save()

    # Pausing a check can leave all checks up, in which case
    # Profile.next_nag_date needs to be cleared out:
    check.project.update_next_nag_dates()

    # Don't redirect after an AJAX request:
    is_ajax = request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest"
    if is_ajax:
        return HttpResponse()

    return redirect("hc-details", code)
-
-
@require_POST
@login_required
def resume(request, code):
    """Reset a check to the "new" state and redirect to its details page."""

    check = _get_rw_check_for_user(request, code)

    check.status = "new"
    # Clear the timestamps along with the status.
    check.last_start = check.last_ping = check.alert_after = None
    check.save()

    return redirect("hc-details", code)
-
-
@require_POST
@login_required
def remove_check(request, code):
    """Delete a check and redirect to the checks list of its project."""

    check = _get_rw_check_for_user(request, code)

    # Read the project code before deleting the check.
    project_code = check.project.code
    check.delete()

    return redirect("hc-checks", project_code)
-
-
def _get_events(check, limit):
    """Return the check's recent pings and "down" notifications, newest first.

    At most ``limit`` pings (the ones with the highest ids) are loaded.
    Notifications are included only if they were created after the oldest
    loaded ping. A ping that directly follows a "start" ping, and is not a
    "start" ping itself, gets a ``delta`` attribute with the time elapsed
    between the two, provided it is below MAX_DELTA.
    """

    pings = list(Ping.objects.filter(owner=check).order_by("-id")[:limit])

    # Walk the pings from oldest to newest, looking at consecutive pairs.
    previous = None
    for ping in reversed(pings):
        if ping.kind != "start" and previous and previous.kind == "start":
            elapsed = ping.created - previous.created
            if elapsed < MAX_DELTA:
                ping.delta = elapsed

        previous = ping

    events = list(pings)
    if pings:
        # pings[-1] is the oldest loaded ping.
        cutoff = pings[-1].created
        alerts = Notification.objects.select_related("channel").filter(
            owner=check, check_status="down", created__gt=cutoff
        )
        events.extend(alerts)

    events.sort(key=lambda event: event.created, reverse=True)
    return events
-
-
@login_required
def log(request, code):
    """Render the log of a check's recent events.

    The number of pings shown is capped by the ``ping_log_limit`` of the
    project owner's profile.
    """

    check, _ = _get_check_for_user(request, code)

    limit = check.project.owner_profile.ping_log_limit
    over_limit = check.n_pings > limit

    ctx = {
        "project": check.project,
        "check": check,
        "events": _get_events(check, limit),
        "limit": limit,
        "show_limit_notice": over_limit and settings.USE_PAYMENTS,
    }

    return render(request, "front/log.html", ctx)
-
-
@login_required
def details(request, code):
    """Render the details page of a check.

    Users with read-write access can switch the project's ``show_slugs``
    flag with the ``urls`` query parameter ("uuid" or "slug").
    """

    _refresh_last_active_date(request.profile)
    check, rw = _get_check_for_user(request, code)
    project = check.project

    urls = request.GET.get("urls")
    if urls in ("uuid", "slug") and rw:
        project.show_slugs = urls == "slug"
        project.save()

    channels = list(Channel.objects.filter(project=project).order_by("created"))

    # Collect the distinct tags used by the checks in this project.
    tagged = Check.objects.filter(project=project).exclude(tags="")
    all_tags = {
        tag
        for tags in tagged.values_list("tags", flat=True)
        for tag in tags.split(" ")
    }

    ctx = {
        "page": "details",
        "project": project,
        "check": check,
        "rw": rw,
        "channels": channels,
        "enabled_channels": list(check.channel_set.all()),
        "timezones": pytz.all_timezones,
        "downtimes": check.downtimes(months=3),
        "is_new": "new" in request.GET,
        "is_copied": "copied" in request.GET,
        "all_tags": " ".join(sorted(all_tags)),
    }

    return render(request, "front/details.html", ctx)
-
-
@login_required
def uncloak(request, unique_key):
    """Redirect to the details page of the check with the given unique key.

    Only checks from the current user's projects are considered. Raises
    Http404 if none of them matches.
    """

    candidates = request.profile.checks_from_all_projects().only("code")
    match = next((c for c in candidates if c.unique_key == unique_key), None)
    if match is None:
        raise Http404("not found")

    return redirect("hc-details", match.code)
-
-
@login_required
def transfer(request, code):
    """Show the transfer dialog, or move the check to another project.

    On POST, the check is moved to the project whose code is in the POST
    field "project". The user needs read-write access to both the check
    and the target project. Responds with 400 if the target project has no
    checks available. Other request methods render the transfer dialog.
    """

    check = _get_rw_check_for_user(request, code)

    if request.method != "POST":
        return render(request, "front/transfer_modal.html", {"check": check})

    target_project = _get_rw_project_for_user(request, request.POST["project"])
    has_capacity = target_project.num_checks_available() > 0
    if not has_capacity:
        return HttpResponseBadRequest()

    check.project = target_project
    check.save()
    check.assign_all_channels()

    messages.success(request, "Check transferred successfully!")
    return redirect("hc-details", code)
-
-
@require_POST
@login_required
def copy(request, code):
    """Create a copy of a check in the same project.

    The copy gets the original's description, tags, filtering rules,
    schedule and channel assignments, and " (copy)" appended to its name.
    Responds with 400 if the project has no checks available. Redirects to
    the details page of the copy.
    """

    source = _get_rw_check_for_user(request, code)

    if source.project.num_checks_available() <= 0:
        return HttpResponseBadRequest()

    suffix = " (copy)"
    new_name = source.name + suffix
    # Make sure we don't exceed the 100 character db field limit:
    if len(new_name) > 100:
        new_name = source.name[:90] + "..." + suffix

    copied = Check(project=source.project)
    copied.set_name_slug(new_name)

    copied_fields = (
        "desc",
        "tags",
        "subject",
        "subject_fail",
        "methods",
        "manual_resume",
        "kind",
        "timeout",
        "grace",
        "schedule",
        "tz",
    )
    for field in copied_fields:
        setattr(copied, field, getattr(source, field))

    copied.save()

    copied.channel_set.add(*source.channel_set.all())

    details_url = reverse("hc-details", args=[copied.code])
    return redirect(details_url + "?copied")
-
-
@login_required
def status_single(request, code):
    """Return the current status of a single check as JSON.

    "updated" is the timestamp of the newest event, or "1" if there are no
    events. The rendered events and downtimes are included only if
    "updated" differs from the ``u`` query parameter.
    """

    check, rw = _get_check_for_user(request, code)

    events = _get_events(check, 20)
    updated = str(events[0].created.timestamp()) if events else "1"

    doc = {
        "status": check.get_status(),
        "status_text": STATUS_TEXT_TMPL.render({"check": check, "rw": rw}),
        "title": down_title(check),
        "updated": updated,
    }

    client_has_latest = updated == request.GET.get("u")
    if not client_has_latest:
        doc["events"] = EVENTS_TMPL.render({"check": check, "events": events})
        doc["downtimes"] = DOWNTIMES_TMPL.render({"downtimes": check.downtimes(3)})

    return JsonResponse(doc)
-
-
@login_required
def badges(request, code):
    """Render the badges page of a project.

    Badge URLs are generated for every tag used in the project (sorted
    case-insensitively), followed by "*" for the "overall status" badge.
    """

    project, rw = _get_project_for_user(request, code)

    tags = set()
    for check in Check.objects.filter(project=project):
        tags.update(check.tags_list())

    sorted_tags = sorted(tags, key=lambda s: s.lower())
    sorted_tags.append("*")  # For the "overall status" badge

    key = project.badge_key
    urls = [
        {
            "tag": tag,
            "svg": get_badge_url(key, tag),
            "svg3": get_badge_url(key, tag, with_late=True),
            "json": get_badge_url(key, tag, fmt="json"),
            "json3": get_badge_url(key, tag, fmt="json", with_late=True),
            "shields": get_badge_url(key, tag, fmt="shields"),
            "shields3": get_badge_url(key, tag, fmt="shields", with_late=True),
        }
        for tag in sorted_tags
    ]

    ctx = {
        # The "*" entry is always present, so more than one entry means
        # the project has at least one tag.
        "have_tags": len(urls) > 1,
        "page": "badges",
        "project": project,
        "badges": urls,
    }

    return render(request, "front/badges.html", ctx)
-
-
@login_required
def channels(request, code):
    """Render a project's integrations page, or update a channel's checks.

    On POST, the channel whose code is in the POST field "channel" gets
    its set of checks replaced with the checks named by the "check-<code>"
    POST keys. The POST responds with 403 if the user has read-only access
    or if the channel or any check belongs to another project, and with
    400 if the channel or any check does not exist.
    """

    project, rw = _get_project_for_user(request, code)

    if request.method == "POST":
        if not rw:
            return HttpResponseForbidden()

        channel_code = request.POST["channel"]
        try:
            channel = Channel.objects.get(code=channel_code)
        except Channel.DoesNotExist:
            return HttpResponseBadRequest()
        if channel.project_id != project.id:
            return HttpResponseForbidden()

        new_checks = []
        for key in request.POST:
            if not key.startswith("check-"):
                continue

            # Strip the 6-character "check-" prefix to get the check's code.
            check_code = key[6:]
            try:
                check = Check.objects.get(code=check_code)
            except Check.DoesNotExist:
                return HttpResponseBadRequest()
            if check.project_id != project.id:
                return HttpResponseForbidden()
            new_checks.append(check)

        channel.checks.set(new_checks)
        return redirect("hc-channels", project.code)

    project_channels = (
        Channel.objects.filter(project=project)
        .order_by("created")
        .annotate(n_checks=Count("checks"))
    )

    ctx = {
        "page": "channels",
        "rw": rw,
        "project": project,
        "profile": project.owner_profile,
        "channels": project_channels,
        "enable_apprise": settings.APPRISE_ENABLED is True,
        "enable_call": settings.TWILIO_AUTH is not None,
        "enable_discord": settings.DISCORD_CLIENT_ID is not None,
        "enable_linenotify": settings.LINENOTIFY_CLIENT_ID is not None,
        "enable_matrix": settings.MATRIX_ACCESS_TOKEN is not None,
        "enable_mattermost": settings.MATTERMOST_ENABLED is True,
        "enable_msteams": settings.MSTEAMS_ENABLED is True,
        "enable_opsgenie": settings.OPSGENIE_ENABLED is True,
        "enable_pagertree": settings.PAGERTREE_ENABLED is True,
        "enable_pd": settings.PD_ENABLED is True,
        "enable_prometheus": settings.PROMETHEUS_ENABLED is True,
        "enable_pushbullet": settings.PUSHBULLET_CLIENT_ID is not None,
        "enable_pushover": settings.PUSHOVER_API_TOKEN is not None,
        "enable_shell": settings.SHELL_ENABLED is True,
        "enable_signal": settings.SIGNAL_CLI_ENABLED is True,
        "enable_slack": settings.SLACK_ENABLED is True,
        "enable_slack_btn": settings.SLACK_CLIENT_ID is not None,
        "enable_sms": settings.TWILIO_AUTH is not None,
        "enable_spike": settings.SPIKE_ENABLED is True,
        "enable_telegram": settings.TELEGRAM_TOKEN is not None,
        "enable_trello": settings.TRELLO_APP_KEY is not None,
        "enable_victorops": settings.VICTOROPS_ENABLED is True,
        "enable_webhooks": settings.WEBHOOKS_ENABLED is True,
        "enable_whatsapp": settings.TWILIO_USE_WHATSAPP,
        "enable_zulip": settings.ZULIP_ENABLED is True,
        "use_payments": settings.USE_PAYMENTS,
    }

    return render(request, "front/channels.html", ctx)
-
-
@login_required
def channel_checks(request, code):
    """Render the list of the project's checks for a channel.

    The context contains all checks of the channel's project (ordered by
    creation date) and the set of codes of the checks assigned to the
    channel.
    """

    channel = _get_rw_channel_for_user(request, code)

    assigned_codes = channel.checks.values_list("code", flat=True).distinct()
    project_checks = Check.objects.filter(project=channel.project)

    ctx = {
        "checks": project_checks.order_by("created"),
        "assigned": set(assigned_codes),
        "channel": channel,
    }

    return render(request, "front/channel_checks.html", ctx)
-
-
@require_POST
@login_required
def update_channel_name(request, code):
    """Rename a channel and redirect to the project's integrations page.

    Invalid form data is ignored silently.
    """

    channel = _get_rw_channel_for_user(request, code)

    form = forms.ChannelNameForm(request.POST)
    if form.is_valid():
        new_name = form.cleaned_data["name"]
        channel.name = new_name
        channel.save()

    return redirect("hc-channels", channel.project.code)
-
-
def verify_email(request, code, token):
    """Mark an email channel as verified if the token in the link is valid.

    Renders the "bad link" page if the token does not match the channel's
    token.
    """

    channel = get_object_or_404(Channel, code=code)
    if channel.make_token() != token:
        return render(request, "bad_link.html")

    channel.email_verified = True
    channel.save()
    return render(request, "front/verify_email_success.html")
-
-
@csrf_exempt
def unsubscribe_email(request, code, signed_token):
    """Delete an email channel after the user confirms the unsubscribe link.

    Some email servers open links in emails to check for malicious
    content. To work around this, non-POST requests are served a
    confirmation form, and the channel is deleted only on POST. If the
    signature is more than 5 minutes old, the form is also flagged to be
    auto-submitted.
    """

    signer = signing.TimestampSigner(salt="alerts")

    # First, check the signature without looking at the timestamp:
    try:
        token = signer.unsign(signed_token)
    except signing.BadSignature:
        return render(request, "bad_link.html")

    # Then, check if the timestamp is older than 5 minutes:
    expired = False
    try:
        signer.unsign(signed_token, max_age=300)
    except signing.SignatureExpired:
        expired = True

    channel = get_object_or_404(Channel, code=code, kind="email")
    if channel.make_token() != token:
        return render(request, "bad_link.html")

    if request.method == "POST":
        channel.delete()
        return render(request, "front/unsubscribe_success.html")

    ctx = {"autosubmit": True} if expired else {}
    return render(request, "accounts/unsubscribe_submit.html", ctx)
-
-
@require_POST
@login_required
def send_test_notification(request, code):
    """Send a test notification through a channel and report the result.

    The outcome is reported with a flash message, and the user is
    redirected to the project's integrations page.
    """

    channel, rw = _get_channel_for_user(request, code)

    # An unsaved check used as the subject of the test notification.
    dummy = Check(name="TEST", status="down", project=channel.project)
    dummy.last_ping = timezone.now() - td(days=1)
    dummy.n_pings = 42

    # A webhook that has url_up but no url_down gets a "TEST is UP"
    # notification instead:
    if channel.kind == "webhook" and not channel.url_down and channel.url_up:
        dummy.status = "up"

    # Delete all older test notifications for this channel
    Notification.objects.filter(channel=channel, owner=None).delete()

    # Send the test notification
    error = channel.notify(dummy, is_test=True)

    if not error:
        messages.success(request, "Test notification sent!")
    else:
        messages.warning(request, "Could not send a test notification. %s" % error)

    return redirect("hc-channels", channel.project.code)
-
-
@require_POST
@login_required
def remove_channel(request, code):
    """Delete a channel and redirect to the project's integrations page."""

    channel = _get_rw_channel_for_user(request, code)

    # Read the project code before deleting the channel.
    project_code = channel.project.code
    channel.delete()

    return redirect("hc-channels", project_code)
-
-
@login_required
def email_form(request, channel=None, code=None):
    """Add an email integration or edit an existing one.

    When ``channel`` is None, a new email channel is created in the project
    identified by ``code``. Otherwise the given channel is edited.
    """

    is_new = channel is None
    if is_new:
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="email")

    if request.method == "POST":
        form = forms.EmailForm(request.POST)
        if form.is_valid():
            address = form.cleaned_data["value"]
            if address != channel.email_value:
                # The address has changed. It counts as verified if:
                # - the administrator of a self-hosted instance has set
                #   EMAIL_USE_VERIFICATION=False to disable verification, or
                # - the user is adding *their own* address.
                channel.email_verified = (
                    not settings.EMAIL_USE_VERIFICATION
                    or address == request.user.email
                )

            channel.value = form.get_value()
            channel.save()

            if is_new:
                channel.assign_all_checks()

            if not channel.email_verified:
                channel.send_verify_link()

            return redirect("hc-channels", channel.project.code)
    elif is_new:
        form = forms.EmailForm()
    else:
        # Pre-fill the form with the channel's current settings.
        initial = {
            "value": channel.email_value,
            "up": channel.email_notify_up,
            "down": channel.email_notify_down,
        }
        form = forms.EmailForm(initial)

    ctx = {
        "page": "channels",
        "project": channel.project,
        "use_verification": settings.EMAIL_USE_VERIFICATION,
        "form": form,
        "is_new": is_new,
    }
    return render(request, "integrations/email_form.html", ctx)
-
-
@login_required
def edit_channel(request, code):
    """Dispatch to the edit form of the channel's kind.

    Responds with 400 for channel kinds that have no edit form here.
    """

    channel = _get_rw_channel_for_user(request, code)

    kind = channel.kind
    if kind == "email":
        form_view = email_form
    elif kind == "webhook":
        form_view = webhook_form
    elif kind == "sms":
        form_view = sms_form
    elif kind == "signal":
        form_view = signal_form
    elif kind == "whatsapp":
        form_view = whatsapp_form
    else:
        return HttpResponseBadRequest()

    return form_view(request, channel=channel)
-
-
@require_setting("WEBHOOKS_ENABLED")
@login_required
def webhook_form(request, channel=None, code=None):
    """Add a webhook integration or edit an existing one.

    When ``channel`` is None, a new webhook channel is created in the
    project identified by ``code``. Otherwise the given channel is edited.
    """

    is_new = channel is None
    if is_new:
        project = _get_rw_project_for_user(request, code)
        channel = Channel(project=project, kind="webhook")

    if request.method == "POST":
        form = forms.WebhookForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["name"]
            channel.value = form.get_value()
            channel.save()

            if is_new:
                channel.assign_all_checks()

            return redirect("hc-channels", channel.project.code)

    elif is_new:
        form = forms.WebhookForm()
    else:
        # Pre-fill the form from the stored JSON value, turning each
        # headers dict into "name: value" lines.
        doc = json.loads(channel.value)
        for key in ("headers_down", "headers_up"):
            lines = ["%s: %s" % (name, value) for name, value in doc[key].items()]
            doc[key] = "\n".join(lines)

        doc["name"] = channel.name
        form = forms.WebhookForm(doc)

    ctx = {
        "page": "channels",
        "project": channel.project,
        "form": form,
        "is_new": is_new,
    }
    return render(request, "integrations/webhook_form.html", ctx)
-
-
@require_setting("SHELL_ENABLED")
@login_required
def add_shell(request, code):
    """Show the "add shell integration" form, or create the integration.

    A valid POST creates a "shell" channel, assigns it to all checks and
    redirects to the project's integrations page.
    """

    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddShellForm()
    else:
        form = forms.AddShellForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="shell")
            channel.value = form.get_value()
            channel.save()

            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_shell.html", ctx)
-
-
@require_setting("PD_ENABLED")
@login_required
def add_pd(request, code):
    """Show the "add PagerDuty integration" page, or create the integration.

    If PD_APP_ID is set, the Simple Install Flow is used: the page links
    to PagerDuty's install URL, and a random state value is stored in the
    session alongside the project's code. Otherwise a form is shown, and a
    valid POST creates a "pd" channel.
    """

    project = _get_rw_project_for_user(request, code)

    # Simple Install Flow
    if settings.PD_APP_ID:
        state = token_urlsafe()

        callback_query = urlencode({"state": state})
        redirect_url = settings.SITE_ROOT + reverse("hc-add-pd-complete")
        redirect_url += "?" + callback_query

        install_query = urlencode(
            {"app_id": settings.PD_APP_ID, "redirect_url": redirect_url, "version": "2"}
        )
        install_url = "https://app.pagerduty.com/install/integration?" + install_query

        ctx = {"page": "channels", "project": project, "install_url": install_url}
        # Stored in the session, from which `add_pd_complete` reads it back.
        request.session["pagerduty"] = (state, str(project.code))
        return render(request, "integrations/add_pd_simple.html", ctx)

    if request.method != "POST":
        form = forms.AddPdForm()
    else:
        form = forms.AddPdForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="pd")
            channel.value = form.cleaned_data["value"]
            channel.save()

            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_pd.html", ctx)
-
-
@require_setting("PD_ENABLED")
@require_setting("PD_APP_ID")
@login_required
def add_pd_complete(request):
    """Finish the PagerDuty simple install flow.

    Expects the session to hold a ("pagerduty") pair of (state, project code)
    stored by add_pd, and the query string to carry a matching "state" plus a
    JSON "config" document. One "pd" channel is created per entry of
    config["integration_keys"].

    Returns:
        400 if the session entry is missing or "config" is absent/malformed,
        403 if "state" does not match, otherwise a redirect to "hc-channels".
    """
    if "pagerduty" not in request.session:
        return HttpResponseBadRequest()

    state, code = request.session.pop("pagerduty")
    if request.GET.get("state") != state:
        return HttpResponseForbidden()

    project = _get_rw_project_for_user(request, code)

    # "config" arrives in the query string, so treat it as untrusted input.
    # Validate the whole document before saving anything, so that a bad
    # payload yields a 400 instead of a 500 with partially created channels.
    try:
        doc = json.loads(request.GET["config"])
        entries = [
            (
                item["name"],
                json.dumps(
                    {
                        "service_key": item["integration_key"],
                        "account": doc["account"]["name"],
                    }
                ),
            )
            for item in doc["integration_keys"]
        ]
    except (KeyError, TypeError, ValueError):
        # KeyError: missing parameter or key; ValueError: invalid JSON;
        # TypeError: JSON of an unexpected shape (e.g. a list or a string).
        return HttpResponseBadRequest()

    for name, value in entries:
        channel = Channel(kind="pd", project=project)
        channel.name = name
        channel.value = value
        channel.save()
        channel.assign_all_checks()

    messages.success(request, "The PagerDuty integration has been added!")
    return redirect("hc-channels", project.code)
-
-
@require_setting("PD_ENABLED")
@require_setting("PD_APP_ID")
def pd_help(request):
    """Render the PagerDuty simple-install page without a project context."""
    return render(request, "integrations/add_pd_simple.html", {"page": "channels"})
-
-
@require_setting("PAGERTREE_ENABLED")
@login_required
def add_pagertree(request, code):
    """Show the PagerTree URL form; on valid POST create a "pagertree" channel.

    After saving, assign_all_checks() is called on the channel and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddUrlForm()
    else:
        form = forms.AddUrlForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="pagertree")
            channel.value = form.cleaned_data["value"]
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_pagertree.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@require_setting("SLACK_ENABLED")
@login_required
def add_slack(request, code):
    """Show the Slack webhook URL form; on valid POST create a "slack" channel.

    After saving, assign_all_checks() is called on the channel and the user
    is redirected to "hc-channels". Otherwise the form is rendered with
    "integrations/add_slack.html".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method == "POST":
        form = forms.AddUrlForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="slack")
            channel.value = form.cleaned_data["value"]
            channel.save()

            channel.assign_all_checks()
            return redirect("hc-channels", project.code)
    else:
        form = forms.AddUrlForm()

    # Pass the project along like every other add_* view in this module does,
    # so the template has the same context available as its siblings.
    ctx = {"page": "channels", "project": project, "form": form}
    return render(request, "integrations/add_slack.html", ctx)
-
-
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
def slack_help(request):
    """Render the "Add to Slack" page without a project or authorize URL."""
    return render(request, "integrations/add_slack_btn.html", {"page": "channels"})
-
-
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
@login_required
def add_slack_btn(request, code):
    """Render the "Add to Slack" page with a Slack OAuth authorize link.

    A fresh random state is generated and stored, together with the project
    code, in the session under "add_slack" for add_slack_complete to verify.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    params = {
        "scope": "incoming-webhook",
        "client_id": settings.SLACK_CLIENT_ID,
        "state": state,
    }
    authorize_url = "https://slack.com/oauth/v2/authorize?" + urlencode(params)

    ctx = {"project": project, "page": "channels", "authorize_url": authorize_url}
    request.session["add_slack"] = (state, str(project.code))
    return render(request, "integrations/add_slack_btn.html", ctx)
-
-
@require_setting("SLACK_ENABLED")
@require_setting("SLACK_CLIENT_ID")
@login_required
def add_slack_complete(request):
    """Handle the redirect back from Slack's OAuth authorization page.

    Returns 403 if the session has no "add_slack" entry or the "state"
    query parameter does not match the stored one. If the user denied
    access, a warning is flashed. Otherwise the code is exchanged at Slack's
    oauth.v2.access endpoint and, when the response has a truthy "ok", a
    "slack" channel is created with the raw response text as its value.
    In all non-403 cases the user is redirected to "hc-channels".
    """
    if "add_slack" not in request.session:
        return HttpResponseForbidden()

    expected_state, code = request.session.pop("add_slack")
    project = _get_rw_project_for_user(request, code)

    if request.GET.get("error") == "access_denied":
        messages.warning(request, "Slack setup was cancelled.")
        return redirect("hc-channels", project.code)

    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    response = requests.post(
        "https://slack.com/api/oauth.v2.access",
        {
            "client_id": settings.SLACK_CLIENT_ID,
            "client_secret": settings.SLACK_CLIENT_SECRET,
            "code": request.GET.get("code"),
        },
    )

    payload = response.json()
    if not payload.get("ok"):
        reason = payload.get("error")
        messages.warning(request, "Error message from slack: %s" % reason)
        return redirect("hc-channels", project.code)

    channel = Channel(kind="slack", project=project)
    channel.value = response.text
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Slack integration has been added!")
    return redirect("hc-channels", project.code)
-
-
@require_setting("MATTERMOST_ENABLED")
@login_required
def add_mattermost(request, code):
    """Show the Mattermost URL form; on valid POST create a "mattermost" channel.

    After saving, assign_all_checks() is called on the channel and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    posted = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if posted else forms.AddUrlForm()

    if posted and form.is_valid():
        channel = Channel(project=project, kind="mattermost")
        channel.value = form.cleaned_data["value"]
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_mattermost.html",
        {"page": "channels", "form": form, "project": project},
    )
-
-
@require_setting("PUSHBULLET_CLIENT_ID")
@login_required
def add_pushbullet(request, code):
    """Render the Pushbullet setup page with an OAuth authorize link.

    A fresh random state is generated and stored, together with the project
    code, in the session under "add_pushbullet" for
    add_pushbullet_complete to verify.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    params = {
        "client_id": settings.PUSHBULLET_CLIENT_ID,
        "redirect_uri": settings.SITE_ROOT + reverse(add_pushbullet_complete),
        "response_type": "code",
        "state": state,
    }
    authorize_url = "https://www.pushbullet.com/authorize?" + urlencode(params)

    ctx = {"page": "channels", "project": project, "authorize_url": authorize_url}
    request.session["add_pushbullet"] = (state, str(project.code))
    return render(request, "integrations/add_pushbullet.html", ctx)
-
-
@require_setting("PUSHBULLET_CLIENT_ID")
@login_required
def add_pushbullet_complete(request):
    """Handle the redirect back from Pushbullet's OAuth authorization page.

    Returns 403 if the session has no "add_pushbullet" entry or the "state"
    query parameter does not match. If the user denied access, a warning is
    flashed. Otherwise the code is exchanged for a token and, when the
    response contains "access_token", a "pushbullet" channel is created with
    that token as its value. Non-403 paths redirect to "hc-channels".
    """
    if "add_pushbullet" not in request.session:
        return HttpResponseForbidden()

    expected_state, code = request.session.pop("add_pushbullet")
    project = _get_rw_project_for_user(request, code)

    if request.GET.get("error") == "access_denied":
        messages.warning(request, "Pushbullet setup was cancelled.")
        return redirect("hc-channels", project.code)

    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    response = requests.post(
        "https://api.pushbullet.com/oauth2/token",
        {
            "client_id": settings.PUSHBULLET_CLIENT_ID,
            "client_secret": settings.PUSHBULLET_CLIENT_SECRET,
            "code": request.GET.get("code"),
            "grant_type": "authorization_code",
        },
    )

    payload = response.json()
    if "access_token" not in payload:
        messages.warning(request, "Something went wrong")
        return redirect("hc-channels", project.code)

    channel = Channel(kind="pushbullet", project=project)
    channel.value = payload["access_token"]
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Pushbullet integration has been added!")
    return redirect("hc-channels", project.code)
-
-
@require_setting("DISCORD_CLIENT_ID")
@login_required
def add_discord(request, code):
    """Render the Discord setup page with an OAuth authorize link.

    A fresh random state is generated and stored, together with the project
    code, in the session under "add_discord" for add_discord_complete to
    verify.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    params = {
        "client_id": settings.DISCORD_CLIENT_ID,
        "scope": "webhook.incoming",
        "redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
        "response_type": "code",
        "state": state,
    }
    authorize_url = "https://discordapp.com/api/oauth2/authorize?" + urlencode(params)

    ctx = {"page": "channels", "project": project, "authorize_url": authorize_url}
    request.session["add_discord"] = (state, str(project.code))
    return render(request, "integrations/add_discord.html", ctx)
-
-
@require_setting("DISCORD_CLIENT_ID")
@login_required
def add_discord_complete(request):
    """Handle the redirect back from Discord's OAuth authorization page.

    Returns 403 if the session has no "add_discord" entry or the "state"
    query parameter does not match. If the user denied access, a warning is
    flashed. Otherwise the code is exchanged for a token and, when the
    response contains "access_token", a "discord" channel is created with
    the raw response text as its value. Non-403 paths redirect to
    "hc-channels".
    """
    if "add_discord" not in request.session:
        return HttpResponseForbidden()

    expected_state, code = request.session.pop("add_discord")
    project = _get_rw_project_for_user(request, code)

    if request.GET.get("error") == "access_denied":
        messages.warning(request, "Discord setup was cancelled.")
        return redirect("hc-channels", project.code)

    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    response = requests.post(
        "https://discordapp.com/api/oauth2/token",
        {
            "client_id": settings.DISCORD_CLIENT_ID,
            "client_secret": settings.DISCORD_CLIENT_SECRET,
            "code": request.GET.get("code"),
            "grant_type": "authorization_code",
            "redirect_uri": settings.SITE_ROOT + reverse(add_discord_complete),
        },
    )

    payload = response.json()
    if "access_token" not in payload:
        messages.warning(request, "Something went wrong.")
        return redirect("hc-channels", project.code)

    channel = Channel(kind="discord", project=project)
    channel.value = response.text
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Discord integration has been added!")
    return redirect("hc-channels", project.code)
-
-
@require_setting("PUSHOVER_API_TOKEN")
def pushover_help(request):
    """Render the Pushover help page."""
    return render(
        request, "integrations/add_pushover_help.html", {"page": "channels"}
    )
-
-
@require_setting("PUSHOVER_API_TOKEN")
@login_required
def add_pushover(request, code):
    """Drive the three stages of the Pushover subscription flow.

    * POST: store a random state in the session under "pushover" and
      redirect to settings.PUSHOVER_SUBSCRIPTION_URL with success/failure
      return URLs.
    * GET carrying "pushover_user_key": verify the state (403 on mismatch or
      missing session entry), then either delete the project's "po" channels
      (when "pushover_unsubscribed" is "1") or create a new "po" channel
      (400 if the form built from the query string is invalid).
    * Any other GET: render the integration settings form.
    """
    project = _get_rw_project_for_user(request, code)

    if request.method == "POST":
        state = token_urlsafe()

        failure_url = settings.SITE_ROOT + reverse("hc-channels", args=[project.code])

        return_params = urlencode(
            {
                "state": state,
                "prio": request.POST.get("po_priority", "0"),
                "prio_up": request.POST.get("po_priority_up", "0"),
            }
        )
        success_url = settings.SITE_ROOT + reverse(
            "hc-add-pushover", args=[project.code]
        )
        success_url = success_url + "?" + return_params

        subscription_params = urlencode(
            {"success": success_url, "failure": failure_url}
        )
        subscription_url = settings.PUSHOVER_SUBSCRIPTION_URL + "?" + subscription_params

        request.session["pushover"] = state
        return redirect(subscription_url)

    if "pushover_user_key" not in request.GET:
        # Show Integration Settings form
        return render(
            request,
            "integrations/add_pushover.html",
            {
                "page": "channels",
                "project": project,
                "po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY),
                "po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION),
            },
        )

    # Handle successful subscriptions
    if "pushover" not in request.session:
        return HttpResponseForbidden()

    expected_state = request.session.pop("pushover")
    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    if request.GET.get("pushover_unsubscribed") == "1":
        # Unsubscription: delete all Pushover channels for this project
        Channel.objects.filter(project=project, kind="po").delete()
        return redirect("hc-channels", project.code)

    form = forms.AddPushoverForm(request.GET)
    if not form.is_valid():
        return HttpResponseBadRequest()

    channel = Channel(project=project, kind="po")
    channel.value = form.get_value()
    channel.save()
    channel.assign_all_checks()

    messages.success(request, "The Pushover integration has been added!")
    return redirect("hc-channels", project.code)
-
-
@require_setting("OPSGENIE_ENABLED")
@login_required
def add_opsgenie(request, code):
    """Show the Opsgenie form; on valid POST create an "opsgenie" channel.

    The channel value is a JSON object with the form's "region" and "key".
    After saving, assign_all_checks() is called and the user is redirected
    to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddOpsgenieForm()
    else:
        form = forms.AddOpsgenieForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            channel = Channel(project=project, kind="opsgenie")
            channel.value = json.dumps(
                {"region": cleaned["region"], "key": cleaned["key"]}
            )
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_opsgenie.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@require_setting("VICTOROPS_ENABLED")
@login_required
def add_victorops(request, code):
    """Show the VictorOps URL form; on valid POST create a "victorops" channel.

    After saving, assign_all_checks() is called on the channel and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    posted = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if posted else forms.AddUrlForm()

    if posted and form.is_valid():
        channel = Channel(project=project, kind="victorops")
        channel.value = form.cleaned_data["value"]
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_victorops.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@require_setting("ZULIP_ENABLED")
@login_required
def add_zulip(request, code):
    """Show the Zulip form; on valid POST create a "zulip" channel.

    The channel value comes from the form's get_value(). After saving,
    assign_all_checks() is called and the user is redirected to
    "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddZulipForm()
    else:
        form = forms.AddZulipForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="zulip")
            channel.value = form.get_value()
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_zulip.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@csrf_exempt
@require_POST
def telegram_bot(request):
    """Receive a Telegram callback and reply to "/start" with an invite link.

    Returns 400 when the body is not valid JSON. Messages that fail schema
    validation, or whose text does not contain "/start", are acknowledged
    with an empty 200 response. Otherwise a signed (chat id, chat type,
    name) tuple is embedded in the rendered invite, which is sent back to
    the chat via Telegram.send().
    """
    try:
        update = json.loads(request.body.decode())
        jsonschema.validate(update, telegram_callback)
    except ValueError:
        return HttpResponseBadRequest()
    except jsonschema.ValidationError:
        # We don't recognize the message format, but don't want Telegram
        # retrying this over and over again, so respond with 200 OK
        return HttpResponse()

    message = update["message"]
    if "/start" not in message["text"]:
        return HttpResponse()

    chat = message["chat"]
    title = chat.get("title", "")
    username = chat.get("username", "")
    # max() of the two strings: a missing one is "" and so never wins
    # against a non-empty one.
    name = max(title, username)

    signed = signing.dumps((chat["id"], chat["type"], name))
    invite = render_to_string("integrations/telegram_invite.html", {"qs": signed})

    Telegram.send(chat["id"], invite)
    return HttpResponse()
-
-
@require_setting("TELEGRAM_TOKEN")
def telegram_help(request):
    """Render the Telegram setup page with only the bot name in context."""
    return render(
        request,
        "integrations/add_telegram.html",
        {"page": "channels", "bot_name": settings.TELEGRAM_BOT_NAME},
    )
-
-
@require_setting("TELEGRAM_TOKEN")
@login_required
def add_telegram(request):
    """Add a Telegram integration from a signed invite link.

    The query string, when present, is a signed (chat id, chat type, chat
    name) tuple that must be at most 600 seconds old; a bad or expired
    signature renders "bad_link.html".

    On POST, a "telegram" channel is created in the project named by the
    "project" POST field, and the user is redirected to "hc-channels".
    A POST without a signed query string returns 400. On GET the
    project-selection page is rendered.
    """
    chat_id, chat_type, chat_name = None, None, None
    qs = request.META["QUERY_STRING"]
    if qs:
        try:
            chat_id, chat_type, chat_name = signing.loads(qs, max_age=600)
        except signing.BadSignature:
            return render(request, "bad_link.html")

    if request.method == "POST":
        # Without a signed query string there is no chat to attach; refuse
        # rather than save a channel whose id/type/name are all null.
        if chat_id is None:
            return HttpResponseBadRequest()

        project = _get_rw_project_for_user(request, request.POST.get("project"))
        channel = Channel(project=project, kind="telegram")
        channel.value = json.dumps(
            {"id": chat_id, "type": chat_type, "name": chat_name}
        )
        channel.save()

        channel.assign_all_checks()
        messages.success(request, "The Telegram integration has been added!")
        return redirect("hc-channels", project.code)

    ctx = {
        "page": "channels",
        "projects": request.profile.projects(),
        "chat_id": chat_id,
        "chat_type": chat_type,
        "chat_name": chat_name,
        "bot_name": settings.TELEGRAM_BOT_NAME,
    }

    return render(request, "integrations/add_telegram.html", ctx)
-
-
@require_setting("TWILIO_AUTH")
@login_required
def sms_form(request, channel=None, code=None):
    """Render and process the SMS integration form.

    When `channel` is None, a new "sms" channel is prepared for the project
    looked up from `code`; otherwise the given channel is edited. A valid
    POST saves the channel (calling assign_all_checks() only for a new one)
    and redirects to "hc-channels".
    """
    creating = channel is None
    if creating:
        owner = _get_rw_project_for_user(request, code)
        channel = Channel(project=owner, kind="sms")

    if request.method == "POST":
        form = forms.PhoneUpDownForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()
            if creating:
                channel.assign_all_checks()

            return redirect("hc-channels", channel.project.code)
    elif creating:
        form = forms.PhoneUpDownForm(initial={"up": False})
    else:
        # Editing: populate the form from the channel's current settings
        current = {
            "label": channel.name,
            "phone": channel.phone_number,
            "up": channel.sms_notify_up,
            "down": channel.sms_notify_down,
        }
        form = forms.PhoneUpDownForm(current)

    return render(
        request,
        "integrations/sms_form.html",
        {
            "page": "channels",
            "project": channel.project,
            "form": form,
            "profile": channel.project.owner_profile,
            "is_new": creating,
        },
    )
-
-
@require_setting("TWILIO_AUTH")
@login_required
def add_call(request, code):
    """Show the phone call form; on valid POST create a "call" channel.

    The channel name is the form's "label" and its value comes from
    get_json(). After saving, assign_all_checks() is called and the user is
    redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    posted = request.method == "POST"
    form = forms.PhoneNumberForm(request.POST) if posted else forms.PhoneNumberForm()

    if posted and form.is_valid():
        channel = Channel(project=project, kind="call")
        channel.name = form.cleaned_data["label"]
        channel.value = form.get_json()
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_call.html",
        {
            "page": "channels",
            "project": project,
            "form": form,
            "profile": project.owner_profile,
        },
    )
-
-
@require_setting("TWILIO_USE_WHATSAPP")
@login_required
def whatsapp_form(request, channel=None, code=None):
    """Render and process the WhatsApp integration form.

    When `channel` is None, a new "whatsapp" channel is prepared for the
    project looked up from `code`; otherwise the given channel is edited.
    A valid POST saves the channel (calling assign_all_checks() only for a
    new one) and redirects to "hc-channels".
    """
    creating = channel is None
    if creating:
        owner = _get_rw_project_for_user(request, code)
        channel = Channel(project=owner, kind="whatsapp")

    if request.method == "POST":
        form = forms.PhoneUpDownForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()
            if creating:
                channel.assign_all_checks()

            return redirect("hc-channels", channel.project.code)
    elif creating:
        form = forms.PhoneUpDownForm()
    else:
        # Editing: populate the form from the channel's current settings
        current = {
            "label": channel.name,
            "phone": channel.phone_number,
            "up": channel.whatsapp_notify_up,
            "down": channel.whatsapp_notify_down,
        }
        form = forms.PhoneUpDownForm(current)

    return render(
        request,
        "integrations/whatsapp_form.html",
        {
            "page": "channels",
            "project": channel.project,
            "form": form,
            "profile": channel.project.owner_profile,
            "is_new": creating,
        },
    )
-
-
@require_setting("SIGNAL_CLI_ENABLED")
@login_required
def signal_form(request, channel=None, code=None):
    """Render and process the Signal integration form.

    When `channel` is None, a new "signal" channel is prepared for the
    project looked up from `code`; otherwise the given channel is edited.
    A valid POST saves the channel (calling assign_all_checks() only for a
    new one) and redirects to "hc-channels".
    """
    creating = channel is None
    if creating:
        owner = _get_rw_project_for_user(request, code)
        channel = Channel(project=owner, kind="signal")

    if request.method == "POST":
        form = forms.PhoneUpDownForm(request.POST)
        if form.is_valid():
            channel.name = form.cleaned_data["label"]
            channel.value = form.get_json()
            channel.save()
            if creating:
                channel.assign_all_checks()

            return redirect("hc-channels", channel.project.code)
    elif creating:
        form = forms.PhoneUpDownForm()
    else:
        # Editing: populate the form from the channel's current settings
        current = {
            "label": channel.name,
            "phone": channel.phone_number,
            "up": channel.signal_notify_up,
            "down": channel.signal_notify_down,
        }
        form = forms.PhoneUpDownForm(current)

    return render(
        request,
        "integrations/signal_form.html",
        {
            "page": "channels",
            "project": channel.project,
            "form": form,
            "is_new": creating,
        },
    )
-
-
@require_setting("TRELLO_APP_KEY")
@login_required
def add_trello(request, code):
    """Add a Trello integration to the project identified by `code`.

    POST: validate AddTrelloForm (400 if invalid), create a "trello"
    channel, call assign_all_checks() and redirect to "hc-channels".
    GET: render the setup page with a Trello authorize link that returns to
    this same view.
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        return_url = settings.SITE_ROOT + reverse("hc-add-trello", args=[project.code])
        params = {
            "expiration": "never",
            "name": settings.SITE_NAME,
            "scope": "read,write",
            "response_type": "token",
            "key": settings.TRELLO_APP_KEY,
            "return_url": return_url,
        }
        authorize_url = "https://trello.com/1/authorize?" + urlencode(params)

        return render(
            request,
            "integrations/add_trello.html",
            {"page": "channels", "project": project, "authorize_url": authorize_url},
        )

    form = forms.AddTrelloForm(request.POST)
    if not form.is_valid():
        return HttpResponseBadRequest()

    channel = Channel(project=project, kind="trello")
    channel.value = form.get_value()
    channel.save()
    channel.assign_all_checks()
    return redirect("hc-channels", project.code)
-
-
@require_setting("MATRIX_ACCESS_TOKEN")
@login_required
def add_matrix(request, code):
    """Show the Matrix form; on valid POST create a "matrix" channel.

    The channel value is the form's "room_id". If the submitted "alias" does
    not start with "!", it is also used as the channel name. After saving,
    assign_all_checks() is called, a success message is flashed and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddMatrixForm()
    else:
        form = forms.AddMatrixForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            channel = Channel(project=project, kind="matrix")
            channel.value = cleaned["room_id"]

            # If user supplied room alias instead of ID, use it as channel name
            if not cleaned["alias"].startswith("!"):
                channel.name = cleaned["alias"]

            channel.save()
            channel.assign_all_checks()
            messages.success(request, "The Matrix integration has been added!")
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_matrix.html",
        {
            "page": "channels",
            "project": project,
            "form": form,
            "matrix_user_id": settings.MATRIX_USER_ID,
        },
    )
-
-
@require_setting("APPRISE_ENABLED")
@login_required
def add_apprise(request, code):
    """Show the Apprise form; on valid POST create an "apprise" channel.

    The channel value is the form's "url". After saving,
    assign_all_checks() is called, a success message is flashed and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    posted = request.method == "POST"
    form = forms.AddAppriseForm(request.POST) if posted else forms.AddAppriseForm()

    if posted and form.is_valid():
        channel = Channel(project=project, kind="apprise")
        channel.value = form.cleaned_data["url"]
        channel.save()
        channel.assign_all_checks()
        messages.success(request, "The Apprise integration has been added!")
        return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_apprise.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@require_setting("TRELLO_APP_KEY")
@login_required
@require_POST
def trello_settings(request):
    """Fetch the user's open Trello boards and render the settings fragment.

    The Trello token is read from the "token" POST field and used to query
    the members/me/boards endpoint. The template receives the token, the
    decoded boards and the total number of lists across all boards.
    """
    token = request.POST.get("token")

    params = {
        "key": settings.TRELLO_APP_KEY,
        "token": token,
        "filter": "open",
        "fields": "id,name",
        "lists": "open",
        "list_fields": "id,name",
    }
    url = "https://api.trello.com/1/members/me/boards?" + urlencode(params)

    boards = requests.get(url).json()
    list_counts = [len(board["lists"]) for board in boards]

    return render(
        request,
        "integrations/trello_settings.html",
        {"token": token, "boards": boards, "num_lists": sum(list_counts)},
    )
-
-
@require_setting("MSTEAMS_ENABLED")
@login_required
def add_msteams(request, code):
    """Show the MS Teams URL form; on valid POST create a "msteams" channel.

    After saving, assign_all_checks() is called on the channel and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    if request.method != "POST":
        form = forms.AddUrlForm()
    else:
        form = forms.AddUrlForm(request.POST)
        if form.is_valid():
            channel = Channel(project=project, kind="msteams")
            channel.value = form.cleaned_data["value"]
            channel.save()
            channel.assign_all_checks()
            return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_msteams.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@require_setting("PROMETHEUS_ENABLED")
@login_required
def add_prometheus(request, code):
    """Render the Prometheus setup page for the project identified by `code`.

    Uses _get_project_for_user rather than the read-write variant; the
    second element of its result is not needed here.
    """
    project, _rw = _get_project_for_user(request, code)
    return render(
        request,
        "integrations/add_prometheus.html",
        {"page": "channels", "project": project},
    )
-
-
@require_setting("PROMETHEUS_ENABLED")
def metrics(request, code, key):
    """Serve a plain-text metrics document for one project.

    Returns 400 if `key` is not 32 characters long and 403 if no project
    matches both `code` and the read-only API key. The response contains an
    hc_check_up line per check, an hc_tag_up line per tag, and the
    hc_checks_total and hc_checks_down_total gauges.
    """
    if len(key) != 32:
        return HttpResponseBadRequest()

    try:
        project = Project.objects.filter(code=code, api_key_readonly=key).get()
    except Project.DoesNotExist:
        return HttpResponseForbidden()

    checks = Check.objects.filter(project_id=project.id).order_by("id")

    # Label-value escaping: backslash, double quote and newline
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"', "\n": "\\n"})

    def esc(s):
        return s.translate(escapes)

    def lines():
        yield "# HELP hc_check_up Whether the check is currently up (1 for yes, 0 for no).\n"
        yield "# TYPE hc_check_up gauge\n"

        for check in checks:
            up = 0 if check.get_status() == "down" else 1
            yield 'hc_check_up{name="%s", tags="%s", unique_key="%s"} %d\n' % (
                esc(check.name),
                esc(check.tags),
                check.unique_key,
                up,
            )

        # Computed only after the per-check lines have been produced
        tags_statuses, num_down = _tags_statuses(checks)
        yield "\n"
        yield "# HELP hc_tag_up Whether all checks with this tag are up (1 for yes, 0 for no).\n"
        yield "# TYPE hc_tag_up gauge\n"
        for tag in sorted(tags_statuses):
            up = 0 if tags_statuses[tag] == "down" else 1
            yield 'hc_tag_up{tag="%s"} %d\n' % (esc(tag), up)

        yield "\n"
        yield "# HELP hc_checks_total The total number of checks.\n"
        yield "# TYPE hc_checks_total gauge\n"
        yield "hc_checks_total %d\n" % len(checks)
        yield "\n"

        yield "# HELP hc_checks_down_total The number of checks currently down.\n"
        yield "# TYPE hc_checks_down_total gauge\n"
        yield "hc_checks_down_total %d\n" % num_down

    return HttpResponse(lines(), content_type="text/plain")
-
-
@require_setting("SPIKE_ENABLED")
@login_required
def add_spike(request, code):
    """Show the Spike URL form; on valid POST create a "spike" channel.

    After saving, assign_all_checks() is called on the channel and the user
    is redirected to "hc-channels".
    """
    project = _get_rw_project_for_user(request, code)

    posted = request.method == "POST"
    form = forms.AddUrlForm(request.POST) if posted else forms.AddUrlForm()

    if posted and form.is_valid():
        channel = Channel(project=project, kind="spike")
        channel.value = form.cleaned_data["value"]
        channel.save()
        channel.assign_all_checks()
        return redirect("hc-channels", project.code)

    return render(
        request,
        "integrations/add_spike.html",
        {"page": "channels", "project": project, "form": form},
    )
-
-
@require_setting("LINENOTIFY_CLIENT_ID")
@login_required
def add_linenotify(request, code):
    """Render the LINE Notify setup page with an OAuth authorize link.

    A fresh random state is generated and stored, together with the project
    code, in the session under "add_linenotify" for
    add_linenotify_complete to verify.
    """
    project = _get_rw_project_for_user(request, code)

    state = token_urlsafe()
    # NOTE: the URL must start with the scheme. A leading space here would
    # be emitted verbatim into the link and make it a malformed URL.
    authorize_url = "https://notify-bot.line.me/oauth/authorize?" + urlencode(
        {
            "client_id": settings.LINENOTIFY_CLIENT_ID,
            "redirect_uri": settings.SITE_ROOT + reverse(add_linenotify_complete),
            "response_type": "code",
            "state": state,
            "scope": "notify",
        }
    )

    ctx = {
        "page": "channels",
        "project": project,
        "authorize_url": authorize_url,
    }

    request.session["add_linenotify"] = (state, str(project.code))
    return render(request, "integrations/add_linenotify.html", ctx)
-
-
@require_setting("LINENOTIFY_CLIENT_ID")
@login_required
def add_linenotify_complete(request):
    """Handle the redirect back from LINE Notify's OAuth authorization page.

    Returns 403 if the session has no "add_linenotify" entry or the "state"
    query parameter does not match. If the user denied access, or the token
    response's "status" is not 200, a warning is flashed. Otherwise the
    notification target's name is fetched from the status endpoint and a
    "linenotify" channel is created with that name and the access token as
    its value. Non-403 paths redirect to "hc-channels".
    """
    if "add_linenotify" not in request.session:
        return HttpResponseForbidden()

    expected_state, code = request.session.pop("add_linenotify")
    if request.GET.get("state") != expected_state:
        return HttpResponseForbidden()

    project = _get_rw_project_for_user(request, code)
    if request.GET.get("error") == "access_denied":
        messages.warning(request, "LINE Notify setup was cancelled.")
        return redirect("hc-channels", project.code)

    # Exchange code for access token
    token_response = requests.post(
        "https://notify-bot.line.me/oauth/token",
        {
            "grant_type": "authorization_code",
            "code": request.GET.get("code"),
            "redirect_uri": settings.SITE_ROOT + reverse(add_linenotify_complete),
            "client_id": settings.LINENOTIFY_CLIENT_ID,
            "client_secret": settings.LINENOTIFY_CLIENT_SECRET,
        },
    )

    token_doc = token_response.json()
    if token_doc.get("status") != 200:
        messages.warning(request, "Something went wrong.")
        return redirect("hc-channels", project.code)

    # Fetch notification target's name, will use it as channel name:
    token = token_doc["access_token"]
    status_response = requests.get(
        "https://notify-api.line.me/api/status",
        headers={"Authorization": "Bearer %s" % token},
    )
    status_doc = status_response.json()

    channel = Channel(kind="linenotify", project=project)
    channel.name = status_doc.get("target")
    channel.value = token
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The LINE Notify integration has been added!")

    return redirect("hc-channels", project.code)
-
-
- # Forks: add custom views after this line
|