from datetime import timedelta as td from django.db import connection from django.db.models import F from django.http import (HttpResponse, HttpResponseForbidden, HttpResponseNotFound, JsonResponse) from django.shortcuts import get_object_or_404 from django.utils import timezone from django.views.decorators.cache import never_cache from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_POST from hc.api import schemas from hc.api.decorators import check_api_key, uuid_or_400, validate_json from hc.api.models import Check, Notification, Ping from hc.lib.badges import check_signature, get_badge_svg @csrf_exempt @uuid_or_400 @never_cache def ping(request, code): check = get_object_or_404(Check, code=code) check.n_pings = F("n_pings") + 1 check.last_ping = timezone.now() check.last_ping_body = request.body[:10000] check.alert_after = check.get_alert_after() if check.status in ("new", "paused"): check.status = "up" check.save() check.refresh_from_db() ping = Ping(owner=check) headers = request.META ping.n = check.n_pings remote_addr = headers.get("HTTP_X_FORWARDED_FOR", headers["REMOTE_ADDR"]) ping.remote_addr = remote_addr.split(",")[0] ping.scheme = headers.get("HTTP_X_FORWARDED_PROTO", "http") ping.method = headers["REQUEST_METHOD"] # If User-Agent is longer than 200 characters, truncate it: ping.ua = headers.get("HTTP_USER_AGENT", "")[:200] ping.save() response = HttpResponse("OK") response["Access-Control-Allow-Origin"] = "*" return response def _lookup(user, spec): unique_fields = spec.get("unique", []) if unique_fields: existing_checks = Check.objects.filter(user=user) if "name" in unique_fields: existing_checks = existing_checks.filter(name=spec.get("name")) if "tags" in unique_fields: existing_checks = existing_checks.filter(tags=spec.get("tags")) if "timeout" in unique_fields: timeout = td(seconds=spec["timeout"]) existing_checks = existing_checks.filter(timeout=timeout) if "grace" in unique_fields: grace = td(seconds=spec["grace"]) existing_checks = existing_checks.filter(grace=grace) return existing_checks.first() def _update(check, spec): if "name" in spec: check.name = spec["name"] if "tags" in spec: check.tags = spec["tags"] if "timeout" in spec and "schedule" not in spec: check.kind = "simple" check.timeout = td(seconds=spec["timeout"]) if "grace" in spec: check.grace = td(seconds=spec["grace"]) if "schedule" in spec: check.kind = "cron" check.schedule = spec["schedule"] if "tz" in spec: check.tz = spec["tz"] check.save() # This needs to be done after saving the check, because of # the M2M relation between checks and channels: if "channels" in spec: if spec["channels"] == "*": check.assign_all_channels() elif spec["channels"] == "": check.channel_set.clear() return check @csrf_exempt @check_api_key @validate_json(schemas.check) def checks(request): if request.method == "GET": q = Check.objects.filter(user=request.user) doc = {"checks": [check.to_dict() for check in q]} return JsonResponse(doc) elif request.method == "POST": created = False check = _lookup(request.user, request.json) if check is None: num_checks = Check.objects.filter(user=request.user).count() if num_checks >= request.user.profile.check_limit: return HttpResponseForbidden() check = Check(user=request.user) created = True _update(check, request.json) return JsonResponse(check.to_dict(), status=201 if created else 200) # If request is neither GET nor POST, return "405 Method not allowed" return HttpResponse(status=405) @csrf_exempt @require_POST @uuid_or_400 @check_api_key @validate_json(schemas.check) def update(request, code): check = get_object_or_404(Check, code=code) if check.user != request.user: return HttpResponseForbidden() _update(check, request.json) return JsonResponse(check.to_dict()) @csrf_exempt @require_POST @uuid_or_400 @check_api_key def pause(request, code): check = get_object_or_404(Check, code=code) if check.user != request.user: return HttpResponseForbidden() check.status = "paused" check.save() return JsonResponse(check.to_dict()) @never_cache def badge(request, username, signature, tag, format="svg"): if not check_signature(username, tag, signature): return HttpResponseNotFound() status = "up" q = Check.objects.filter(user__username=username, tags__contains=tag) for check in q: if tag not in check.tags_list(): continue if status == "up" and check.in_grace_period(): status = "late" if check.get_status() == "down": status = "down" break if format == "json": return JsonResponse({"status": status}) svg = get_badge_svg(tag, status) return HttpResponse(svg, content_type="image/svg+xml") @csrf_exempt @uuid_or_400 def bounce(request, code): notification = get_object_or_404(Notification, code=code) # If webhook is more than 10 minutes late, don't accept it: td = timezone.now() - notification.created if td.total_seconds() > 600: return HttpResponseForbidden() notification.error = request.body[:200] notification.save() return HttpResponse() def status(request): with connection.cursor() as c: c.execute("SELECT 1") c.fetchone() return HttpResponse("OK")