import os import time from django.conf import settings from django.template.loader import render_to_string from django.utils import timezone from django.utils.html import escape import json import requests from urllib.parse import quote, urlencode from hc.accounts.models import Profile from hc.lib import emails from hc.lib.string import replace try: import apprise except ImportError: # Enforce settings.APPRISE_ENABLED = False try: import dbus except ImportError: # Enforce dbus = None settings.SIGNAL_CLI_ENABLED = False def tmpl(template_name, **ctx): template_path = "integrations/%s" % template_name # \xa0 is non-breaking space. It causes SMS messages to use UCS2 encoding # and cost twice the money. return render_to_string(template_path, ctx).strip().replace("\xa0", " ") class Transport(object): def __init__(self, channel): self.channel = channel def notify(self, check): """ Send notification about current status of the check. This method returns None on success, and error message on error. """ raise NotImplementedError() def is_noop(self, check): """ Return True if transport will ignore check's current status. This method is overridden in Webhook subclass where the user can configure webhook urls for "up" and "down" events, and both are optional. """ return False def checks(self): return self.channel.project.check_set.order_by("created") class Email(Transport): def notify(self, check): if not self.channel.email_verified: return "Email not verified" unsub_link = self.channel.get_unsub_link() headers = { "X-Status-Url": check.status_url, "List-Unsubscribe": "<%s>" % unsub_link, "List-Unsubscribe-Post": "List-Unsubscribe=One-Click", } from hc.accounts.models import Profile # If this email address has an associated account, include # a summary of projects the account has access to try: profile = Profile.objects.get(user__email=self.channel.email_value) projects = list(profile.projects()) except Profile.DoesNotExist: projects = None ctx = { "check": check, "ping": check.ping_set.order_by("created").last(), "projects": projects, "unsub_link": unsub_link, } emails.alert(self.channel.email_value, ctx, headers) def is_noop(self, check): if check.status == "down": return not self.channel.email_notify_down else: return not self.channel.email_notify_up class Shell(Transport): def prepare(self, template, check): """ Replace placeholders with actual values. """ ctx = { "$CODE": str(check.code), "$STATUS": check.status, "$NOW": timezone.now().replace(microsecond=0).isoformat(), "$NAME": check.name, "$TAGS": check.tags, } for i, tag in enumerate(check.tags_list()): ctx["$TAG%d" % (i + 1)] = tag return replace(template, ctx) def is_noop(self, check): if check.status == "down" and not self.channel.cmd_down: return True if check.status == "up" and not self.channel.cmd_up: return True return False def notify(self, check): if not settings.SHELL_ENABLED: return "Shell commands are not enabled" if check.status == "up": cmd = self.channel.cmd_up elif check.status == "down": cmd = self.channel.cmd_down cmd = self.prepare(cmd, check) code = os.system(cmd) if code != 0: return "Command returned exit code %d" % code class HttpTransport(Transport): @classmethod def get_error(cls, response): # Override in subclasses: look for a specific error message in the # response and return it. return None @classmethod def _request(cls, method, url, **kwargs): try: options = dict(kwargs) options["timeout"] = 10 if "headers" not in options: options["headers"] = {} if "User-Agent" not in options["headers"]: options["headers"]["User-Agent"] = "healthchecks.io" r = requests.request(method, url, **options) if r.status_code not in (200, 201, 202, 204): m = cls.get_error(r) if m: return f'Received status code {r.status_code} with a message: "{m}"' return f"Received status code {r.status_code}" except requests.exceptions.Timeout: # Well, we tried return "Connection timed out" except requests.exceptions.ConnectionError: return "Connection failed" @classmethod def _request_with_retries(cls, method, url, use_retries=True, **kwargs): start = time.time() error = cls._request(method, url, **kwargs) # 2nd try if error and use_retries: error = cls._request(method, url, **kwargs) # 3rd try. Only do the 3rd try if we have spent 10s or less in first two # tries. Otherwise we risk overshooting the 20s total time budget. if error and use_retries and time.time() - start < 10: error = cls._request(method, url, **kwargs) return error @classmethod def get(cls, url, **kwargs): return cls._request_with_retries("get", url, **kwargs) @classmethod def post(cls, url, **kwargs): return cls._request_with_retries("post", url, **kwargs) @classmethod def put(cls, url, **kwargs): return cls._request_with_retries("put", url, **kwargs) class Webhook(HttpTransport): def prepare(self, template, check, urlencode=False, latin1=False): """ Replace variables with actual values. """ def safe(s): return quote(s) if urlencode else s ctx = { "$CODE": str(check.code), "$STATUS": check.status, "$NOW": safe(timezone.now().replace(microsecond=0).isoformat()), "$NAME": safe(check.name), "$TAGS": safe(check.tags), } for i, tag in enumerate(check.tags_list()): ctx["$TAG%d" % (i + 1)] = safe(tag) result = replace(template, ctx) if latin1: # Replace non-latin-1 characters with XML character references. result = result.encode("latin-1", "xmlcharrefreplace").decode("latin-1") return result def is_noop(self, check): if check.status == "down" and not self.channel.url_down: return True if check.status == "up" and not self.channel.url_up: return True return False def notify(self, check): if not settings.WEBHOOKS_ENABLED: return "Webhook notifications are not enabled." spec = self.channel.webhook_spec(check.status) if not spec["url"]: return "Empty webhook URL" url = self.prepare(spec["url"], check, urlencode=True) headers = {} for key, value in spec["headers"].items(): # Header values should contain ASCII and latin-1 only headers[key] = self.prepare(value, check, latin1=True) body = spec["body"] if body: body = self.prepare(body, check).encode() # When sending a test notification, don't retry on failures. use_retries = False if getattr(check, "is_test") else True if spec["method"] == "GET": return self.get(url, use_retries=use_retries, headers=headers) elif spec["method"] == "POST": return self.post(url, use_retries=use_retries, data=body, headers=headers) elif spec["method"] == "PUT": return self.put(url, use_retries=use_retries, data=body, headers=headers) class Slack(HttpTransport): def notify(self, check): if self.channel.kind == "slack" and not settings.SLACK_ENABLED: return "Slack notifications are not enabled." if self.channel.kind == "mattermost" and not settings.MATTERMOST_ENABLED: return "Mattermost notifications are not enabled." text = tmpl("slack_message.json", check=check) payload = json.loads(text) return self.post(self.channel.slack_webhook_url, json=payload) class HipChat(HttpTransport): def is_noop(self, check): return True class Opsgenie(HttpTransport): @classmethod def get_error(cls, response): try: return response.json().get("message") except ValueError: pass def notify(self, check): if not settings.OPSGENIE_ENABLED: return "Opsgenie notifications are not enabled." headers = { "Conent-Type": "application/json", "Authorization": "GenieKey %s" % self.channel.opsgenie_key, } payload = {"alias": str(check.code), "source": settings.SITE_NAME} if check.status == "down": payload["tags"] = check.tags_list() payload["message"] = tmpl("opsgenie_message.html", check=check) payload["note"] = tmpl("opsgenie_note.html", check=check) payload["description"] = tmpl("opsgenie_description.html", check=check) url = "https://api.opsgenie.com/v2/alerts" if self.channel.opsgenie_region == "eu": url = "https://api.eu.opsgenie.com/v2/alerts" if check.status == "up": url += "/%s/close?identifierType=alias" % check.code return self.post(url, json=payload, headers=headers) class PagerDuty(HttpTransport): URL = "https://events.pagerduty.com/generic/2010-04-15/create_event.json" def notify(self, check): if not settings.PD_ENABLED: return "PagerDuty notifications are not enabled." description = tmpl("pd_description.html", check=check) payload = { "service_key": self.channel.pd_service_key, "incident_key": str(check.code), "event_type": "trigger" if check.status == "down" else "resolve", "description": description, "client": settings.SITE_NAME, "client_url": check.details_url(), } return self.post(self.URL, json=payload) class PagerTree(HttpTransport): def notify(self, check): if not settings.PAGERTREE_ENABLED: return "PagerTree notifications are not enabled." url = self.channel.value headers = {"Conent-Type": "application/json"} payload = { "incident_key": str(check.code), "event_type": "trigger" if check.status == "down" else "resolve", "title": tmpl("pagertree_title.html", check=check), "description": tmpl("pagertree_description.html", check=check), "client": settings.SITE_NAME, "client_url": settings.SITE_ROOT, "tags": ",".join(check.tags_list()), } return self.post(url, json=payload, headers=headers) class PagerTeam(HttpTransport): def is_noop(self, check): return True class Pushbullet(HttpTransport): def notify(self, check): text = tmpl("pushbullet_message.html", check=check) url = "https://api.pushbullet.com/v2/pushes" headers = { "Access-Token": self.channel.value, "Conent-Type": "application/json", } payload = {"type": "note", "title": settings.SITE_NAME, "body": text} return self.post(url, json=payload, headers=headers) class Pushover(HttpTransport): URL = "https://api.pushover.net/1/messages.json" CANCEL_TMPL = "https://api.pushover.net/1/receipts/cancel_by_tag/%s.json" def notify(self, check): pieces = self.channel.value.split("|") user_key, down_prio = pieces[0], pieces[1] # The third element, if present, is the priority for "up" events up_prio = down_prio if len(pieces) == 3: up_prio = pieces[2] from hc.api.models import TokenBucket if not TokenBucket.authorize_pushover(user_key): return "Rate limit exceeded" # If down events have the emergency priority, # send a cancel call first if check.status == "up" and down_prio == "2": url = self.CANCEL_TMPL % check.unique_key payload = {"token": settings.PUSHOVER_API_TOKEN} self.post(url, data=payload) others = self.checks().filter(status="down").exclude(code=check.code) # list() executes the query, to avoid DB access while # rendering a template ctx = {"check": check, "down_checks": list(others)} text = tmpl("pushover_message.html", **ctx) title = tmpl("pushover_title.html", **ctx) prio = up_prio if check.status == "up" else down_prio payload = { "token": settings.PUSHOVER_API_TOKEN, "user": user_key, "message": text, "title": title, "html": 1, "priority": int(prio), "tags": check.unique_key, "url": check.cloaked_url(), "url_title": f"View on {settings.SITE_NAME}", } # Emergency notification if prio == "2": payload["retry"] = settings.PUSHOVER_EMERGENCY_RETRY_DELAY payload["expire"] = settings.PUSHOVER_EMERGENCY_EXPIRATION return self.post(self.URL, data=payload) class VictorOps(HttpTransport): def notify(self, check): if not settings.VICTOROPS_ENABLED: return "Splunk On-Call notifications are not enabled." description = tmpl("victorops_description.html", check=check) mtype = "CRITICAL" if check.status == "down" else "RECOVERY" payload = { "entity_id": str(check.code), "message_type": mtype, "entity_display_name": check.name_then_code(), "state_message": description, "monitoring_tool": settings.SITE_NAME, } return self.post(self.channel.value, json=payload) class Matrix(HttpTransport): def get_url(self): s = quote(self.channel.value) url = settings.MATRIX_HOMESERVER url += "/_matrix/client/r0/rooms/%s/send/m.room.message?" % s url += urlencode({"access_token": settings.MATRIX_ACCESS_TOKEN}) return url def notify(self, check): plain = tmpl("matrix_description.html", check=check) formatted = tmpl("matrix_description_formatted.html", check=check) payload = { "msgtype": "m.text", "body": plain, "format": "org.matrix.custom.html", "formatted_body": formatted, } return self.post(self.get_url(), json=payload) class Discord(HttpTransport): def notify(self, check): text = tmpl("slack_message.json", check=check) payload = json.loads(text) url = self.channel.discord_webhook_url + "/slack" return self.post(url, json=payload) class Telegram(HttpTransport): SM = "https://api.telegram.org/bot%s/sendMessage" % settings.TELEGRAM_TOKEN @classmethod def get_error(cls, response): try: return response.json().get("description") except ValueError: pass @classmethod def send(cls, chat_id, text): # Telegram.send is a separate method because it is also used in # hc.front.views.telegram_bot to send invite links. return cls.post( cls.SM, json={"chat_id": chat_id, "text": text, "parse_mode": "html"} ) def notify(self, check): from hc.api.models import TokenBucket if not TokenBucket.authorize_telegram(self.channel.telegram_id): return "Rate limit exceeded" text = tmpl("telegram_message.html", check=check) return self.send(self.channel.telegram_id, text) class Sms(HttpTransport): URL = "https://api.twilio.com/2010-04-01/Accounts/%s/Messages.json" def is_noop(self, check): if check.status == "down": return not self.channel.sms_notify_down else: return not self.channel.sms_notify_up def notify(self, check): profile = Profile.objects.for_user(self.channel.project.owner) if not profile.authorize_sms(): profile.send_sms_limit_notice("SMS") return "Monthly SMS limit exceeded" url = self.URL % settings.TWILIO_ACCOUNT auth = (settings.TWILIO_ACCOUNT, settings.TWILIO_AUTH) text = tmpl("sms_message.html", check=check, site_name=settings.SITE_NAME) data = { "From": settings.TWILIO_FROM, "To": self.channel.phone_number, "Body": text, "StatusCallback": check.status_url, } return self.post(url, data=data, auth=auth) class Call(HttpTransport): URL = "https://api.twilio.com/2010-04-01/Accounts/%s/Calls.json" def is_noop(self, check): return check.status != "down" def notify(self, check): profile = Profile.objects.for_user(self.channel.project.owner) if not profile.authorize_call(): profile.send_call_limit_notice() return "Monthly phone call limit exceeded" url = self.URL % settings.TWILIO_ACCOUNT auth = (settings.TWILIO_ACCOUNT, settings.TWILIO_AUTH) twiml = tmpl("call_message.html", check=check, site_name=settings.SITE_NAME) data = { "From": settings.TWILIO_FROM, "To": self.channel.phone_number, "Twiml": twiml, "StatusCallback": check.status_url, } return self.post(url, data=data, auth=auth) class WhatsApp(HttpTransport): URL = "https://api.twilio.com/2010-04-01/Accounts/%s/Messages.json" def is_noop(self, check): if check.status == "down": return not self.channel.whatsapp_notify_down else: return not self.channel.whatsapp_notify_up def notify(self, check): profile = Profile.objects.for_user(self.channel.project.owner) if not profile.authorize_sms(): profile.send_sms_limit_notice("WhatsApp") return "Monthly message limit exceeded" url = self.URL % settings.TWILIO_ACCOUNT auth = (settings.TWILIO_ACCOUNT, settings.TWILIO_AUTH) text = tmpl("whatsapp_message.html", check=check, site_name=settings.SITE_NAME) data = { "From": "whatsapp:%s" % settings.TWILIO_FROM, "To": "whatsapp:%s" % self.channel.phone_number, "Body": text, "StatusCallback": check.status_url, } return self.post(url, data=data, auth=auth) class Trello(HttpTransport): URL = "https://api.trello.com/1/cards" def is_noop(self, check): return check.status != "down" def notify(self, check): params = { "idList": self.channel.trello_list_id, "name": tmpl("trello_name.html", check=check), "desc": tmpl("trello_desc.html", check=check), "key": settings.TRELLO_APP_KEY, "token": self.channel.trello_token, } return self.post(self.URL, params=params) class Apprise(HttpTransport): def notify(self, check): if not settings.APPRISE_ENABLED: # Not supported and/or enabled return "Apprise is disabled and/or not installed" a = apprise.Apprise() title = tmpl("apprise_title.html", check=check) body = tmpl("apprise_description.html", check=check) a.add(self.channel.value) notify_type = ( apprise.NotifyType.SUCCESS if check.status == "up" else apprise.NotifyType.FAILURE ) return ( "Failed" if not a.notify(body=body, title=title, notify_type=notify_type) else None ) class MsTeams(HttpTransport): def escape_md(self, s): # Escape special HTML characters s = escape(s) # Escape characters that have special meaning in Markdown for c in r"\`*_{}[]()#+-.!|": s = s.replace(c, "\\" + c) return s def notify(self, check): if not settings.MSTEAMS_ENABLED: return "MS Teams notifications are not enabled." text = tmpl("msteams_message.json", check=check) payload = json.loads(text) # MS Teams escapes HTML special characters in the summary field. # It does not interpret summary content as Markdown. name = check.name_then_code() payload["summary"] = f"“{name}” is {check.status.upper()}." # MS teams *strips* HTML special characters from the title field. # To avoid that, we use escape(). # It does not interpret title as Markdown. safe_name = escape(name) payload["title"] = f"“{safe_name}” is {check.status.upper()}." # MS teams allows some HTML in the section text. # It also interprets the section text as Markdown. # We want to display the raw content, angle brackets and all, # so we run escape() and then additionally escape Markdown: payload["sections"][0]["text"] = self.escape_md(check.desc) return self.post(self.channel.value, json=payload) class Zulip(HttpTransport): @classmethod def get_error(cls, response): try: return response.json().get("msg") except ValueError: pass def notify(self, check): if not settings.ZULIP_ENABLED: return "Zulip notifications are not enabled." url = self.channel.zulip_site + "/api/v1/messages" auth = (self.channel.zulip_bot_email, self.channel.zulip_api_key) data = { "type": self.channel.zulip_type, "to": self.channel.zulip_to, "topic": tmpl("zulip_topic.html", check=check), "content": tmpl("zulip_content.html", check=check), } return self.post(url, data=data, auth=auth) class Spike(HttpTransport): def notify(self, check): if not settings.SPIKE_ENABLED: return "Spike notifications are not enabled." url = self.channel.value headers = {"Conent-Type": "application/json"} payload = { "check_id": str(check.code), "title": tmpl("spike_title.html", check=check), "message": tmpl("spike_description.html", check=check), "status": check.status, } return self.post(url, json=payload, headers=headers) class LineNotify(HttpTransport): URL = "https://notify-api.line.me/api/notify" def notify(self, check): headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization": "Bearer %s" % self.channel.linenotify_token, } payload = {"message": tmpl("linenotify_message.html", check=check)} return self.post(self.URL, headers=headers, params=payload) class Signal(Transport): def is_noop(self, check): if check.status == "down": return not self.channel.signal_notify_down else: return not self.channel.signal_notify_up def get_service(self): bus = dbus.SystemBus() signal_object = bus.get_object("org.asamk.Signal", "/org/asamk/Signal") return dbus.Interface(signal_object, "org.asamk.Signal") def notify(self, check): if not settings.SIGNAL_CLI_ENABLED: return "Signal notifications are not enabled" from hc.api.models import TokenBucket if not TokenBucket.authorize_signal(self.channel.phone_number): return "Rate limit exceeded" text = tmpl("signal_message.html", check=check, site_name=settings.SITE_NAME) try: dbus.SystemBus().call_blocking( "org.asamk.Signal", "/org/asamk/Signal", "org.asamk.Signal", "sendMessage", "sasas", (text, [], [self.channel.phone_number]), timeout=30, ) except dbus.exceptions.DBusException as e: if "NotFoundException" in str(e): return "Recipient not found" return "signal-cli call failed"