from datetime import datetime, timedelta as td
|
|
import json
|
|
from urllib.parse import urlencode
|
|
|
|
from croniter import croniter
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.core import signing
|
|
from django.db.models import Count
|
|
from django.http import (
|
|
Http404,
|
|
HttpResponse,
|
|
HttpResponseBadRequest,
|
|
HttpResponseForbidden,
|
|
JsonResponse,
|
|
)
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.template.loader import get_template, render_to_string
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.crypto import get_random_string
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_POST
|
|
from hc.accounts.models import Project
|
|
from hc.api.models import (
|
|
DEFAULT_GRACE,
|
|
DEFAULT_TIMEOUT,
|
|
Channel,
|
|
Check,
|
|
Ping,
|
|
Notification,
|
|
)
|
|
from hc.api.transports import Telegram
|
|
from hc.front.forms import (
|
|
AddWebhookForm,
|
|
NameTagsForm,
|
|
TimeoutForm,
|
|
AddUrlForm,
|
|
AddEmailForm,
|
|
AddOpsGenieForm,
|
|
CronForm,
|
|
AddSmsForm,
|
|
ChannelNameForm,
|
|
EmailSettingsForm,
|
|
AddMatrixForm,
|
|
)
|
|
from hc.front.schemas import telegram_callback
|
|
from hc.front.templatetags.hc_extras import num_down_title, down_title, sortchecks
|
|
from hc.lib import jsonschema
|
|
from hc.lib.badges import get_badge_url
|
|
import pytz
|
|
from pytz.exceptions import UnknownTimeZoneError
|
|
import requests
|
|
|
|
|
|
VALID_SORT_VALUES = ("name", "-name", "last_ping", "-last_ping", "created")
|
|
STATUS_TEXT_TMPL = get_template("front/log_status_text.html")
|
|
LAST_PING_TMPL = get_template("front/last_ping_cell.html")
|
|
EVENTS_TMPL = get_template("front/details_events.html")
|
|
ONE_HOUR = td(hours=1)
|
|
TWELVE_HOURS = td(hours=12)
|
|
|
|
|
|
def _tags_statuses(checks):
|
|
tags, down, grace, num_down = {}, {}, {}, 0
|
|
for check in checks:
|
|
status = check.get_status(with_started=False)
|
|
|
|
if status == "down":
|
|
num_down += 1
|
|
for tag in check.tags_list():
|
|
down[tag] = "down"
|
|
elif status == "grace":
|
|
for tag in check.tags_list():
|
|
grace[tag] = "grace"
|
|
else:
|
|
for tag in check.tags_list():
|
|
tags[tag] = "up"
|
|
|
|
tags.update(grace)
|
|
tags.update(down)
|
|
return tags, num_down
|
|
|
|
|
|
def _get_check_for_user(request, code):
|
|
""" Return specified check if current user has access to it. """
|
|
|
|
if not request.user.is_authenticated:
|
|
raise Http404("not found")
|
|
|
|
if request.user.is_superuser:
|
|
q = Check.objects
|
|
else:
|
|
q = request.profile.checks_from_all_projects()
|
|
|
|
try:
|
|
return q.get(code=code)
|
|
except Check.DoesNotExist:
|
|
raise Http404("not found")
|
|
|
|
|
|
def _get_project_for_user(request, project_code):
|
|
""" Return true if current user has access to the specified account. """
|
|
|
|
if request.user.is_superuser:
|
|
q = Project.objects
|
|
else:
|
|
q = request.profile.projects()
|
|
|
|
try:
|
|
return q.get(code=project_code)
|
|
except Project.DoesNotExist:
|
|
raise Http404("not found")
|
|
|
|
|
|
@login_required
|
|
def my_checks(request, code):
|
|
project = _get_project_for_user(request, code)
|
|
|
|
if request.GET.get("sort") in VALID_SORT_VALUES:
|
|
request.profile.sort = request.GET["sort"]
|
|
request.profile.save()
|
|
|
|
if request.profile.current_project_id != project.id:
|
|
request.profile.current_project = project
|
|
request.profile.save()
|
|
|
|
q = Check.objects.filter(project=project)
|
|
checks = list(q.prefetch_related("channel_set"))
|
|
sortchecks(checks, request.profile.sort)
|
|
|
|
tags_statuses, num_down = _tags_statuses(checks)
|
|
pairs = list(tags_statuses.items())
|
|
pairs.sort(key=lambda pair: pair[0].lower())
|
|
|
|
channels = Channel.objects.filter(project=project)
|
|
channels = list(channels.order_by("created"))
|
|
|
|
hidden_checks = set()
|
|
# Hide checks that don't match selected tags:
|
|
selected_tags = set(request.GET.getlist("tag", []))
|
|
if selected_tags:
|
|
for check in checks:
|
|
if not selected_tags.issubset(check.tags_list()):
|
|
hidden_checks.add(check)
|
|
|
|
# Hide checks that don't match the search string:
|
|
search = request.GET.get("search", "")
|
|
if search:
|
|
for check in checks:
|
|
search_key = "%s\n%s" % (check.name.lower(), check.code)
|
|
if search not in search_key:
|
|
hidden_checks.add(check)
|
|
|
|
ctx = {
|
|
"page": "checks",
|
|
"checks": checks,
|
|
"channels": channels,
|
|
"num_down": num_down,
|
|
"now": timezone.now(),
|
|
"tags": pairs,
|
|
"ping_endpoint": settings.PING_ENDPOINT,
|
|
"timezones": pytz.all_timezones,
|
|
"project": project,
|
|
"num_available": project.num_checks_available(),
|
|
"sort": request.profile.sort,
|
|
"selected_tags": selected_tags,
|
|
"show_search": True,
|
|
"search": search,
|
|
"hidden_checks": hidden_checks,
|
|
}
|
|
|
|
return render(request, "front/my_checks.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def status(request, code):
|
|
_get_project_for_user(request, code)
|
|
|
|
checks = list(Check.objects.filter(project__code=code))
|
|
|
|
details = []
|
|
for check in checks:
|
|
ctx = {"check": check}
|
|
details.append(
|
|
{
|
|
"code": str(check.code),
|
|
"status": check.get_status(),
|
|
"last_ping": LAST_PING_TMPL.render(ctx),
|
|
}
|
|
)
|
|
|
|
tags_statuses, num_down = _tags_statuses(checks)
|
|
return JsonResponse(
|
|
{"details": details, "tags": tags_statuses, "title": num_down_title(num_down)}
|
|
)
|
|
|
|
|
|
@login_required
|
|
@require_POST
|
|
def switch_channel(request, code, channel_code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
channel = get_object_or_404(Channel, code=channel_code)
|
|
if channel.project_id != check.project_id:
|
|
return HttpResponseBadRequest()
|
|
|
|
if request.POST.get("state") == "on":
|
|
channel.checks.add(check)
|
|
else:
|
|
channel.checks.remove(check)
|
|
|
|
return HttpResponse()
|
|
|
|
|
|
def index(request):
|
|
if request.user.is_authenticated:
|
|
projects = list(request.profile.projects())
|
|
|
|
ctx = {"page": "projects", "projects": projects}
|
|
return render(request, "front/projects.html", ctx)
|
|
|
|
check = Check()
|
|
|
|
ctx = {
|
|
"page": "welcome",
|
|
"check": check,
|
|
"ping_url": check.url(),
|
|
"enable_pushbullet": settings.PUSHBULLET_CLIENT_ID is not None,
|
|
"enable_pushover": settings.PUSHOVER_API_TOKEN is not None,
|
|
"enable_discord": settings.DISCORD_CLIENT_ID is not None,
|
|
"enable_telegram": settings.TELEGRAM_TOKEN is not None,
|
|
"enable_sms": settings.TWILIO_AUTH is not None,
|
|
"enable_pd": settings.PD_VENDOR_KEY is not None,
|
|
"enable_trello": settings.TRELLO_APP_KEY is not None,
|
|
"enable_matrix": settings.MATRIX_ACCESS_TOKEN is not None,
|
|
"registration_open": settings.REGISTRATION_OPEN,
|
|
}
|
|
|
|
return render(request, "front/welcome.html", ctx)
|
|
|
|
|
|
def docs(request):
|
|
ctx = {
|
|
"page": "docs",
|
|
"section": "home",
|
|
"ping_endpoint": settings.PING_ENDPOINT,
|
|
"ping_email": "your-uuid-here@%s" % settings.PING_EMAIL_DOMAIN,
|
|
"ping_email_domain": settings.PING_EMAIL_DOMAIN,
|
|
"ping_url": settings.PING_ENDPOINT + "your-uuid-here",
|
|
}
|
|
|
|
return render(request, "front/docs.html", ctx)
|
|
|
|
|
|
def docs_api(request):
|
|
ctx = {
|
|
"page": "docs",
|
|
"section": "api",
|
|
"SITE_ROOT": settings.SITE_ROOT,
|
|
"PING_ENDPOINT": settings.PING_ENDPOINT,
|
|
"default_timeout": int(DEFAULT_TIMEOUT.total_seconds()),
|
|
"default_grace": int(DEFAULT_GRACE.total_seconds()),
|
|
}
|
|
|
|
return render(request, "front/docs_api.html", ctx)
|
|
|
|
|
|
def docs_cron(request):
|
|
ctx = {"page": "docs", "section": "cron"}
|
|
return render(request, "front/docs_cron.html", ctx)
|
|
|
|
|
|
def docs_resources(request):
|
|
ctx = {"page": "docs", "section": "resources"}
|
|
return render(request, "front/docs_resources.html", ctx)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def add_check(request, code):
|
|
project = _get_project_for_user(request, code)
|
|
if project.num_checks_available() <= 0:
|
|
return HttpResponseBadRequest()
|
|
|
|
check = Check(project=project)
|
|
check.save()
|
|
|
|
check.assign_all_channels()
|
|
|
|
return redirect("hc-checks", code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def update_name(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
form = NameTagsForm(request.POST)
|
|
if form.is_valid():
|
|
check.name = form.cleaned_data["name"]
|
|
check.tags = form.cleaned_data["tags"]
|
|
check.desc = form.cleaned_data["desc"]
|
|
check.save()
|
|
|
|
if "/details/" in request.META.get("HTTP_REFERER", ""):
|
|
return redirect("hc-details", code)
|
|
|
|
return redirect("hc-checks", check.project.code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def email_settings(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
form = EmailSettingsForm(request.POST)
|
|
if form.is_valid():
|
|
check.subject = form.cleaned_data["subject"]
|
|
check.save()
|
|
|
|
return redirect("hc-details", code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def update_timeout(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
kind = request.POST.get("kind")
|
|
if kind == "simple":
|
|
form = TimeoutForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
check.kind = "simple"
|
|
check.timeout = form.cleaned_data["timeout"]
|
|
check.grace = form.cleaned_data["grace"]
|
|
elif kind == "cron":
|
|
form = CronForm(request.POST)
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
check.kind = "cron"
|
|
check.schedule = form.cleaned_data["schedule"]
|
|
check.tz = form.cleaned_data["tz"]
|
|
check.grace = td(minutes=form.cleaned_data["grace"])
|
|
|
|
check.alert_after = check.going_down_after()
|
|
check.save()
|
|
|
|
if "/details/" in request.META.get("HTTP_REFERER", ""):
|
|
return redirect("hc-details", code)
|
|
|
|
return redirect("hc-checks", check.project.code)
|
|
|
|
|
|
@require_POST
|
|
def cron_preview(request):
|
|
schedule = request.POST.get("schedule", "")
|
|
tz = request.POST.get("tz")
|
|
ctx = {"tz": tz, "dates": []}
|
|
|
|
try:
|
|
zone = pytz.timezone(tz)
|
|
now_local = timezone.localtime(timezone.now(), zone)
|
|
|
|
if len(schedule.split()) != 5:
|
|
raise ValueError()
|
|
|
|
it = croniter(schedule, now_local)
|
|
for i in range(0, 6):
|
|
ctx["dates"].append(it.get_next(datetime))
|
|
except UnknownTimeZoneError:
|
|
ctx["bad_tz"] = True
|
|
except:
|
|
ctx["bad_schedule"] = True
|
|
|
|
return render(request, "front/cron_preview.html", ctx)
|
|
|
|
|
|
def ping_details(request, code, n=None):
|
|
check = _get_check_for_user(request, code)
|
|
q = Ping.objects.filter(owner=check)
|
|
if n:
|
|
q = q.filter(n=n)
|
|
|
|
ping = q.latest("created")
|
|
|
|
ctx = {"check": check, "ping": ping}
|
|
|
|
return render(request, "front/ping_details.html", ctx)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def pause(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
check.status = "paused"
|
|
check.last_start = None
|
|
check.alert_after = None
|
|
check.save()
|
|
|
|
if "/details/" in request.META.get("HTTP_REFERER", ""):
|
|
return redirect("hc-details", code)
|
|
|
|
return redirect("hc-checks", check.project.code)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def remove_check(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
project = check.project
|
|
check.delete()
|
|
return redirect("hc-checks", project.code)
|
|
|
|
|
|
def _get_events(check, limit):
|
|
# max time between start and ping where we will consider
|
|
# the both events related.
|
|
max_delta = min(ONE_HOUR + check.grace, TWELVE_HOURS)
|
|
|
|
pings = Ping.objects.filter(owner=check).order_by("-id")[:limit]
|
|
pings = list(pings)
|
|
|
|
prev = None
|
|
for ping in pings:
|
|
if ping.kind == "start" and prev and prev.kind != "start":
|
|
delta = prev.created - ping.created
|
|
if delta < max_delta:
|
|
setattr(prev, "delta", delta)
|
|
|
|
prev = ping
|
|
|
|
alerts = []
|
|
if len(pings):
|
|
cutoff = pings[-1].created
|
|
alerts = Notification.objects.select_related("channel").filter(
|
|
owner=check, check_status="down", created__gt=cutoff
|
|
)
|
|
|
|
events = pings + list(alerts)
|
|
events.sort(key=lambda el: el.created, reverse=True)
|
|
return events
|
|
|
|
|
|
@login_required
|
|
def log(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
limit = check.project.owner_profile.ping_log_limit
|
|
ctx = {
|
|
"project": check.project,
|
|
"check": check,
|
|
"events": _get_events(check, limit),
|
|
"limit": limit,
|
|
"show_limit_notice": check.n_pings > limit and settings.USE_PAYMENTS,
|
|
}
|
|
|
|
return render(request, "front/log.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def details(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
channels = Channel.objects.filter(project=check.project)
|
|
channels = list(channels.order_by("created"))
|
|
|
|
ctx = {
|
|
"page": "details",
|
|
"project": check.project,
|
|
"check": check,
|
|
"channels": channels,
|
|
"timezones": pytz.all_timezones,
|
|
}
|
|
|
|
return render(request, "front/details.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def transfer(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
if request.method == "POST":
|
|
target_project = _get_project_for_user(request, request.POST["project"])
|
|
if target_project.num_checks_available() <= 0:
|
|
return HttpResponseBadRequest()
|
|
|
|
check.project = target_project
|
|
check.save()
|
|
|
|
check.assign_all_channels()
|
|
|
|
request.profile.current_project = target_project
|
|
request.profile.save()
|
|
|
|
messages.success(request, "Check transferred successfully!")
|
|
|
|
return redirect("hc-details", code)
|
|
|
|
ctx = {"check": check}
|
|
return render(request, "front/transfer_modal.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def status_single(request, code):
|
|
check = _get_check_for_user(request, code)
|
|
|
|
status = check.get_status()
|
|
events = _get_events(check, 20)
|
|
updated = "1"
|
|
if len(events):
|
|
updated = str(events[0].created.timestamp())
|
|
|
|
doc = {
|
|
"status": status,
|
|
"status_text": STATUS_TEXT_TMPL.render({"check": check}),
|
|
"title": down_title(check),
|
|
"updated": updated,
|
|
}
|
|
|
|
if updated != request.GET.get("u"):
|
|
doc["events"] = EVENTS_TMPL.render({"check": check, "events": events})
|
|
|
|
return JsonResponse(doc)
|
|
|
|
|
|
@login_required
|
|
def badges(request, code):
|
|
project = _get_project_for_user(request, code)
|
|
|
|
tags = set()
|
|
for check in Check.objects.filter(project=project):
|
|
tags.update(check.tags_list())
|
|
|
|
sorted_tags = sorted(tags, key=lambda s: s.lower())
|
|
sorted_tags.append("*") # For the "overall status" badge
|
|
|
|
urls = []
|
|
for tag in sorted_tags:
|
|
urls.append(
|
|
{
|
|
"tag": tag,
|
|
"svg": get_badge_url(project.badge_key, tag),
|
|
"json": get_badge_url(project.badge_key, tag, format="json"),
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"have_tags": len(urls) > 1,
|
|
"page": "badges",
|
|
"project": project,
|
|
"badges": urls,
|
|
}
|
|
|
|
return render(request, "front/badges.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def channels(request):
|
|
if request.method == "POST":
|
|
code = request.POST["channel"]
|
|
try:
|
|
channel = Channel.objects.get(code=code)
|
|
except Channel.DoesNotExist:
|
|
return HttpResponseBadRequest()
|
|
if channel.project_id != request.project.id:
|
|
return HttpResponseForbidden()
|
|
|
|
new_checks = []
|
|
for key in request.POST:
|
|
if key.startswith("check-"):
|
|
code = key[6:]
|
|
try:
|
|
check = Check.objects.get(code=code)
|
|
except Check.DoesNotExist:
|
|
return HttpResponseBadRequest()
|
|
if check.project_id != request.project.id:
|
|
return HttpResponseForbidden()
|
|
new_checks.append(check)
|
|
|
|
channel.checks.set(new_checks)
|
|
return redirect("hc-channels")
|
|
|
|
channels = Channel.objects.filter(project=request.project)
|
|
channels = channels.order_by("created")
|
|
channels = channels.annotate(n_checks=Count("checks"))
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"profile": request.project.owner_profile,
|
|
"channels": channels,
|
|
"enable_pushbullet": settings.PUSHBULLET_CLIENT_ID is not None,
|
|
"enable_pushover": settings.PUSHOVER_API_TOKEN is not None,
|
|
"enable_discord": settings.DISCORD_CLIENT_ID is not None,
|
|
"enable_telegram": settings.TELEGRAM_TOKEN is not None,
|
|
"enable_sms": settings.TWILIO_AUTH is not None,
|
|
"enable_pd": settings.PD_VENDOR_KEY is not None,
|
|
"enable_trello": settings.TRELLO_APP_KEY is not None,
|
|
"enable_matrix": settings.MATRIX_ACCESS_TOKEN is not None,
|
|
"use_payments": settings.USE_PAYMENTS,
|
|
}
|
|
|
|
return render(request, "front/channels.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def channel_checks(request, code):
|
|
channel = get_object_or_404(Channel, code=code)
|
|
if channel.project_id != request.project.id:
|
|
return HttpResponseForbidden()
|
|
|
|
assigned = set(channel.checks.values_list("code", flat=True).distinct())
|
|
checks = Check.objects.filter(project=request.project).order_by("created")
|
|
|
|
ctx = {"checks": checks, "assigned": assigned, "channel": channel}
|
|
|
|
return render(request, "front/channel_checks.html", ctx)
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def update_channel_name(request, code):
|
|
channel = get_object_or_404(Channel, code=code)
|
|
if channel.project_id != request.project.id:
|
|
return HttpResponseForbidden()
|
|
|
|
form = ChannelNameForm(request.POST)
|
|
if form.is_valid():
|
|
channel.name = form.cleaned_data["name"]
|
|
channel.save()
|
|
|
|
return redirect("hc-channels")
|
|
|
|
|
|
def verify_email(request, code, token):
|
|
channel = get_object_or_404(Channel, code=code)
|
|
if channel.make_token() == token:
|
|
channel.email_verified = True
|
|
channel.save()
|
|
return render(request, "front/verify_email_success.html")
|
|
|
|
return render(request, "bad_link.html")
|
|
|
|
|
|
def unsubscribe_email(request, code, token):
|
|
channel = get_object_or_404(Channel, code=code)
|
|
if channel.make_token() != token:
|
|
return render(request, "bad_link.html")
|
|
|
|
if channel.kind != "email":
|
|
return HttpResponseBadRequest()
|
|
|
|
# Some email servers open links in emails to check for malicious content.
|
|
# To work around this, we serve a form that auto-submits with JS.
|
|
if "ask" in request.GET and request.method != "POST":
|
|
return render(request, "accounts/unsubscribe_submit.html")
|
|
|
|
channel.delete()
|
|
return render(request, "front/unsubscribe_success.html")
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def send_test_notification(request, code):
|
|
channel = get_object_or_404(Channel, code=code)
|
|
if channel.project_id != request.project.id:
|
|
return HttpResponseForbidden()
|
|
|
|
dummy = Check(name="TEST", status="down")
|
|
dummy.last_ping = timezone.now() - td(days=1)
|
|
dummy.n_pings = 42
|
|
|
|
if channel.kind == "email":
|
|
error = channel.transport.notify(dummy, channel.get_unsub_link())
|
|
else:
|
|
error = channel.transport.notify(dummy)
|
|
|
|
if error:
|
|
messages.warning(request, "Could not send a test notification: %s" % error)
|
|
else:
|
|
messages.success(request, "Test notification sent!")
|
|
|
|
return redirect("hc-channels")
|
|
|
|
|
|
@require_POST
|
|
@login_required
|
|
def remove_channel(request, code):
|
|
# user may refresh the page during POST and cause two deletion attempts
|
|
channel = Channel.objects.filter(code=code).first()
|
|
if channel:
|
|
if channel.project_id != request.project.id:
|
|
return HttpResponseForbidden()
|
|
channel.delete()
|
|
|
|
return redirect("hc-channels")
|
|
|
|
|
|
@login_required
|
|
def add_email(request):
|
|
if request.method == "POST":
|
|
form = AddEmailForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="email")
|
|
channel.value = json.dumps(
|
|
{
|
|
"value": form.cleaned_data["value"],
|
|
"up": form.cleaned_data["up"],
|
|
"down": form.cleaned_data["down"],
|
|
}
|
|
)
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
|
|
is_own_email = form.cleaned_data["value"] == request.user.email
|
|
if is_own_email or not settings.EMAIL_USE_VERIFICATION:
|
|
# If user is subscribing *their own* address
|
|
# we can skip the verification step.
|
|
|
|
# Additionally, in self-hosted setting, administator has the
|
|
# option to disable the email verification step altogether.
|
|
|
|
channel.email_verified = True
|
|
channel.save()
|
|
else:
|
|
channel.send_verify_link()
|
|
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddEmailForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"use_verification": settings.EMAIL_USE_VERIFICATION,
|
|
"form": form,
|
|
}
|
|
return render(request, "integrations/add_email.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_webhook(request):
|
|
if request.method == "POST":
|
|
form = AddWebhookForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="webhook")
|
|
channel.value = form.get_value()
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddWebhookForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"form": form,
|
|
"now": timezone.now().replace(microsecond=0).isoformat(),
|
|
}
|
|
return render(request, "integrations/add_webhook.html", ctx)
|
|
|
|
|
|
def _prepare_state(request, session_key):
|
|
state = get_random_string()
|
|
request.session[session_key] = state
|
|
return state
|
|
|
|
|
|
def _get_validated_code(request, session_key, key="code"):
|
|
if session_key not in request.session:
|
|
return None
|
|
|
|
session_state = request.session.pop(session_key)
|
|
request_state = request.GET.get("state")
|
|
if session_state is None or session_state != request_state:
|
|
return None
|
|
|
|
return request.GET.get(key)
|
|
|
|
|
|
def add_pd(request, state=None):
|
|
if settings.PD_VENDOR_KEY is None:
|
|
raise Http404("pagerduty integration is not available")
|
|
|
|
if state and request.user.is_authenticated:
|
|
if "pd" not in request.session:
|
|
return HttpResponseBadRequest()
|
|
|
|
session_state = request.session.pop("pd")
|
|
if session_state != state:
|
|
return HttpResponseBadRequest()
|
|
|
|
if request.GET.get("error") == "cancelled":
|
|
messages.warning(request, "PagerDuty setup was cancelled")
|
|
return redirect("hc-channels")
|
|
|
|
channel = Channel(kind="pd", project=request.project)
|
|
channel.user = request.project.owner
|
|
channel.value = json.dumps(
|
|
{
|
|
"service_key": request.GET.get("service_key"),
|
|
"account": request.GET.get("account"),
|
|
}
|
|
)
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The PagerDuty integration has been added!")
|
|
return redirect("hc-channels")
|
|
|
|
state = _prepare_state(request, "pd")
|
|
callback = settings.SITE_ROOT + reverse("hc-add-pd-state", args=[state])
|
|
connect_url = "https://connect.pagerduty.com/connect?" + urlencode(
|
|
{"vendor": settings.PD_VENDOR_KEY, "callback": callback}
|
|
)
|
|
|
|
ctx = {"page": "channels", "project": request.project, "connect_url": connect_url}
|
|
return render(request, "integrations/add_pd.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_pagertree(request):
|
|
if request.method == "POST":
|
|
form = AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="pagertree")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": request.project, "form": form}
|
|
return render(request, "integrations/add_pagertree.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_pagerteam(request):
|
|
if request.method == "POST":
|
|
form = AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="pagerteam")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": request.project, "form": form}
|
|
return render(request, "integrations/add_pagerteam.html", ctx)
|
|
|
|
|
|
def add_slack(request):
|
|
if not settings.SLACK_CLIENT_ID and not request.user.is_authenticated:
|
|
return redirect("hc-login")
|
|
|
|
if request.method == "POST":
|
|
form = AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="slack")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddUrlForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"form": form,
|
|
"slack_client_id": settings.SLACK_CLIENT_ID,
|
|
}
|
|
|
|
if request.user.is_authenticated:
|
|
ctx["project"] = request.project
|
|
|
|
if settings.SLACK_CLIENT_ID and request.user.is_authenticated:
|
|
ctx["state"] = _prepare_state(request, "slack")
|
|
|
|
return render(request, "integrations/add_slack.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_slack_btn(request):
|
|
code = _get_validated_code(request, "slack")
|
|
if code is None:
|
|
return HttpResponseBadRequest()
|
|
|
|
result = requests.post(
|
|
"https://slack.com/api/oauth.access",
|
|
{
|
|
"client_id": settings.SLACK_CLIENT_ID,
|
|
"client_secret": settings.SLACK_CLIENT_SECRET,
|
|
"code": code,
|
|
},
|
|
)
|
|
|
|
doc = result.json()
|
|
if doc.get("ok"):
|
|
channel = Channel(kind="slack", project=request.project)
|
|
channel.user = request.project.owner
|
|
channel.value = result.text
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Slack integration has been added!")
|
|
else:
|
|
s = doc.get("error")
|
|
messages.warning(request, "Error message from slack: %s" % s)
|
|
|
|
return redirect("hc-channels")
|
|
|
|
|
|
@login_required
|
|
def add_pushbullet(request):
|
|
if settings.PUSHBULLET_CLIENT_ID is None:
|
|
raise Http404("pushbullet integration is not available")
|
|
|
|
if "code" in request.GET:
|
|
code = _get_validated_code(request, "pushbullet")
|
|
if code is None:
|
|
return HttpResponseBadRequest()
|
|
|
|
result = requests.post(
|
|
"https://api.pushbullet.com/oauth2/token",
|
|
{
|
|
"client_id": settings.PUSHBULLET_CLIENT_ID,
|
|
"client_secret": settings.PUSHBULLET_CLIENT_SECRET,
|
|
"code": code,
|
|
"grant_type": "authorization_code",
|
|
},
|
|
)
|
|
|
|
doc = result.json()
|
|
if "access_token" in doc:
|
|
channel = Channel(kind="pushbullet", project=request.project)
|
|
channel.user = request.project.owner
|
|
channel.value = doc["access_token"]
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Pushbullet integration has been added!")
|
|
else:
|
|
messages.warning(request, "Something went wrong")
|
|
|
|
return redirect("hc-channels")
|
|
|
|
redirect_uri = settings.SITE_ROOT + reverse("hc-add-pushbullet")
|
|
authorize_url = "https://www.pushbullet.com/authorize?" + urlencode(
|
|
{
|
|
"client_id": settings.PUSHBULLET_CLIENT_ID,
|
|
"redirect_uri": redirect_uri,
|
|
"response_type": "code",
|
|
"state": _prepare_state(request, "pushbullet"),
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"authorize_url": authorize_url,
|
|
}
|
|
return render(request, "integrations/add_pushbullet.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_discord(request):
|
|
if settings.DISCORD_CLIENT_ID is None:
|
|
raise Http404("discord integration is not available")
|
|
|
|
redirect_uri = settings.SITE_ROOT + reverse("hc-add-discord")
|
|
if "code" in request.GET:
|
|
code = _get_validated_code(request, "discord")
|
|
if code is None:
|
|
return HttpResponseBadRequest()
|
|
|
|
result = requests.post(
|
|
"https://discordapp.com/api/oauth2/token",
|
|
{
|
|
"client_id": settings.DISCORD_CLIENT_ID,
|
|
"client_secret": settings.DISCORD_CLIENT_SECRET,
|
|
"code": code,
|
|
"grant_type": "authorization_code",
|
|
"redirect_uri": redirect_uri,
|
|
},
|
|
)
|
|
|
|
doc = result.json()
|
|
if "access_token" in doc:
|
|
channel = Channel(kind="discord", project=request.project)
|
|
channel.user = request.project.owner
|
|
channel.value = result.text
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Discord integration has been added!")
|
|
else:
|
|
messages.warning(request, "Something went wrong")
|
|
|
|
return redirect("hc-channels")
|
|
|
|
auth_url = "https://discordapp.com/api/oauth2/authorize?" + urlencode(
|
|
{
|
|
"client_id": settings.DISCORD_CLIENT_ID,
|
|
"scope": "webhook.incoming",
|
|
"redirect_uri": redirect_uri,
|
|
"response_type": "code",
|
|
"state": _prepare_state(request, "discord"),
|
|
}
|
|
)
|
|
|
|
ctx = {"page": "channels", "project": request.project, "authorize_url": auth_url}
|
|
|
|
return render(request, "integrations/add_discord.html", ctx)
|
|
|
|
|
|
def add_pushover(request):
|
|
if (
|
|
settings.PUSHOVER_API_TOKEN is None
|
|
or settings.PUSHOVER_SUBSCRIPTION_URL is None
|
|
):
|
|
raise Http404("pushover integration is not available")
|
|
|
|
if not request.user.is_authenticated:
|
|
ctx = {"page": "channels"}
|
|
return render(request, "integrations/add_pushover.html", ctx)
|
|
|
|
if request.method == "POST":
|
|
# Initiate the subscription
|
|
state = _prepare_state(request, "pushover")
|
|
|
|
failure_url = settings.SITE_ROOT + reverse("hc-channels")
|
|
success_url = (
|
|
settings.SITE_ROOT
|
|
+ reverse("hc-add-pushover")
|
|
+ "?"
|
|
+ urlencode(
|
|
{
|
|
"state": state,
|
|
"prio": request.POST.get("po_priority", "0"),
|
|
"prio_up": request.POST.get("po_priority_up", "0"),
|
|
}
|
|
)
|
|
)
|
|
subscription_url = (
|
|
settings.PUSHOVER_SUBSCRIPTION_URL
|
|
+ "?"
|
|
+ urlencode({"success": success_url, "failure": failure_url})
|
|
)
|
|
|
|
return redirect(subscription_url)
|
|
|
|
# Handle successful subscriptions
|
|
if "pushover_user_key" in request.GET:
|
|
key = _get_validated_code(request, "pushover", "pushover_user_key")
|
|
if key is None:
|
|
return HttpResponseBadRequest()
|
|
|
|
# Validate priority
|
|
prio = request.GET.get("prio")
|
|
if prio not in ("-2", "-1", "0", "1", "2"):
|
|
return HttpResponseBadRequest()
|
|
|
|
prio_up = request.GET.get("prio_up")
|
|
if prio_up not in ("-2", "-1", "0", "1", "2"):
|
|
return HttpResponseBadRequest()
|
|
|
|
if request.GET.get("pushover_unsubscribed") == "1":
|
|
# Unsubscription: delete all Pushover channels for this project
|
|
Channel.objects.filter(project=request.project, kind="po").delete()
|
|
return redirect("hc-channels")
|
|
|
|
# Subscription
|
|
channel = Channel(project=request.project, kind="po")
|
|
channel.value = "%s|%s|%s" % (key, prio, prio_up)
|
|
channel.save()
|
|
channel.assign_all_checks()
|
|
|
|
messages.success(request, "The Pushover integration has been added!")
|
|
return redirect("hc-channels")
|
|
|
|
# Show Integration Settings form
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"po_retry_delay": td(seconds=settings.PUSHOVER_EMERGENCY_RETRY_DELAY),
|
|
"po_expiration": td(seconds=settings.PUSHOVER_EMERGENCY_EXPIRATION),
|
|
}
|
|
return render(request, "integrations/add_pushover.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_opsgenie(request):
|
|
if request.method == "POST":
|
|
form = AddOpsGenieForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="opsgenie")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": request.project, "form": form}
|
|
return render(request, "integrations/add_opsgenie.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_victorops(request):
|
|
if request.method == "POST":
|
|
form = AddUrlForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="victorops")
|
|
channel.value = form.cleaned_data["value"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddUrlForm()
|
|
|
|
ctx = {"page": "channels", "project": request.project, "form": form}
|
|
return render(request, "integrations/add_victorops.html", ctx)
|
|
|
|
|
|
@csrf_exempt
|
|
@require_POST
|
|
def telegram_bot(request):
|
|
try:
|
|
doc = json.loads(request.body.decode())
|
|
jsonschema.validate(doc, telegram_callback)
|
|
except ValueError:
|
|
return HttpResponseBadRequest()
|
|
except jsonschema.ValidationError:
|
|
# We don't recognize the message format, but don't want Telegram
|
|
# retrying this over and over again, so respond with 200 OK
|
|
return HttpResponse()
|
|
|
|
if "/start" not in doc["message"]["text"]:
|
|
return HttpResponse()
|
|
|
|
chat = doc["message"]["chat"]
|
|
name = max(chat.get("title", ""), chat.get("username", ""))
|
|
|
|
invite = render_to_string(
|
|
"integrations/telegram_invite.html",
|
|
{"qs": signing.dumps((chat["id"], chat["type"], name))},
|
|
)
|
|
|
|
Telegram.send(chat["id"], invite)
|
|
return HttpResponse()
|
|
|
|
|
|
@login_required
|
|
def add_telegram(request):
|
|
chat_id, chat_type, chat_name = None, None, None
|
|
qs = request.META["QUERY_STRING"]
|
|
if qs:
|
|
chat_id, chat_type, chat_name = signing.loads(qs, max_age=600)
|
|
|
|
if request.method == "POST":
|
|
channel = Channel(project=request.project, kind="telegram")
|
|
channel.value = json.dumps(
|
|
{"id": chat_id, "type": chat_type, "name": chat_name}
|
|
)
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Telegram integration has been added!")
|
|
return redirect("hc-channels")
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"chat_id": chat_id,
|
|
"chat_type": chat_type,
|
|
"chat_name": chat_name,
|
|
"bot_name": settings.TELEGRAM_BOT_NAME,
|
|
}
|
|
|
|
return render(request, "integrations/add_telegram.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_sms(request):
|
|
if settings.TWILIO_AUTH is None:
|
|
raise Http404("sms integration is not available")
|
|
|
|
if request.method == "POST":
|
|
form = AddSmsForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="sms")
|
|
channel.name = form.cleaned_data["label"]
|
|
channel.value = json.dumps({"value": form.cleaned_data["value"]})
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddSmsForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"form": form,
|
|
"profile": request.project.owner_profile,
|
|
}
|
|
return render(request, "integrations/add_sms.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_trello(request):
|
|
if settings.TRELLO_APP_KEY is None:
|
|
raise Http404("trello integration is not available")
|
|
|
|
if request.method == "POST":
|
|
channel = Channel(project=request.project, kind="trello")
|
|
channel.value = request.POST["settings"]
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
return redirect("hc-channels")
|
|
|
|
authorize_url = "https://trello.com/1/authorize?" + urlencode(
|
|
{
|
|
"expiration": "never",
|
|
"name": settings.SITE_NAME,
|
|
"scope": "read,write",
|
|
"response_type": "token",
|
|
"key": settings.TRELLO_APP_KEY,
|
|
"return_url": settings.SITE_ROOT + reverse("hc-add-trello"),
|
|
}
|
|
)
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"authorize_url": authorize_url,
|
|
}
|
|
|
|
return render(request, "integrations/add_trello.html", ctx)
|
|
|
|
|
|
@login_required
|
|
def add_matrix(request):
|
|
if settings.MATRIX_ACCESS_TOKEN is None:
|
|
raise Http404("matrix integration is not available")
|
|
|
|
if request.method == "POST":
|
|
form = AddMatrixForm(request.POST)
|
|
if form.is_valid():
|
|
channel = Channel(project=request.project, kind="matrix")
|
|
channel.value = form.cleaned_data["room_id"]
|
|
|
|
# If user supplied room alias instead of ID, use it as channel name
|
|
alias = form.cleaned_data["alias"]
|
|
if not alias.startswith("!"):
|
|
channel.name = alias
|
|
|
|
channel.save()
|
|
|
|
channel.assign_all_checks()
|
|
messages.success(request, "The Matrix integration has been added!")
|
|
return redirect("hc-channels")
|
|
else:
|
|
form = AddMatrixForm()
|
|
|
|
ctx = {
|
|
"page": "channels",
|
|
"project": request.project,
|
|
"form": form,
|
|
"matrix_user_id": settings.MATRIX_USER_ID,
|
|
}
|
|
return render(request, "integrations/add_matrix.html", ctx)
|
|
|
|
|
|
@login_required
|
|
@require_POST
|
|
def trello_settings(request):
|
|
token = request.POST.get("token")
|
|
|
|
url = "https://api.trello.com/1/members/me/boards?" + urlencode(
|
|
{
|
|
"key": settings.TRELLO_APP_KEY,
|
|
"token": token,
|
|
"fields": "id,name",
|
|
"lists": "open",
|
|
"list_fields": "id,name",
|
|
}
|
|
)
|
|
|
|
r = requests.get(url)
|
|
ctx = {"token": token, "data": r.json()}
|
|
return render(request, "integrations/trello_settings.html", ctx)
|