From 544ec7ea69d40546568b8e90f38d412075fdaea6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C4=93teris=20Caune?= Date: Tue, 10 Aug 2021 10:36:58 +0300 Subject: [PATCH] Add handling for non-latin-1 characters in webhook headers --- CHANGELOG.md | 5 +++++ hc/api/tests/test_notify_webhook.py | 17 +++++++++++++++++ hc/api/transports.py | 12 +++++++++--- hc/front/forms.py | 13 +++++++++++++ hc/front/tests/test_add_webhook.py | 13 +++++++++++++ 5 files changed, 57 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7501f9f..47da11f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Changelog All notable changes to this project will be documented in this file. +## v1.23.0 - Unreleased + +### Bug Fixes +- Add handling for non-latin-1 characters in webhook headers + ## v1.22.0 - 2020-08-06 ### Improvements diff --git a/hc/api/tests/test_notify_webhook.py b/hc/api/tests/test_notify_webhook.py index 6836e911..7d14e9d9 100644 --- a/hc/api/tests/test_notify_webhook.py +++ b/hc/api/tests/test_notify_webhook.py @@ -342,3 +342,20 @@ class NotifyWebhookTestCase(BaseTestCase): n = Notification.objects.get() self.assertEqual(n.error, "Webhook notifications are not enabled.") + + @patch("hc.api.transports.requests.request") + def test_webhooks_handle_non_ascii_in_headers(self, mock_request): + definition = { + "method_down": "GET", + "url_down": "http://foo.com", + "headers_down": {"X-Foo": "bār"}, + "body_down": "", + } + + self._setup_data(json.dumps(definition)) + self.check.save() + + self.channel.notify(self.check) + args, kwargs = mock_request.call_args + + self.assertEqual(kwargs["headers"]["X-Foo"], "bār") diff --git a/hc/api/transports.py b/hc/api/transports.py index c552c07f..930933ff 100644 --- a/hc/api/transports.py +++ b/hc/api/transports.py @@ -203,7 +203,7 @@ class HttpTransport(Transport): class Webhook(HttpTransport): - def prepare(self, template, check, urlencode=False): + def prepare(self, template, check, urlencode=False, latin1=False): """ Replace variables with actual values. """ def safe(s): @@ -220,7 +220,12 @@ class Webhook(HttpTransport): for i, tag in enumerate(check.tags_list()): ctx["$TAG%d" % (i + 1)] = safe(tag) - return replace(template, ctx) + result = replace(template, ctx) + if latin1: + # Replace non-latin-1 characters with XML character references. + result = result.encode("latin-1", "xmlcharrefreplace").decode() + + return result def is_noop(self, check): if check.status == "down" and not self.channel.url_down: @@ -242,7 +247,8 @@ class Webhook(HttpTransport): url = self.prepare(spec["url"], check, urlencode=True) headers = {} for key, value in spec["headers"].items(): - headers[key] = self.prepare(value, check) + # Header values should contain ASCII and latin-1 only + headers[key] = self.prepare(value, check, latin1=True) body = spec["body"] if body: diff --git a/hc/front/forms.py b/hc/front/forms.py index 106ef930..df3e6eb8 100644 --- a/hc/front/forms.py +++ b/hc/front/forms.py @@ -15,6 +15,14 @@ from hc.front.validators import ( import requests +def _is_latin1(s): + try: + s.encode("latin-1") + return True + except UnicodeError: + return False + + class HeadersField(forms.Field): message = """Use "Header-Name: value" pairs, one per line.""" @@ -35,6 +43,11 @@ class HeadersField(forms.Field): if not n or not v: raise ValidationError(message=self.message) + if not _is_latin1(n): + raise ValidationError( + message="Header names must not contain special characters" + ) + headers[n] = v return headers diff --git a/hc/front/tests/test_add_webhook.py b/hc/front/tests/test_add_webhook.py index 6e0fdf05..53c5bef6 100644 --- a/hc/front/tests/test_add_webhook.py +++ b/hc/front/tests/test_add_webhook.py @@ -149,6 +149,19 @@ class AddWebhookTestCase(BaseTestCase): self.assertContains(r, """invalid-header""") self.assertEqual(Channel.objects.count(), 0) + def test_it_rejects_non_latin1_in_header_name(self): + self.client.login(username="alice@example.org", password="password") + form = { + "method_down": "GET", + "url_down": "http://example.org", + "headers_down": "fō:bar", + "method_up": "GET", + } + + r = self.client.post(self.url, form) + self.assertContains(r, """must not contain special characters""") + self.assertEqual(Channel.objects.count(), 0) + def test_it_strips_headers(self): form = { "method_down": "GET",