@ -0,0 +1,42 @@ | |||
import getpass | |||
from django.core.management.base import BaseCommand | |||
from hc.accounts.forms import AvailableEmailForm | |||
from hc.accounts.views import _make_user | |||
class Command(BaseCommand): | |||
help = """Create a super-user account.""" | |||
def handle(self, *args, **options): | |||
email = None | |||
password = None | |||
while not email: | |||
raw = input("Email address:") | |||
form = AvailableEmailForm({"identity": raw}) | |||
if not form.is_valid(): | |||
self.stderr.write("Error: " + " ".join(form.errors["identity"])) | |||
continue | |||
email = form.cleaned_data["identity"] | |||
while not password: | |||
p1 = getpass.getpass() | |||
p2 = getpass.getpass("Password (again):") | |||
if p1.strip() == "": | |||
self.stderr.write("Error: Blank passwords aren't allowed.") | |||
continue | |||
if p1 != p2: | |||
self.stderr.write("Error: Your passwords didn't match.") | |||
continue | |||
password = p1 | |||
user = _make_user(email) | |||
user.set_password(password) | |||
user.is_staff = True | |||
user.is_superuser = True | |||
user.save() | |||
return "Superuser created successfully." |
@ -0,0 +1,23 @@ | |||
# Generated by Django 2.2.6 on 2019-11-19 13:46 | |||
from django.db import migrations, models | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('accounts', '0027_profile_deletion_notice_date'), | |||
] | |||
operations = [ | |||
migrations.AddField( | |||
model_name='profile', | |||
name='last_active_date', | |||
field=models.DateTimeField(blank=True, null=True), | |||
), | |||
migrations.AlterField( | |||
model_name='profile', | |||
name='sms_limit', | |||
field=models.IntegerField(default=5), | |||
), | |||
] |
@ -0,0 +1,17 @@ | |||
# Generated by Django 3.0.1 on 2020-03-02 07:56 | |||
from django.db import migrations | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('accounts', '0028_auto_20191119_1346'), | |||
] | |||
operations = [ | |||
migrations.RemoveField( | |||
model_name='profile', | |||
name='current_project', | |||
), | |||
] |
@ -2,12 +2,7 @@ from hc.accounts.models import Project | |||
from hc.test import BaseTestCase | |||
class RemoveProjectTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(RemoveProjectTestCase, self).setUp() | |||
self.url = "/projects/%s/remove/" % self.project.code | |||
class AddProjectTestCase(BaseTestCase): | |||
def test_it_works(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post("/projects/add/", {"name": "My Second Project"}) | |||
@ -16,10 +11,6 @@ class RemoveProjectTestCase(BaseTestCase): | |||
self.assertRedirects(r, "/projects/%s/checks/" % p.code) | |||
self.assertEqual(str(p.code), p.badge_key) | |||
# Alice's current project should be the just created one | |||
self.profile.refresh_from_db() | |||
self.assertEqual(self.profile.current_project, p) | |||
def test_it_rejects_get(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/projects/add/") | |||
@ -24,13 +24,14 @@ class LoginTestCase(BaseTestCase): | |||
def test_it_sends_link_with_next(self): | |||
form = {"identity": "[email protected]"} | |||
r = self.client.post("/accounts/login/?next=/integrations/add_slack/", form) | |||
r = self.client.post("/accounts/login/?next=" + self.channels_url, form) | |||
self.assertRedirects(r, "/accounts/login_link_sent/") | |||
self.assertIn("auto-login", r.cookies) | |||
# The check_token link should have a ?next= query parameter: | |||
self.assertEqual(len(mail.outbox), 1) | |||
body = mail.outbox[0].body | |||
self.assertTrue("/?next=/integrations/add_slack/" in body) | |||
self.assertTrue("/?next=" + self.channels_url in body) | |||
@override_settings(SECRET_KEY="test-secret") | |||
def test_it_rate_limits_emails(self): | |||
@ -84,7 +85,7 @@ class LoginTestCase(BaseTestCase): | |||
form = {"action": "login", "email": "[email protected]", "password": "password"} | |||
samples = ["/integrations/add_slack/", "/checks/%s/details/" % check.code] | |||
samples = [self.channels_url, "/checks/%s/details/" % check.code] | |||
for s in samples: | |||
r = self.client.post("/accounts/login/?next=%s" % s, form) | |||
@ -3,7 +3,7 @@ from django.core import mail | |||
from django.conf import settings | |||
from django.test.utils import override_settings | |||
from hc.test import BaseTestCase | |||
from hc.accounts.models import Member | |||
from hc.accounts.models import Member, Project | |||
from hc.api.models import TokenBucket | |||
@ -76,17 +76,37 @@ class ProjectTestCase(BaseTestCase): | |||
project=self.project, user__email="[email protected]" | |||
) | |||
profile = member.user.profile | |||
self.assertEqual(profile.current_project, self.project) | |||
# The new user should not have their own project | |||
self.assertFalse(member.user.project_set.exists()) | |||
# And an email should have been sent | |||
subj = ( | |||
"You have been invited to join" | |||
" Alice's Project on %s" % settings.SITE_NAME | |||
" Alice's Project on %s" % settings.SITE_NAME | |||
) | |||
self.assertEqual(mail.outbox[0].subject, subj) | |||
self.assertHTMLEqual(mail.outbox[0].subject, subj) | |||
def test_it_adds_member_from_another_team(self): | |||
# With team limit at zero, we should not be able to invite any new users | |||
self.profile.team_limit = 0 | |||
self.profile.save() | |||
# But Charlie will have an existing membership in another Alice's project | |||
# so Alice *should* be able to invite Charlie: | |||
p2 = Project.objects.create(owner=self.alice) | |||
Member.objects.create(user=self.charlie, project=p2) | |||
self.client.login(username="[email protected]", password="password") | |||
form = {"invite_team_member": "1", "email": "[email protected]"} | |||
r = self.client.post(self.url, form) | |||
self.assertEqual(r.status_code, 200) | |||
q = Member.objects.filter(project=self.project, user=self.charlie) | |||
self.assertEqual(q.count(), 1) | |||
# And this should not have affected the rate limit: | |||
q = TokenBucket.objects.filter(value="invite-%d" % self.alice.id) | |||
self.assertFalse(q.exists()) | |||
@override_settings(SECRET_KEY="test-secret") | |||
def test_it_rate_limits_invites(self): | |||
@ -126,10 +146,7 @@ class ProjectTestCase(BaseTestCase): | |||
r = self.client.post(self.url, form) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertEqual(Member.objects.count(), 0) | |||
self.bobs_profile.refresh_from_db() | |||
self.assertEqual(self.bobs_profile.current_project, None) | |||
self.assertFalse(Member.objects.exists()) | |||
def test_it_requires_owner_to_remove_team_member(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -146,9 +163,6 @@ class ProjectTestCase(BaseTestCase): | |||
r = self.client.post(url, form) | |||
self.assertEqual(r.status_code, 400) | |||
self.profile.refresh_from_db() | |||
self.assertIsNotNone(self.profile.current_project) | |||
def test_it_sets_project_name(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -0,0 +1,106 @@ | |||
from datetime import timedelta as td | |||
from unittest.mock import Mock | |||
from django.core import mail | |||
from django.utils.timezone import now | |||
from hc.accounts.management.commands.senddeletionnotices import Command | |||
from hc.accounts.models import Member | |||
from hc.api.models import Check, Ping | |||
from hc.test import BaseTestCase | |||
class SendDeletionNoticesTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(SendDeletionNoticesTestCase, self).setUp() | |||
# Make alice eligible for notice -- signed up more than 1 year ago | |||
self.alice.date_joined = now() - td(days=500) | |||
self.alice.save() | |||
self.profile.sms_limit = 5 | |||
self.profile.save() | |||
# remove members from alice's project | |||
self.project.member_set.all().delete() | |||
def test_it_sends_notice(self): | |||
cmd = Command(stdout=Mock()) | |||
cmd.pause = Mock() # don't pause for 1s | |||
result = cmd.handle() | |||
self.assertEqual(result, "Done! Sent 1 notices") | |||
self.profile.refresh_from_db() | |||
self.assertTrue(self.profile.deletion_notice_date) | |||
email = mail.outbox[0] | |||
self.assertEqual(email.subject, "Inactive Account Notification") | |||
def test_it_checks_last_login(self): | |||
# alice has logged in recently: | |||
self.alice.last_login = now() - td(days=15) | |||
self.alice.save() | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") | |||
self.profile.refresh_from_db() | |||
self.assertIsNone(self.profile.deletion_notice_date) | |||
def test_it_checks_date_joined(self): | |||
# alice signed up recently: | |||
self.alice.date_joined = now() - td(days=15) | |||
self.alice.save() | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") | |||
self.profile.refresh_from_db() | |||
self.assertIsNone(self.profile.deletion_notice_date) | |||
def test_it_checks_deletion_notice_date(self): | |||
# alice has already received a deletion notice | |||
self.profile.deletion_notice_date = now() - td(days=15) | |||
self.profile.save() | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") | |||
def test_it_checks_sms_limit(self): | |||
# alice has a paid account | |||
self.profile.sms_limit = 50 | |||
self.profile.save() | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") | |||
self.profile.refresh_from_db() | |||
self.assertIsNone(self.profile.deletion_notice_date) | |||
def test_it_checks_team_members(self): | |||
# bob has access to alice's project | |||
Member.objects.create(user=self.bob, project=self.project) | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") | |||
self.profile.refresh_from_db() | |||
self.assertIsNone(self.profile.deletion_notice_date) | |||
def test_it_checks_recent_pings(self): | |||
check = Check.objects.create(project=self.project) | |||
Ping.objects.create(owner=check) | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") | |||
self.profile.refresh_from_db() | |||
self.assertIsNone(self.profile.deletion_notice_date) | |||
def test_it_checks_last_active_date(self): | |||
# alice has been browsing the site recently | |||
self.profile.last_active_date = now() - td(days=15) | |||
self.profile.save() | |||
result = Command(stdout=Mock()).handle() | |||
self.assertEqual(result, "Done! Sent 0 notices") |
@ -0,0 +1,18 @@ | |||
# Generated by Django 2.2.6 on 2019-11-19 13:46 | |||
from django.db import migrations, models | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('api', '0063_auto_20190903_0901'), | |||
] | |||
operations = [ | |||
migrations.AlterField( | |||
model_name='channel', | |||
name='kind', | |||
field=models.CharField(choices=[('email', 'Email'), ('webhook', 'Webhook'), ('hipchat', 'HipChat'), ('slack', 'Slack'), ('pd', 'PagerDuty'), ('pagertree', 'PagerTree'), ('pagerteam', 'Pager Team'), ('po', 'Pushover'), ('pushbullet', 'Pushbullet'), ('opsgenie', 'OpsGenie'), ('victorops', 'VictorOps'), ('discord', 'Discord'), ('telegram', 'Telegram'), ('sms', 'SMS'), ('zendesk', 'Zendesk'), ('trello', 'Trello'), ('matrix', 'Matrix'), ('whatsapp', 'WhatsApp'), ('apprise', 'Apprise'), ('mattermost', 'Mattermost'), ('msteams', 'Microsoft Teams')], max_length=20), | |||
), | |||
] |
@ -0,0 +1,23 @@ | |||
# Generated by Django 2.2.6 on 2019-11-27 12:40 | |||
from django.db import migrations, models | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('api', '0064_auto_20191119_1346'), | |||
] | |||
operations = [ | |||
migrations.AddField( | |||
model_name='check', | |||
name='methods', | |||
field=models.CharField(blank=True, max_length=30), | |||
), | |||
migrations.AlterField( | |||
model_name='channel', | |||
name='kind', | |||
field=models.CharField(choices=[('email', 'Email'), ('webhook', 'Webhook'), ('hipchat', 'HipChat'), ('slack', 'Slack'), ('pd', 'PagerDuty'), ('pagertree', 'PagerTree'), ('pagerteam', 'Pager Team'), ('po', 'Pushover'), ('pushbullet', 'Pushbullet'), ('opsgenie', 'OpsGenie'), ('victorops', 'VictorOps'), ('discord', 'Discord'), ('telegram', 'Telegram'), ('sms', 'SMS'), ('zendesk', 'Zendesk'), ('trello', 'Trello'), ('matrix', 'Matrix'), ('whatsapp', 'WhatsApp'), ('apprise', 'Apprise'), ('mattermost', 'Mattermost'), ('msteams', 'Microsoft Teams'), ('shell', 'Shell Command')], max_length=20), | |||
), | |||
] |
@ -0,0 +1,18 @@ | |||
# Generated by Django 3.0.1 on 2020-01-02 12:25 | |||
from django.db import migrations, models | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('api', '0065_auto_20191127_1240'), | |||
] | |||
operations = [ | |||
migrations.AddField( | |||
model_name='channel', | |||
name='last_error', | |||
field=models.CharField(blank=True, max_length=200), | |||
), | |||
] |
@ -0,0 +1,27 @@ | |||
# Generated by Django 3.0.1 on 2020-01-02 14:28 | |||
from django.db import migrations | |||
def fill_last_errors(apps, schema_editor): | |||
Channel = apps.get_model("api", "Channel") | |||
Notification = apps.get_model("api", "Notification") | |||
for ch in Channel.objects.all(): | |||
error = "" | |||
try: | |||
n = Notification.objects.filter(channel=ch).latest() | |||
error = n.error | |||
except Notification.DoesNotExist: | |||
pass | |||
ch.last_error = error | |||
ch.save() | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
("api", "0066_channel_last_error"), | |||
] | |||
operations = [migrations.RunPython(fill_last_errors, migrations.RunPython.noop)] |
@ -0,0 +1,18 @@ | |||
# Generated by Django 3.0.1 on 2020-01-17 10:23 | |||
from django.db import migrations, models | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('api', '0067_last_error_values'), | |||
] | |||
operations = [ | |||
migrations.AlterField( | |||
model_name='ping', | |||
name='body', | |||
field=models.TextField(blank=True, null=True), | |||
), | |||
] |
@ -0,0 +1,19 @@ | |||
# Generated by Django 3.0.1 on 2020-01-17 12:27 | |||
from django.db import migrations, models | |||
import django.utils.timezone | |||
class Migration(migrations.Migration): | |||
dependencies = [ | |||
('api', '0068_auto_20200117_1023'), | |||
] | |||
operations = [ | |||
migrations.AlterField( | |||
model_name='ping', | |||
name='created', | |||
field=models.DateTimeField(default=django.utils.timezone.now), | |||
), | |||
] |
@ -0,0 +1,64 @@ | |||
from datetime import timedelta as td | |||
from django.utils.timezone import now | |||
from hc.api.models import Channel, Check | |||
from hc.test import BaseTestCase | |||
class GetCheckTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(GetCheckTestCase, self).setUp() | |||
self.now = now().replace(microsecond=0) | |||
self.a1 = Check(project=self.project, name="Alice 1") | |||
self.a1.timeout = td(seconds=3600) | |||
self.a1.grace = td(seconds=900) | |||
self.a1.n_pings = 0 | |||
self.a1.status = "new" | |||
self.a1.tags = "a1-tag a1-additional-tag" | |||
self.a1.desc = "This is description" | |||
self.a1.save() | |||
self.c1 = Channel.objects.create(project=self.project) | |||
self.a1.channel_set.add(self.c1) | |||
def get(self, code, api_key="X" * 32): | |||
url = "/api/v1/checks/%s" % code | |||
return self.client.get(url, HTTP_X_API_KEY=api_key) | |||
def test_it_works(self): | |||
r = self.get(self.a1.code) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertEqual(r["Access-Control-Allow-Origin"], "*") | |||
doc = r.json() | |||
self.assertEqual(len(doc), 13) | |||
self.assertEqual(doc["timeout"], 3600) | |||
self.assertEqual(doc["grace"], 900) | |||
self.assertEqual(doc["ping_url"], self.a1.url()) | |||
self.assertEqual(doc["last_ping"], None) | |||
self.assertEqual(doc["n_pings"], 0) | |||
self.assertEqual(doc["status"], "new") | |||
self.assertEqual(doc["channels"], str(self.c1.code)) | |||
self.assertEqual(doc["desc"], "This is description") | |||
def test_it_handles_invalid_uuid(self): | |||
r = self.get("not-an-uuid") | |||
self.assertEqual(r.status_code, 404) | |||
def test_it_handles_missing_check(self): | |||
made_up_code = "07c2f548-9850-4b27-af5d-6c9dc157ec02" | |||
r = self.get(made_up_code) | |||
self.assertEqual(r.status_code, 404) | |||
def test_readonly_key_works(self): | |||
self.project.api_key_readonly = "R" * 32 | |||
self.project.save() | |||
r = self.get(self.a1.code, api_key=self.project.api_key_readonly) | |||
self.assertEqual(r.status_code, 200) | |||
# When using readonly keys, the ping URLs should not be exposed: | |||
self.assertNotContains(r, self.a1.url()) |
@ -2,12 +2,12 @@ | |||
from datetime import timedelta as td | |||
import json | |||
from unittest.mock import patch, Mock | |||
from django.core import mail | |||
from django.utils.timezone import now | |||
from hc.api.models import Channel, Check, Notification | |||
from hc.api.models import Channel, Check, Notification, TokenBucket | |||
from hc.test import BaseTestCase | |||
from mock import patch, Mock | |||
from requests.exceptions import ConnectionError, Timeout | |||
from django.test.utils import override_settings | |||
@ -28,7 +28,14 @@ class NotifyTestCase(BaseTestCase): | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhook(self, mock_get): | |||
self._setup_data("webhook", "http://example") | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://example", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
mock_get.return_value.status_code = 200 | |||
self.channel.notify(self.check) | |||
@ -41,31 +48,44 @@ class NotifyTestCase(BaseTestCase): | |||
@patch("hc.api.transports.requests.request", side_effect=Timeout) | |||
def test_webhooks_handle_timeouts(self, mock_get): | |||
self._setup_data("webhook", "http://example") | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://example", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.channel.notify(self.check) | |||
n = Notification.objects.get() | |||
self.assertEqual(n.error, "Connection timed out") | |||
self.assertEqual(self.channel.last_error, "Connection timed out") | |||
@patch("hc.api.transports.requests.request", side_effect=ConnectionError) | |||
def test_webhooks_handle_connection_errors(self, mock_get): | |||
self._setup_data("webhook", "http://example") | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://example", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.channel.notify(self.check) | |||
n = Notification.objects.get() | |||
self.assertEqual(n.error, "Connection failed") | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_ignore_up_events(self, mock_get): | |||
self._setup_data("webhook", "http://example", status="up") | |||
self.channel.notify(self.check) | |||
self.assertFalse(mock_get.called) | |||
self.assertEqual(Notification.objects.count(), 0) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_handle_500(self, mock_get): | |||
self._setup_data("webhook", "http://example") | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://example", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
mock_get.return_value.status_code = 500 | |||
self.channel.notify(self.check) | |||
@ -74,40 +94,58 @@ class NotifyTestCase(BaseTestCase): | |||
self.assertEqual(n.error, "Received status code 500") | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_support_tags(self, mock_get): | |||
template = "http://host/$TAGS" | |||
self._setup_data("webhook", template) | |||
def test_webhooks_support_variables(self, mock_get): | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://host/$CODE/$STATUS/$TAG1/$TAG2/?name=$NAME", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.check.name = "Hello World" | |||
self.check.tags = "foo bar" | |||
self.check.save() | |||
self.channel.notify(self.check) | |||
url = "http://host/%s/down/foo/bar/?name=Hello%%20World" % self.check.code | |||
args, kwargs = mock_get.call_args | |||
self.assertEqual(args[0], "get") | |||
self.assertEqual(args[1], "http://host/foo%20bar") | |||
self.assertEqual(args[1], url) | |||
self.assertEqual(kwargs["headers"], {"User-Agent": "healthchecks.io"}) | |||
self.assertEqual(kwargs["timeout"], 5) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_support_variables(self, mock_get): | |||
template = "http://host/$CODE/$STATUS/$TAG1/$TAG2/?name=$NAME" | |||
self._setup_data("webhook", template) | |||
self.check.name = "Hello World" | |||
def test_webhooks_handle_variable_variables(self, mock_get): | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://host/$$NAMETAG1", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.check.tags = "foo bar" | |||
self.check.save() | |||
self.channel.notify(self.check) | |||
url = "http://host/%s/down/foo/bar/?name=Hello%%20World" % self.check.code | |||
# $$NAMETAG1 should *not* get transformed to "foo" | |||
args, kwargs = mock_get.call_args | |||
self.assertEqual(args[0], "get") | |||
self.assertEqual(args[1], url) | |||
self.assertEqual(kwargs["headers"], {"User-Agent": "healthchecks.io"}) | |||
self.assertEqual(kwargs["timeout"], 5) | |||
self.assertEqual(args[1], "http://host/$TAG1") | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_support_post(self, mock_request): | |||
template = "http://example.com\n\nThe Time Is $NOW" | |||
self._setup_data("webhook", template) | |||
definition = { | |||
"method_down": "POST", | |||
"url_down": "http://example.com", | |||
"body_down": "The Time Is $NOW", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.check.save() | |||
self.channel.notify(self.check) | |||
@ -123,9 +161,14 @@ class NotifyTestCase(BaseTestCase): | |||
def test_webhooks_dollarsign_escaping(self, mock_get): | |||
# If name or tag contains what looks like a variable reference, | |||
# that should be left alone: | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://host/$NAME", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
template = "http://host/$NAME" | |||
self._setup_data("webhook", template) | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.check.name = "$TAG1" | |||
self.check.tags = "foo" | |||
self.check.save() | |||
@ -138,8 +181,14 @@ class NotifyTestCase(BaseTestCase): | |||
) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhook_fires_on_up_event(self, mock_get): | |||
self._setup_data("webhook", "http://foo\nhttp://bar", status="up") | |||
def test_webhooks_handle_up_events(self, mock_get): | |||
definition = { | |||
"method_up": "GET", | |||
"url_up": "http://bar", | |||
"body_up": "", | |||
"headers_up": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition), status="up") | |||
self.channel.notify(self.check) | |||
@ -148,47 +197,37 @@ class NotifyTestCase(BaseTestCase): | |||
) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_handle_unicode_post_body(self, mock_request): | |||
template = "http://example.com\n\n(╯°□°)╯︵ ┻━┻" | |||
self._setup_data("webhook", template) | |||
self.check.save() | |||
def test_webhooks_handle_noop_up_events(self, mock_get): | |||
definition = { | |||
"method_up": "GET", | |||
"url_up": "", | |||
"body_up": "", | |||
"headers_up": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition), status="up") | |||
self.channel.notify(self.check) | |||
args, kwargs = mock_request.call_args | |||
# unicode should be encoded into utf-8 | |||
self.assertIsInstance(kwargs["data"], bytes) | |||
self.assertFalse(mock_get.called) | |||
self.assertEqual(Notification.objects.count(), 0) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_handle_json_value(self, mock_request): | |||
def test_webhooks_handle_unicode_post_body(self, mock_request): | |||
definition = { | |||
"method_down": "GET", | |||
"method_down": "POST", | |||
"url_down": "http://foo.com", | |||
"body_down": "", | |||
"body_down": "(╯°□°)╯︵ ┻━┻", | |||
"headers_down": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.channel.notify(self.check) | |||
headers = {"User-Agent": "healthchecks.io"} | |||
mock_request.assert_called_with( | |||
"get", "http://foo.com", headers=headers, timeout=5 | |||
) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_handle_json_up_event(self, mock_request): | |||
definition = { | |||
"method_up": "GET", | |||
"url_up": "http://bar", | |||
"body_up": "", | |||
"headers_up": {}, | |||
} | |||
self._setup_data("webhook", json.dumps(definition)) | |||
self.check.save() | |||
self._setup_data("webhook", json.dumps(definition), status="up") | |||
self.channel.notify(self.check) | |||
args, kwargs = mock_request.call_args | |||
headers = {"User-Agent": "healthchecks.io"} | |||
mock_request.assert_called_with("get", "http://bar", headers=headers, timeout=5) | |||
# unicode should be encoded into utf-8 | |||
self.assertIsInstance(kwargs["data"], bytes) | |||
@patch("hc.api.transports.requests.request") | |||
def test_webhooks_handle_post_headers(self, mock_request): | |||
@ -275,6 +314,7 @@ class NotifyTestCase(BaseTestCase): | |||
self.assertEqual(email.to[0], "[email protected]") | |||
self.assertTrue("X-Bounce-Url" in email.extra_headers) | |||
self.assertTrue("List-Unsubscribe" in email.extra_headers) | |||
self.assertTrue("List-Unsubscribe-Post" in email.extra_headers) | |||
def test_email_transport_handles_json_value(self): | |||
payload = {"value": "[email protected]", "up": True, "down": True} | |||
@ -305,6 +345,14 @@ class NotifyTestCase(BaseTestCase): | |||
self.assertEqual(Notification.objects.count(), 0) | |||
self.assertEqual(len(mail.outbox), 0) | |||
def test_email_handles_amperstand(self): | |||
self._setup_data("email", "[email protected]") | |||
self.check.name = "Foo & Bar" | |||
self.channel.notify(self.check) | |||
email = mail.outbox[0] | |||
self.assertEqual(email.subject, "DOWN | Foo & Bar") | |||
@patch("hc.api.transports.requests.request") | |||
def test_pd(self, mock_post): | |||
self._setup_data("pd", "123") | |||
@ -421,7 +469,7 @@ class NotifyTestCase(BaseTestCase): | |||
self.assertEqual(Notification.objects.count(), 0) | |||
@patch("hc.api.transports.requests.request") | |||
def test_opsgenie(self, mock_post): | |||
def test_opsgenie_with_legacy_value(self, mock_post): | |||
self._setup_data("opsgenie", "123") | |||
mock_post.return_value.status_code = 202 | |||
@ -431,6 +479,7 @@ class NotifyTestCase(BaseTestCase): | |||
self.assertEqual(mock_post.call_count, 1) | |||
args, kwargs = mock_post.call_args | |||
self.assertIn("api.opsgenie.com", args[1]) | |||
payload = kwargs["json"] | |||
self.assertIn("DOWN", payload["message"]) | |||
@ -448,6 +497,29 @@ class NotifyTestCase(BaseTestCase): | |||
method, url = args | |||
self.assertTrue(str(self.check.code) in url) | |||
@patch("hc.api.transports.requests.request") | |||
def test_opsgenie_with_json_value(self, mock_post): | |||
self._setup_data("opsgenie", json.dumps({"key": "456", "region": "eu"})) | |||
mock_post.return_value.status_code = 202 | |||
self.channel.notify(self.check) | |||
n = Notification.objects.first() | |||
self.assertEqual(n.error, "") | |||
self.assertEqual(mock_post.call_count, 1) | |||
args, kwargs = mock_post.call_args | |||
self.assertIn("api.eu.opsgenie.com", args[1]) | |||
@patch("hc.api.transports.requests.request") | |||
def test_opsgenie_returns_error(self, mock_post): | |||
self._setup_data("opsgenie", "123") | |||
mock_post.return_value.status_code = 403 | |||
mock_post.return_value.json.return_value = {"message": "Nice try"} | |||
self.channel.notify(self.check) | |||
n = Notification.objects.first() | |||
self.assertEqual(n.error, 'Received status code 403 with a message: "Nice try"') | |||
@patch("hc.api.transports.requests.request") | |||
def test_pushover(self, mock_post): | |||
self._setup_data("po", "123|0") | |||
@ -528,6 +600,25 @@ class NotifyTestCase(BaseTestCase): | |||
self.assertEqual(payload["chat_id"], 123) | |||
self.assertTrue("The check" in payload["text"]) | |||
@patch("hc.api.transports.requests.request") | |||
def test_telegram_returns_error(self, mock_post): | |||
self._setup_data("telegram", json.dumps({"id": 123})) | |||
mock_post.return_value.status_code = 400 | |||
mock_post.return_value.json.return_value = {"description": "Hi"} | |||
self.channel.notify(self.check) | |||
n = Notification.objects.first() | |||
self.assertEqual(n.error, 'Received status code 400 with a message: "Hi"') | |||
def test_telegram_obeys_rate_limit(self): | |||
self._setup_data("telegram", json.dumps({"id": 123})) | |||
TokenBucket.objects.create(value="tg-123", tokens=0) | |||
self.channel.notify(self.check) | |||
n = Notification.objects.first() | |||
self.assertEqual(n.error, "Rate limit exceeded") | |||
@patch("hc.api.transports.requests.request") | |||
def test_sms(self, mock_post): | |||
self._setup_data("sms", "+1234567890") | |||
@ -577,6 +668,13 @@ class NotifyTestCase(BaseTestCase): | |||
n = Notification.objects.get() | |||
self.assertTrue("Monthly SMS limit exceeded" in n.error) | |||
# And email should have been sent | |||
self.assertEqual(len(mail.outbox), 1) | |||
email = mail.outbox[0] | |||
self.assertEqual(email.to[0], "[email protected]") | |||
self.assertEqual(email.subject, "Monthly SMS Limit Reached") | |||
@patch("hc.api.transports.requests.request") | |||
def test_sms_limit_reset(self, mock_post): | |||
# At limit, but also into a new month | |||
@ -638,6 +736,13 @@ class NotifyTestCase(BaseTestCase): | |||
n = Notification.objects.get() | |||
self.assertTrue("Monthly message limit exceeded" in n.error) | |||
# And email should have been sent | |||
self.assertEqual(len(mail.outbox), 1) | |||
email = mail.outbox[0] | |||
self.assertEqual(email.to[0], "[email protected]") | |||
self.assertEqual(email.subject, "Monthly WhatsApp Limit Reached") | |||
@patch("apprise.Apprise") | |||
@override_settings(APPRISE_ENABLED=True) | |||
def test_apprise_enabled(self, mock_apprise): | |||
@ -671,3 +776,97 @@ class NotifyTestCase(BaseTestCase): | |||
with self.assertRaises(NotImplementedError): | |||
self.channel.notify(self.check) | |||
@patch("hc.api.transports.requests.request") | |||
def test_msteams(self, mock_post): | |||
self._setup_data("msteams", "http://example.com/webhook") | |||
mock_post.return_value.status_code = 200 | |||
self.channel.notify(self.check) | |||
assert Notification.objects.count() == 1 | |||
args, kwargs = mock_post.call_args | |||
payload = kwargs["json"] | |||
self.assertEqual(payload["@type"], "MessageCard") | |||
@patch("hc.api.transports.os.system") | |||
@override_settings(SHELL_ENABLED=True) | |||
def test_shell(self, mock_system): | |||
definition = {"cmd_down": "logger hello", "cmd_up": ""} | |||
self._setup_data("shell", json.dumps(definition)) | |||
mock_system.return_value = 0 | |||
self.channel.notify(self.check) | |||
mock_system.assert_called_with("logger hello") | |||
@patch("hc.api.transports.os.system") | |||
@override_settings(SHELL_ENABLED=True) | |||
def test_shell_handles_nonzero_exit_code(self, mock_system): | |||
definition = {"cmd_down": "logger hello", "cmd_up": ""} | |||
self._setup_data("shell", json.dumps(definition)) | |||
mock_system.return_value = 123 | |||
self.channel.notify(self.check) | |||
n = Notification.objects.get() | |||
self.assertEqual(n.error, "Command returned exit code 123") | |||
@patch("hc.api.transports.os.system") | |||
@override_settings(SHELL_ENABLED=True) | |||
def test_shell_supports_variables(self, mock_system): | |||
definition = {"cmd_down": "logger $NAME is $STATUS ($TAG1)", "cmd_up": ""} | |||
self._setup_data("shell", json.dumps(definition)) | |||
mock_system.return_value = 0 | |||
self.check.name = "Database" | |||
self.check.tags = "foo bar" | |||
self.check.save() | |||
self.channel.notify(self.check) | |||
mock_system.assert_called_with("logger Database is down (foo)") | |||
@patch("hc.api.transports.os.system") | |||
@override_settings(SHELL_ENABLED=False) | |||
def test_shell_disabled(self, mock_system): | |||
definition = {"cmd_down": "logger hello", "cmd_up": ""} | |||
self._setup_data("shell", json.dumps(definition)) | |||
self.channel.notify(self.check) | |||
self.assertFalse(mock_system.called) | |||
n = Notification.objects.get() | |||
self.assertEqual(n.error, "Shell commands are not enabled") | |||
@patch("hc.api.transports.requests.request") | |||
def test_zulip(self, mock_post): | |||
definition = { | |||
"bot_email": "[email protected]", | |||
"api_key": "fake-key", | |||
"mtype": "stream", | |||
"to": "general", | |||
} | |||
self._setup_data("zulip", json.dumps(definition)) | |||
mock_post.return_value.status_code = 200 | |||
self.channel.notify(self.check) | |||
assert Notification.objects.count() == 1 | |||
args, kwargs = mock_post.call_args | |||
payload = kwargs["data"] | |||
self.assertIn("DOWN", payload["topic"]) | |||
@patch("hc.api.transports.requests.request") | |||
def test_zulip_returns_error(self, mock_post): | |||
definition = { | |||
"bot_email": "[email protected]", | |||
"api_key": "fake-key", | |||
"mtype": "stream", | |||
"to": "general", | |||
} | |||
self._setup_data("zulip", json.dumps(definition)) | |||
mock_post.return_value.status_code = 403 | |||
mock_post.return_value.json.return_value = {"msg": "Nice try"} | |||
self.channel.notify(self.check) | |||
n = Notification.objects.first() | |||
self.assertEqual(n.error, 'Received status code 403 with a message: "Nice try"') |
@ -1,3 +1 @@ | |||
from django.contrib import admin | |||
# Register your models here. | |||
# The front app has no models. |
@ -0,0 +1,18 @@ | |||
from functools import wraps | |||
from django.conf import settings | |||
from django.http import HttpResponse | |||
def require_setting(key): | |||
def decorator(f): | |||
@wraps(f) | |||
def wrapper(request, *args, **kwds): | |||
if not getattr(settings, key): | |||
return HttpResponse(status=404) | |||
return f(request, *args, **kwds) | |||
return wrapper | |||
return decorator |
@ -0,0 +1,39 @@ | |||
import os | |||
from django.conf import settings | |||
from django.core.management.base import BaseCommand | |||
class Command(BaseCommand): | |||
help = "Renders Markdown to HTML" | |||
def handle(self, *args, **options): | |||
try: | |||
import markdown | |||
# We use pygments for highlighting code samples | |||
import pygments | |||
except ImportError as e: | |||
self.stdout.write(f"This command requires the {e.name} package.") | |||
self.stdout.write("Please install it with:\n\n") | |||
self.stdout.write(f" pip install {e.name}\n\n") | |||
return | |||
extensions = ["fenced_code", "codehilite", "tables", "def_list", "attr_list"] | |||
ec = {"codehilite": {"css_class": "highlight"}} | |||
docs_path = os.path.join(settings.BASE_DIR, "templates/docs") | |||
for doc in os.listdir(docs_path): | |||
if not doc.endswith(".md"): | |||
continue | |||
print("Rendering %s" % doc) | |||
src_path = os.path.join(docs_path, doc) | |||
dst_path = os.path.join(docs_path, doc[:-3] + ".html") | |||
text = open(src_path, "r", encoding="utf-8").read() | |||
html = markdown.markdown(text, extensions=extensions, extension_configs=ec) | |||
with open(dst_path, "w", encoding="utf-8") as f: | |||
f.write(html) |
@ -1,3 +1 @@ | |||
from django.db import models | |||
# Create your models here. | |||
# The front app has no models. |
@ -5,17 +5,21 @@ from django.test.utils import override_settings | |||
@override_settings(APPRISE_ENABLED=True) | |||
class AddAppriseTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddAppriseTestCase, self).setUp() | |||
self.url = "/projects/%s/add_apprise/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_apprise/") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Integration Settings", status_code=200) | |||
def test_it_works(self): | |||
form = {"url": "json://example.org"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post("/integrations/add_apprise/", form) | |||
self.assertRedirects(r, "/integrations/") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "apprise") | |||
@ -25,5 +29,5 @@ class AddAppriseTestCase(BaseTestCase): | |||
@override_settings(APPRISE_ENABLED=False) | |||
def test_it_requires_client_id(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_apprise/") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -19,19 +19,6 @@ class AddCheckTestCase(BaseTestCase): | |||
redirect_url = "/checks/%s/details/?new" % check.code | |||
self.assertRedirects(r, redirect_url) | |||
def test_it_handles_unset_current_project(self): | |||
self.profile.current_project = None | |||
self.profile.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url) | |||
check = Check.objects.get() | |||
self.assertEqual(check.project, self.project) | |||
redirect_url = "/checks/%s/details/?new" % check.code | |||
self.assertRedirects(r, redirect_url) | |||
def test_team_access_works(self): | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url) | |||
@ -1,14 +1,12 @@ | |||
import json | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
from mock import patch | |||
@override_settings(DISCORD_CLIENT_ID="t1", DISCORD_CLIENT_SECRET="s1") | |||
class AddDiscordTestCase(BaseTestCase): | |||
url = "/integrations/add_discord/" | |||
def setUp(self): | |||
super(AddDiscordTestCase, self).setUp() | |||
self.url = "/projects/%s/add_discord/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -17,49 +15,10 @@ class AddDiscordTestCase(BaseTestCase): | |||
self.assertContains(r, "discordapp.com/api/oauth2/authorize") | |||
# There should now be a key in session | |||
self.assertTrue("discord" in self.client.session) | |||
self.assertTrue("add_discord" in self.client.session) | |||
@override_settings(DISCORD_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_response(self, mock_post): | |||
session = self.client.session | |||
session["discord"] = "foo" | |||
session.save() | |||
oauth_response = { | |||
"access_token": "test-token", | |||
"webhook": {"url": "foo", "id": "bar"}, | |||
} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = self.url + "?code=12345678&state=foo" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertContains(r, "The Discord integration has been added!") | |||
ch = Channel.objects.get() | |||
self.assertEqual(ch.discord_webhook_url, "foo") | |||
self.assertEqual(ch.project, self.project) | |||
# Session should now be clean | |||
self.assertFalse("discord" in self.client.session) | |||
def test_it_avoids_csrf(self): | |||
session = self.client.session | |||
session["discord"] = "foo" | |||
session.save() | |||
url = self.url + "?code=12345678&state=bar" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 400) |
@ -0,0 +1,76 @@ | |||
import json | |||
from unittest.mock import patch | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(DISCORD_CLIENT_ID="t1", DISCORD_CLIENT_SECRET="s1") | |||
class AddDiscordCompleteTestCase(BaseTestCase): | |||
url = "/integrations/add_discord/" | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_response(self, mock_post): | |||
session = self.client.session | |||
session["add_discord"] = ("foo", str(self.project.code)) | |||
session.save() | |||
oauth_response = { | |||
"access_token": "test-token", | |||
"webhook": {"url": "foo", "id": "bar"}, | |||
} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = self.url + "?code=12345678&state=foo" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
self.assertContains(r, "The Discord integration has been added!") | |||
ch = Channel.objects.get() | |||
self.assertEqual(ch.discord_webhook_url, "foo") | |||
self.assertEqual(ch.project, self.project) | |||
# Session should now be clean | |||
self.assertFalse("add_discord" in self.client.session) | |||
def test_it_avoids_csrf(self): | |||
session = self.client.session | |||
session["add_discord"] = ("foo", str(self.project.code)) | |||
session.save() | |||
url = self.url + "?code=12345678&state=bar" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 403) | |||
# Session should now be clean | |||
self.assertFalse("add_discord" in self.client.session) | |||
def test_it_handles_access_denied(self): | |||
session = self.client.session | |||
session["add_discord"] = ("foo", str(self.project.code)) | |||
session.save() | |||
url = self.url + "?error=access_denied" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
self.assertContains(r, "Discord setup was cancelled.") | |||
self.assertEqual(Channel.objects.count(), 0) | |||
# Session should now be clean | |||
self.assertFalse("add_discord" in self.client.session) | |||
@override_settings(DISCORD_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url + "?code=12345678&state=bar") | |||
self.assertEqual(r.status_code, 404) |
@ -8,7 +8,9 @@ from hc.test import BaseTestCase | |||
class AddEmailTestCase(BaseTestCase): | |||
url = "/integrations/add_email/" | |||
def setUp(self): | |||
super(AddEmailTestCase, self).setUp() | |||
self.url = "/projects/%s/add_email/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -17,11 +19,11 @@ class AddEmailTestCase(BaseTestCase): | |||
self.assertContains(r, "Requires confirmation") | |||
def test_it_creates_channel(self): | |||
form = {"value": "[email protected]"} | |||
form = {"value": "[email protected]", "down": "true", "up": "true"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
doc = json.loads(c.value) | |||
@ -39,7 +41,7 @@ class AddEmailTestCase(BaseTestCase): | |||
self.assertEqual(email.to[0], "[email protected]") | |||
def test_team_access_works(self): | |||
form = {"value": "[email protected]"} | |||
form = {"value": "[email protected]", "down": "true", "up": "true"} | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url, form) | |||
@ -49,14 +51,14 @@ class AddEmailTestCase(BaseTestCase): | |||
self.assertEqual(ch.project, self.project) | |||
def test_it_rejects_bad_email(self): | |||
form = {"value": "not an email address"} | |||
form = {"value": "not an email address", "down": "true", "up": "true"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "Enter a valid email address.") | |||
def test_it_trims_whitespace(self): | |||
form = {"value": " [email protected] "} | |||
form = {"value": " [email protected] ", "down": "true", "up": "true"} | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url, form) | |||
@ -73,11 +75,11 @@ class AddEmailTestCase(BaseTestCase): | |||
@override_settings(EMAIL_USE_VERIFICATION=False) | |||
def test_it_auto_verifies_email(self): | |||
form = {"value": "[email protected]"} | |||
form = {"value": "[email protected]", "down": "true", "up": "true"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
doc = json.loads(c.value) | |||
@ -89,11 +91,11 @@ class AddEmailTestCase(BaseTestCase): | |||
self.assertEqual(len(mail.outbox), 0) | |||
def test_it_auto_verifies_own_email(self): | |||
form = {"value": "[email protected]"} | |||
form = {"value": "[email protected]", "down": "true", "up": "true"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
doc = json.loads(c.value) | |||
@ -103,3 +105,10 @@ class AddEmailTestCase(BaseTestCase): | |||
# Email should *not* have been sent | |||
self.assertEqual(len(mail.outbox), 0) | |||
def test_it_rejects_unchecked_up_and_dwon(self): | |||
form = {"value": "[email protected]"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "Please select at least one.") |
@ -0,0 +1,39 @@ | |||
from unittest.mock import patch | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(MATRIX_ACCESS_TOKEN="foo") | |||
@override_settings(MATRIX_HOMESERVER="fake-homeserver") | |||
class AddMatrixTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddMatrixTestCase, self).setUp() | |||
self.url = "/projects/%s/add_matrix/" % self.project.code | |||
@override_settings(MATRIX_ACCESS_TOKEN="foo") | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Integration Settings", status_code=200) | |||
@patch("hc.front.forms.requests.post") | |||
def test_it_works(self, mock_post): | |||
mock_post.return_value.json.return_value = {"room_id": "fake-room-id"} | |||
form = {"alias": "!foo:example.org"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "matrix") | |||
self.assertEqual(c.value, "fake-room-id") | |||
self.assertEqual(c.project, self.project) | |||
@override_settings(MATRIX_ACCESS_TOKEN=None) | |||
def test_it_requires_access_token(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -1,21 +1,23 @@ | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class AddMattermostTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddMattermostTestCase, self).setUp() | |||
self.url = "/projects/%s/add_mattermost/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_mattermost/") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Integration Settings", status_code=200) | |||
def test_it_works(self): | |||
form = {"value": "http://example.org"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post("/integrations/add_mattermost/", form) | |||
self.assertRedirects(r, "/integrations/") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "mattermost") | |||
@ -0,0 +1,25 @@ | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class AddMsTeamsTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddMsTeamsTestCase, self).setUp() | |||
self.url = "/projects/%s/add_msteams/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Integration Settings", status_code=200) | |||
def test_it_works(self): | |||
form = {"value": "https://example.com/foo"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "msteams") | |||
self.assertEqual(c.value, "https://example.com/foo") | |||
self.assertEqual(c.project, self.project) |
@ -1,9 +1,13 @@ | |||
import json | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class AddOpsGenieTestCase(BaseTestCase): | |||
url = "/integrations/add_opsgenie/" | |||
def setUp(self): | |||
super(AddOpsGenieTestCase, self).setUp() | |||
self.url = "/projects/%s/add_opsgenie/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -11,22 +15,36 @@ class AddOpsGenieTestCase(BaseTestCase): | |||
self.assertContains(r, "escalation policies and incident tracking") | |||
def test_it_works(self): | |||
form = {"value": "123456"} | |||
form = {"key": "123456", "region": "us"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "opsgenie") | |||
self.assertEqual(c.value, "123456") | |||
payload = json.loads(c.value) | |||
self.assertEqual(payload["key"], "123456") | |||
self.assertEqual(payload["region"], "us") | |||
self.assertEqual(c.project, self.project) | |||
def test_it_trims_whitespace(self): | |||
form = {"value": " 123456 "} | |||
form = {"key": " 123456 ", "region": "us"} | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url, form) | |||
c = Channel.objects.get() | |||
payload = json.loads(c.value) | |||
self.assertEqual(payload["key"], "123456") | |||
def test_it_saves_eu_region(self): | |||
form = {"key": "123456", "region": "eu"} | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url, form) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.value, "123456") | |||
payload = json.loads(c.value) | |||
self.assertEqual(payload["region"], "eu") |
@ -3,7 +3,9 @@ from hc.test import BaseTestCase | |||
class AddPagerTeamTestCase(BaseTestCase): | |||
url = "/integrations/add_pagerteam/" | |||
def setUp(self): | |||
super(AddPagerTeamTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pagerteam/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -15,7 +17,7 @@ class AddPagerTeamTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "pagerteam") | |||
@ -3,7 +3,9 @@ from hc.test import BaseTestCase | |||
class AddPagerTreeTestCase(BaseTestCase): | |||
url = "/integrations/add_pagertree/" | |||
def setUp(self): | |||
super(AddPagerTreeTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pagertree/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -15,7 +17,7 @@ class AddPagerTreeTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "pagertree") | |||
@ -1,43 +1,34 @@ | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(PD_VENDOR_KEY="foo") | |||
class AddPdTestCase(BaseTestCase): | |||
url = "/integrations/add_pd/" | |||
def setUp(self): | |||
super(AddPdTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pd/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "If your team uses") | |||
self.assertContains(r, "Paste the Integration Key down below") | |||
def test_it_works(self): | |||
session = self.client.session | |||
session["pd"] = "1234567890AB" # 12 characters | |||
session.save() | |||
# Integration key is 32 characters long | |||
form = {"value": "12345678901234567890123456789012"} | |||
self.client.login(username="[email protected]", password="password") | |||
url = "/integrations/add_pd/1234567890AB/?service_key=123" | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 302) | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "pd") | |||
self.assertEqual(c.pd_service_key, "123") | |||
self.assertEqual(c.project, self.project) | |||
self.assertEqual(c.value, "12345678901234567890123456789012") | |||
def test_it_validates_code(self): | |||
session = self.client.session | |||
session["pd"] = "1234567890AB" # 12 characters | |||
session.save() | |||
def test_it_trims_whitespace(self): | |||
form = {"value": " 123456 "} | |||
self.client.login(username="[email protected]", password="password") | |||
url = "/integrations/add_pd/XXXXXXXXXXXX/?service_key=123" | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 400) | |||
@override_settings(PD_VENDOR_KEY=None) | |||
def test_it_requires_vendor_key(self): | |||
r = self.client.get("/integrations/add_pd/") | |||
self.assertEqual(r.status_code, 404) | |||
self.client.post(self.url, form) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.value, "123456") |
@ -0,0 +1,32 @@ | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(PD_VENDOR_KEY="foo") | |||
class AddPdConnectTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddPdConnectTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pdc/" % self.project.code | |||
def test_it_works(self): | |||
session = self.client.session | |||
session["pd"] = "1234567890AB" # 12 characters | |||
session.save() | |||
self.client.login(username="[email protected]", password="password") | |||
url = self.url + "1234567890AB/?service_key=123" | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "pd") | |||
self.assertEqual(c.pd_service_key, "123") | |||
self.assertEqual(c.project, self.project) | |||
@override_settings(PD_VENDOR_KEY=None) | |||
def test_it_requires_vendor_key(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -0,0 +1,26 @@ | |||
from django.test.utils import override_settings | |||
from hc.test import BaseTestCase | |||
@override_settings(PD_VENDOR_KEY="foo") | |||
class AddPdcCompleteTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddPdcCompleteTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pdc/" % self.project.code | |||
self.url += "XXXXXXXXXXXX/?service_key=123" | |||
def test_it_validates_code(self): | |||
session = self.client.session | |||
session["pd"] = "1234567890AB" | |||
session.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 400) | |||
@override_settings(PD_VENDOR_KEY=None) | |||
def test_it_requires_vendor_key(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -0,0 +1,21 @@ | |||
from django.test.utils import override_settings | |||
from hc.test import BaseTestCase | |||
@override_settings(PD_VENDOR_KEY="foo") | |||
class AddPdcHelpTestCase(BaseTestCase): | |||
url = "/integrations/add_pdc/" | |||
def test_instructions_work_when_not_logged_in(self): | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Before adding PagerDuty integration, please log") | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "If your team uses") | |||
@override_settings(PD_VENDOR_KEY=None) | |||
def test_it_requires_vendor_key(self): | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -1,14 +1,12 @@ | |||
import json | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
from mock import patch | |||
@override_settings(PUSHBULLET_CLIENT_ID="t1", PUSHBULLET_CLIENT_SECRET="s1") | |||
class AddPushbulletTestCase(BaseTestCase): | |||
url = "/integrations/add_pushbullet/" | |||
def setUp(self): | |||
super(AddPushbulletTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pushbullet/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -17,46 +15,10 @@ class AddPushbulletTestCase(BaseTestCase): | |||
self.assertContains(r, "Connect Pushbullet") | |||
# There should now be a key in session | |||
self.assertTrue("pushbullet" in self.client.session) | |||
self.assertTrue("add_pushbullet" in self.client.session) | |||
@override_settings(PUSHBULLET_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_response(self, mock_post): | |||
session = self.client.session | |||
session["pushbullet"] = "foo" | |||
session.save() | |||
oauth_response = {"access_token": "test-token"} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = self.url + "?code=12345678&state=foo" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertContains(r, "The Pushbullet integration has been added!") | |||
ch = Channel.objects.get() | |||
self.assertEqual(ch.value, "test-token") | |||
self.assertEqual(ch.project, self.project) | |||
# Session should now be clean | |||
self.assertFalse("pushbullet" in self.client.session) | |||
def test_it_avoids_csrf(self): | |||
session = self.client.session | |||
session["pushbullet"] = "foo" | |||
session.save() | |||
url = self.url + "?code=12345678&state=bar" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 400) |
@ -0,0 +1,71 @@ | |||
import json | |||
from unittest.mock import patch | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(PUSHBULLET_CLIENT_ID="t1", PUSHBULLET_CLIENT_SECRET="s1") | |||
class AddPushbulletTestCase(BaseTestCase): | |||
url = "/integrations/add_pushbullet/" | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_response(self, mock_post): | |||
session = self.client.session | |||
session["add_pushbullet"] = ("foo", str(self.project.code)) | |||
session.save() | |||
oauth_response = {"access_token": "test-token"} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = self.url + "?code=12345678&state=foo&project=%s" % self.project.code | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
self.assertContains(r, "The Pushbullet integration has been added!") | |||
ch = Channel.objects.get() | |||
self.assertEqual(ch.value, "test-token") | |||
self.assertEqual(ch.project, self.project) | |||
# Session should now be clean | |||
self.assertFalse("add_pushbullet" in self.client.session) | |||
def test_it_avoids_csrf(self): | |||
session = self.client.session | |||
session["pushbullet"] = ("foo", str(self.project.code)) | |||
session.save() | |||
url = self.url + "?code=12345678&state=bar&project=%s" % self.project.code | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 403) | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_denial(self, mock_post): | |||
session = self.client.session | |||
session["add_pushbullet"] = ("foo", str(self.project.code)) | |||
session.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url + "?error=access_denied", follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
self.assertContains(r, "Pushbullet setup was cancelled") | |||
self.assertEqual(Channel.objects.count(), 0) | |||
# Session should now be clean | |||
self.assertFalse("add_pushbullet" in self.client.session) | |||
@override_settings(PUSHBULLET_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
url = self.url + "?code=12345678&state=bar&project=%s" % self.project.code | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 404) |
@ -7,32 +7,30 @@ from hc.test import BaseTestCase | |||
PUSHOVER_API_TOKEN="token", PUSHOVER_SUBSCRIPTION_URL="http://example.org" | |||
) | |||
class AddPushoverTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddPushoverTestCase, self).setUp() | |||
self.url = "/projects/%s/add_pushover/" % self.project.code | |||
@override_settings(PUSHOVER_API_TOKEN=None) | |||
def test_it_requires_api_token(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_pushover/") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) | |||
def test_instructions_work_without_login(self): | |||
r = self.client.get("/integrations/add_pushover/") | |||
self.assertContains(r, "Setup Guide") | |||
def test_it_shows_form(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_pushover/") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Subscribe with Pushover") | |||
def test_post_redirects(self): | |||
self.client.login(username="[email protected]", password="password") | |||
payload = {"po_priority": 2} | |||
r = self.client.post("/integrations/add_pushover/", form=payload) | |||
r = self.client.post(self.url, form=payload) | |||
self.assertEqual(r.status_code, 302) | |||
def test_post_requires_authenticated_user(self): | |||
payload = {"po_priority": 2} | |||
r = self.client.post("/integrations/add_pushover/", form=payload) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "Setup Guide") | |||
def test_it_requires_authenticated_user(self): | |||
r = self.client.get(self.url) | |||
self.assertRedirects(r, "/accounts/login/?next=" + self.url) | |||
def test_it_adds_channel(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -41,9 +39,9 @@ class AddPushoverTestCase(BaseTestCase): | |||
session["pushover"] = "foo" | |||
session.save() | |||
params = "pushover_user_key=a&state=foo&prio=0&prio_up=-1" | |||
r = self.client.get("/integrations/add_pushover/?%s" % params) | |||
self.assertEqual(r.status_code, 302) | |||
params = "?pushover_user_key=a&state=foo&prio=0&prio_up=-1" | |||
r = self.client.get(self.url + params, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
channel = Channel.objects.get() | |||
self.assertEqual(channel.value, "a|0|-1") | |||
@ -56,8 +54,8 @@ class AddPushoverTestCase(BaseTestCase): | |||
session["pushover"] = "foo" | |||
session.save() | |||
params = "pushover_user_key=a&state=foo&prio=abc" | |||
r = self.client.get("/integrations/add_pushover/?%s" % params) | |||
params = "?pushover_user_key=a&state=foo&prio=abc" | |||
r = self.client.get(self.url + params) | |||
self.assertEqual(r.status_code, 400) | |||
def test_it_validates_priority_up(self): | |||
@ -67,8 +65,8 @@ class AddPushoverTestCase(BaseTestCase): | |||
session["pushover"] = "foo" | |||
session.save() | |||
params = "pushover_user_key=a&state=foo&prio_up=abc" | |||
r = self.client.get("/integrations/add_pushover/?%s" % params) | |||
params = "?pushover_user_key=a&state=foo&prio_up=abc" | |||
r = self.client.get(self.url + params) | |||
self.assertEqual(r.status_code, 400) | |||
def test_it_validates_state(self): | |||
@ -78,6 +76,6 @@ class AddPushoverTestCase(BaseTestCase): | |||
session["pushover"] = "foo" | |||
session.save() | |||
params = "pushover_user_key=a&state=INVALID&prio=0" | |||
r = self.client.get("/integrations/add_pushover/?%s" % params) | |||
self.assertEqual(r.status_code, 400) | |||
params = "?pushover_user_key=a&state=INVALID&prio=0" | |||
r = self.client.get(self.url + params) | |||
self.assertEqual(r.status_code, 403) |
@ -0,0 +1,18 @@ | |||
from django.test.utils import override_settings | |||
from hc.test import BaseTestCase | |||
@override_settings( | |||
PUSHOVER_API_TOKEN="token", PUSHOVER_SUBSCRIPTION_URL="http://example.org" | |||
) | |||
class AddPushoverHelpTestCase(BaseTestCase): | |||
url = "/integrations/add_pushover/" | |||
@override_settings(PUSHOVER_API_TOKEN=None) | |||
def test_it_requires_api_token(self): | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) | |||
def test_instructions_work_without_login(self): | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Setup Guide") |
@ -0,0 +1,55 @@ | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(SHELL_ENABLED=True) | |||
class AddShellTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddShellTestCase, self).setUp() | |||
self.url = "/projects/%s/add_shell/" % self.project.code | |||
@override_settings(SHELL_ENABLED=False) | |||
def test_it_is_disabled_by_default(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Executes a local shell command") | |||
def test_it_adds_two_commands_and_redirects(self): | |||
form = {"cmd_down": "logger down", "cmd_up": "logger up"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.project, self.project) | |||
self.assertEqual(c.cmd_down, "logger down") | |||
self.assertEqual(c.cmd_up, "logger up") | |||
def test_it_adds_webhook_using_team_access(self): | |||
form = {"cmd_down": "logger down", "cmd_up": "logger up"} | |||
# Logging in as bob, not alice. Bob has team access so this | |||
# should work. | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url, form) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.project, self.project) | |||
self.assertEqual(c.cmd_down, "logger down") | |||
def test_it_handles_empty_down_command(self): | |||
form = {"cmd_down": "", "cmd_up": "logger up"} | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.post(self.url, form) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.cmd_down, "") | |||
self.assertEqual(c.cmd_up, "logger up") |
@ -1,33 +1,32 @@ | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class AddSlackTestCase(BaseTestCase): | |||
@override_settings(SLACK_CLIENT_ID=None) | |||
def setUp(self): | |||
super(AddSlackTestCase, self).setUp() | |||
self.url = "/projects/%s/add_slack/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_slack/") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Integration Settings", status_code=200) | |||
@override_settings(SLACK_CLIENT_ID=None) | |||
def test_it_works(self): | |||
form = {"value": "http://example.org"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post("/integrations/add_slack/", form) | |||
self.assertRedirects(r, "/integrations/") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "slack") | |||
self.assertEqual(c.value, "http://example.org") | |||
self.assertEqual(c.project, self.project) | |||
@override_settings(SLACK_CLIENT_ID=None) | |||
def test_it_rejects_bad_url(self): | |||
form = {"value": "not an URL"} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post("/integrations/add_slack/", form) | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "Enter a valid URL") |
@ -1,84 +1,28 @@ | |||
import json | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
from mock import patch | |||
@override_settings(SLACK_CLIENT_ID="fake-client-id") | |||
class AddSlackBtnTestCase(BaseTestCase): | |||
@override_settings(SLACK_CLIENT_ID="foo") | |||
def test_it_prepares_login_link(self): | |||
r = self.client.get("/integrations/add_slack/") | |||
self.assertContains(r, "Before adding Slack integration", status_code=200) | |||
def setUp(self): | |||
super(AddSlackBtnTestCase, self).setUp() | |||
self.url = "/projects/%s/add_slack_btn/" % self.project.code | |||
self.assertContains(r, "?next=/integrations/add_slack/") | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Setup Guide", status_code=200) | |||
@override_settings(SLACK_CLIENT_ID="foo") | |||
def test_slack_button(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_slack/") | |||
self.assertContains(r, "slack.com/oauth/authorize", status_code=200) | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "slack.com/oauth/v2/authorize", status_code=200) | |||
# There should now be a key in session | |||
self.assertTrue("slack" in self.client.session) | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_response(self, mock_post): | |||
session = self.client.session | |||
session["slack"] = "foo" | |||
session.save() | |||
oauth_response = { | |||
"ok": True, | |||
"team_name": "foo", | |||
"incoming_webhook": {"url": "http://example.org", "channel": "bar"}, | |||
} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = "/integrations/add_slack_btn/?code=12345678&state=foo" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertContains(r, "The Slack integration has been added!") | |||
ch = Channel.objects.get() | |||
self.assertEqual(ch.slack_team, "foo") | |||
self.assertEqual(ch.slack_channel, "bar") | |||
self.assertEqual(ch.slack_webhook_url, "http://example.org") | |||
self.assertEqual(ch.project, self.project) | |||
# Session should now be clean | |||
self.assertFalse("slack" in self.client.session) | |||
def test_it_avoids_csrf(self): | |||
session = self.client.session | |||
session["slack"] = "foo" | |||
session.save() | |||
url = "/integrations/add_slack_btn/?code=12345678&state=bar" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 400) | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_error(self, mock_post): | |||
session = self.client.session | |||
session["slack"] = "foo" | |||
session.save() | |||
oauth_response = {"ok": False, "error": "something went wrong"} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = "/integrations/add_slack_btn/?code=12345678&state=foo" | |||
self.assertTrue("add_slack" in self.client.session) | |||
@override_settings(SLACK_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertContains(r, "something went wrong") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -0,0 +1,75 @@ | |||
import json | |||
from unittest.mock import patch | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
@override_settings(SLACK_CLIENT_ID="fake-client-id") | |||
class AddSlackCompleteTestCase(BaseTestCase): | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_response(self, mock_post): | |||
session = self.client.session | |||
session["add_slack"] = ("foo", str(self.project.code)) | |||
session.save() | |||
oauth_response = { | |||
"ok": True, | |||
"team_name": "foo", | |||
"incoming_webhook": {"url": "http://example.org", "channel": "bar"}, | |||
} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = "/integrations/add_slack_btn/?code=12345678&state=foo" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
self.assertContains(r, "The Slack integration has been added!") | |||
ch = Channel.objects.get() | |||
self.assertEqual(ch.slack_team, "foo") | |||
self.assertEqual(ch.slack_channel, "bar") | |||
self.assertEqual(ch.slack_webhook_url, "http://example.org") | |||
self.assertEqual(ch.project, self.project) | |||
# Session should now be clean | |||
self.assertFalse("add_slack" in self.client.session) | |||
def test_it_avoids_csrf(self): | |||
session = self.client.session | |||
session["add_slack"] = ("foo", str(self.project.code)) | |||
session.save() | |||
url = "/integrations/add_slack_btn/?code=12345678&state=bar" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 403) | |||
@patch("hc.front.views.requests.post") | |||
def test_it_handles_oauth_error(self, mock_post): | |||
session = self.client.session | |||
session["add_slack"] = ("foo", str(self.project.code)) | |||
session.save() | |||
oauth_response = {"ok": False, "error": "something went wrong"} | |||
mock_post.return_value.text = json.dumps(oauth_response) | |||
mock_post.return_value.json.return_value = oauth_response | |||
url = "/integrations/add_slack_btn/?code=12345678&state=foo" | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url, follow=True) | |||
self.assertRedirects(r, self.channels_url) | |||
self.assertContains(r, "something went wrong") | |||
@override_settings(SLACK_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_slack_btn/?code=12345678&state=foo") | |||
self.assertEqual(r.status_code, 404) |
@ -0,0 +1,14 @@ | |||
from django.test.utils import override_settings | |||
from hc.test import BaseTestCase | |||
@override_settings(SLACK_CLIENT_ID="fake-client-id") | |||
class AddSlackHelpTestCase(BaseTestCase): | |||
def test_instructions_work(self): | |||
r = self.client.get("/integrations/add_slack/") | |||
self.assertContains(r, "Setup Guide", status_code=200) | |||
@override_settings(SLACK_CLIENT_ID=None) | |||
def test_it_requires_client_id(self): | |||
r = self.client.get("/integrations/add_slack/") | |||
self.assertEqual(r.status_code, 404) |
@ -5,7 +5,9 @@ from hc.test import BaseTestCase | |||
@override_settings(TWILIO_ACCOUNT="foo", TWILIO_AUTH="foo", TWILIO_FROM="123") | |||
class AddSmsTestCase(BaseTestCase): | |||
url = "/integrations/add_sms/" | |||
def setUp(self): | |||
super(AddSmsTestCase, self).setUp() | |||
self.url = "/projects/%s/add_sms/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -26,7 +28,7 @@ class AddSmsTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "sms") | |||
@ -53,5 +55,5 @@ class AddSmsTestCase(BaseTestCase): | |||
@override_settings(TWILIO_AUTH=None) | |||
def test_it_requires_credentials(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/add_sms/") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -1,9 +1,12 @@ | |||
from unittest.mock import patch | |||
from django.core import signing | |||
from django.test.utils import override_settings | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
from mock import patch | |||
@override_settings(TELEGRAM_TOKEN="fake-token") | |||
class AddTelegramTestCase(BaseTestCase): | |||
url = "/integrations/add_telegram/" | |||
@ -12,6 +15,14 @@ class AddTelegramTestCase(BaseTestCase): | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "start@ExampleBot") | |||
@override_settings(TELEGRAM_TOKEN=None) | |||
def test_it_requires_token(self): | |||
payload = signing.dumps((123, "group", "My Group")) | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url + "?" + payload) | |||
self.assertEqual(r.status_code, 404) | |||
def test_it_shows_confirmation(self): | |||
payload = signing.dumps((123, "group", "My Group")) | |||
@ -23,8 +34,9 @@ class AddTelegramTestCase(BaseTestCase): | |||
payload = signing.dumps((123, "group", "My Group")) | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url + "?" + payload, {}) | |||
self.assertRedirects(r, "/integrations/") | |||
form = {"project": str(self.project.code)} | |||
r = self.client.post(self.url + "?" + payload, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "telegram") | |||
@ -33,6 +45,13 @@ class AddTelegramTestCase(BaseTestCase): | |||
self.assertEqual(c.telegram_name, "My Group") | |||
self.assertEqual(c.project, self.project) | |||
def test_it_handles_bad_signature(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url + "?bad-signature") | |||
self.assertContains(r, "Incorrect Link") | |||
self.assertFalse(Channel.objects.exists()) | |||
@patch("hc.api.transports.requests.request") | |||
def test_it_sends_invite(self, mock_get): | |||
data = { | |||
@ -5,16 +5,17 @@ from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class AddPagerTreeTestCase(BaseTestCase): | |||
url = "/integrations/add_trello/" | |||
@override_settings(TRELLO_APP_KEY="foo") | |||
class AddTrelloTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddTrelloTestCase, self).setUp() | |||
self.url = "/projects/%s/add_trello/" % self.project.code | |||
@override_settings(TRELLO_APP_KEY="foo") | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Trello") | |||
@override_settings(TRELLO_APP_KEY="foo") | |||
def test_it_works(self): | |||
form = { | |||
"settings": json.dumps( | |||
@ -29,9 +30,15 @@ class AddPagerTreeTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "trello") | |||
self.assertEqual(c.trello_token, "fake-token") | |||
self.assertEqual(c.project, self.project) | |||
@override_settings(TRELLO_APP_KEY=None) | |||
def test_it_requires_trello_app_key(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 404) |
@ -3,7 +3,9 @@ from hc.test import BaseTestCase | |||
class AddVictorOpsTestCase(BaseTestCase): | |||
url = "/integrations/add_victorops/" | |||
def setUp(self): | |||
super(AddVictorOpsTestCase, self).setUp() | |||
self.url = "/projects/%s/add_victorops/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -15,7 +17,7 @@ class AddVictorOpsTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "victorops") | |||
@ -3,13 +3,31 @@ from hc.test import BaseTestCase | |||
class AddWebhookTestCase(BaseTestCase): | |||
url = "/integrations/add_webhook/" | |||
def setUp(self): | |||
super(AddWebhookTestCase, self).setUp() | |||
self.url = "/projects/%s/add_webhook/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Executes an HTTP request") | |||
def test_it_saves_name(self): | |||
form = { | |||
"name": "Call foo.com", | |||
"method_down": "GET", | |||
"url_down": "http://foo.com", | |||
"method_up": "GET", | |||
"url_up": "", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.name, "Call foo.com") | |||
def test_it_adds_two_webhook_urls_and_redirects(self): | |||
form = { | |||
"method_down": "GET", | |||
@ -20,7 +38,7 @@ class AddWebhookTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.project, self.project) | |||
@ -95,7 +113,7 @@ class AddWebhookTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.down_webhook_spec["body"], "hello") | |||
@ -110,7 +128,7 @@ class AddWebhookTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual( | |||
@ -122,12 +140,12 @@ class AddWebhookTestCase(BaseTestCase): | |||
form = { | |||
"method_down": "GET", | |||
"url_down": "http://example.org", | |||
"headers_down": "invalid-headers", | |||
"headers_down": "invalid-header\nfoo:bar", | |||
"method_up": "GET", | |||
} | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, """invalid-headers""") | |||
self.assertContains(r, """invalid-header""") | |||
self.assertEqual(Channel.objects.count(), 0) | |||
def test_it_strips_headers(self): | |||
@ -140,7 +158,22 @@ class AddWebhookTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.down_webhook_spec["headers"], {"test": "123"}) | |||
def test_it_rejects_both_empty(self): | |||
self.client.login(username="[email protected]", password="password") | |||
form = { | |||
"method_down": "GET", | |||
"url_down": "", | |||
"method_up": "GET", | |||
"url_up": "", | |||
} | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "Enter a valid URL.") | |||
self.assertEqual(Channel.objects.count(), 0) |
@ -12,7 +12,9 @@ TEST_CREDENTIALS = { | |||
@override_settings(**TEST_CREDENTIALS) | |||
class AddWhatsAppTestCase(BaseTestCase): | |||
url = "/integrations/add_whatsapp/" | |||
def setUp(self): | |||
super(AddWhatsAppTestCase, self).setUp() | |||
self.url = "/projects/%s/add_whatsapp/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -38,7 +40,7 @@ class AddWhatsAppTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "whatsapp") | |||
@ -53,7 +55,7 @@ class AddWhatsAppTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, "/integrations/") | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "whatsapp") | |||
@ -0,0 +1,80 @@ | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class AddZulipTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(AddZulipTestCase, self).setUp() | |||
self.url = "/projects/%s/add_zulip/" % self.project.code | |||
def test_instructions_work(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "open-source group chat app") | |||
def test_it_works(self): | |||
form = { | |||
"bot_email": "[email protected]", | |||
"api_key": "fake-key", | |||
"mtype": "stream", | |||
"to": "general", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
c = Channel.objects.get() | |||
self.assertEqual(c.kind, "zulip") | |||
self.assertEqual(c.zulip_bot_email, "[email protected]") | |||
self.assertEqual(c.zulip_api_key, "fake-key") | |||
self.assertEqual(c.zulip_type, "stream") | |||
self.assertEqual(c.zulip_to, "general") | |||
def test_it_rejects_bad_email(self): | |||
form = { | |||
"bot_email": "not@an@email", | |||
"api_key": "fake-key", | |||
"mtype": "stream", | |||
"to": "general", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "Enter a valid email address.") | |||
def test_it_rejects_missing_api_key(self): | |||
form = { | |||
"bot_email": "[email protected]", | |||
"api_key": "", | |||
"mtype": "stream", | |||
"to": "general", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "This field is required.") | |||
def test_it_rejects_bad_mtype(self): | |||
form = { | |||
"bot_email": "[email protected]", | |||
"api_key": "fake-key", | |||
"mtype": "this-should-not-work", | |||
"to": "general", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertEqual(r.status_code, 200) | |||
def test_it_rejects_missing_stream_name(self): | |||
form = { | |||
"bot_email": "[email protected]", | |||
"api_key": "fake-key", | |||
"mtype": "stream", | |||
"to": "", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertContains(r, "This field is required.") |
@ -1,4 +1,4 @@ | |||
from hc.api.models import Channel | |||
from hc.api.models import Channel, Check | |||
from hc.test import BaseTestCase | |||
@ -9,11 +9,14 @@ class ChannelChecksTestCase(BaseTestCase): | |||
self.channel.value = "[email protected]" | |||
self.channel.save() | |||
Check.objects.create(project=self.project, name="Database Backups") | |||
def test_it_works(self): | |||
url = "/integrations/%s/checks/" % self.channel.code | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
self.assertContains(r, "Database Backups") | |||
self.assertContains(r, "Assign Checks to Integration", status_code=200) | |||
def test_team_access_works(self): | |||
@ -31,7 +34,7 @@ class ChannelChecksTestCase(BaseTestCase): | |||
url = "/integrations/%s/checks/" % self.channel.code | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
assert r.status_code == 403 | |||
self.assertEqual(r.status_code, 404) | |||
def test_missing_channel(self): | |||
# Valid UUID but there is no channel for it: | |||
@ -39,4 +42,4 @@ class ChannelChecksTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(url) | |||
assert r.status_code == 404 | |||
self.assertEqual(r.status_code, 404) |
@ -17,17 +17,28 @@ class ChannelsTestCase(BaseTestCase): | |||
ch.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertContains(r, "foo-team", status_code=200) | |||
self.assertContains(r, "#bar") | |||
def test_it_shows_webhook_post_data(self): | |||
ch = Channel(kind="webhook", project=self.project) | |||
ch.value = "http://down.example.com\nhttp://up.example.com\nfoobar" | |||
ch.value = json.dumps( | |||
{ | |||
"method_down": "POST", | |||
"url_down": "http://down.example.com", | |||
"body_down": "foobar", | |||
"headers_down": {}, | |||
"method_up": "GET", | |||
"url_up": "http://up.example.com", | |||
"body_up": "", | |||
"headers_up": {}, | |||
} | |||
) | |||
ch.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
# These are inside a modal: | |||
@ -41,7 +52,7 @@ class ChannelsTestCase(BaseTestCase): | |||
ch.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "(normal priority)") | |||
@ -58,7 +69,7 @@ class ChannelsTestCase(BaseTestCase): | |||
n.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "Disabled") | |||
@ -68,7 +79,7 @@ class ChannelsTestCase(BaseTestCase): | |||
channel.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "Unconfirmed") | |||
@ -80,7 +91,7 @@ class ChannelsTestCase(BaseTestCase): | |||
channel.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "(down only)") | |||
@ -92,25 +103,24 @@ class ChannelsTestCase(BaseTestCase): | |||
channel.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "(up only)") | |||
def test_it_shows_sms_label(self): | |||
def test_it_shows_sms_number(self): | |||
ch = Channel(kind="sms", project=self.project) | |||
ch.value = json.dumps({"value": "+123", "label": "My Phone"}) | |||
ch.value = json.dumps({"value": "+123"}) | |||
ch.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
r = self.client.get(self.channels_url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "SMS to +123") | |||
def test_it_requires_current_project(self): | |||
self.profile.current_project = None | |||
self.profile.save() | |||
def test_it_shows_channel_issues_indicator(self): | |||
Channel.objects.create(kind="sms", project=self.project, last_error="x") | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get("/integrations/") | |||
self.assertRedirects(r, "/") | |||
r = self.client.get(self.channels_url) | |||
self.assertContains(r, "broken-channels", status_code=200) |
@ -0,0 +1,18 @@ | |||
from hc.api.models import Check | |||
from hc.test import BaseTestCase | |||
class CopyCheckTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(CopyCheckTestCase, self).setUp() | |||
self.check = Check(project=self.project) | |||
self.check.name = "Foo" | |||
self.check.save() | |||
self.copy_url = "/checks/%s/copy/" % self.check.code | |||
def test_it_works(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.copy_url, follow=True) | |||
self.assertContains(r, "This is a brand new check") | |||
self.assertContains(r, "Foo (copy)") |
@ -37,9 +37,6 @@ class DetailsTestCase(BaseTestCase): | |||
self.assertContains(r, "Cron Expression", status_code=200) | |||
def test_it_allows_cross_team_access(self): | |||
self.bobs_profile.current_project = None | |||
self.bobs_profile.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 200) | |||
@ -0,0 +1,84 @@ | |||
import json | |||
from hc.api.models import Channel | |||
from hc.test import BaseTestCase | |||
class EditWebhookTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(EditWebhookTestCase, self).setUp() | |||
definition = { | |||
"method_down": "GET", | |||
"url_down": "http://example.org/down", | |||
"body_down": "$NAME is down", | |||
"headers_down": {"User-Agent": "My-Custom-UA"}, | |||
"method_up": "GET", | |||
"url_up": "http://example.org/up", | |||
"body_up": "$NAME is up", | |||
"headers_up": {}, | |||
} | |||
self.channel = Channel(project=self.project, kind="webhook") | |||
self.channel.name = "Call example.org" | |||
self.channel.value = json.dumps(definition) | |||
self.channel.save() | |||
self.url = "/integrations/%s/edit_webhook/" % self.channel.code | |||
def test_it_shows_form(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Webhook Settings") | |||
self.assertContains(r, "Call example.org") | |||
# down | |||
self.assertContains(r, "http://example.org/down") | |||
self.assertContains(r, "My-Custom-UA") | |||
self.assertContains(r, "$NAME is down") | |||
# up | |||
self.assertContains(r, "http://example.org/up") | |||
self.assertContains(r, "$NAME is up") | |||
def test_it_saves_form_and_redirects(self): | |||
form = { | |||
"name": "Call foo.com / bar.com", | |||
"method_down": "POST", | |||
"url_down": "http://foo.com", | |||
"headers_down": "X-Foo: 1\nX-Bar: 2", | |||
"body_down": "going down", | |||
"method_up": "POST", | |||
"url_up": "https://bar.com", | |||
"headers_up": "Content-Type: text/plain", | |||
"body_up": "going up", | |||
} | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, form) | |||
self.assertRedirects(r, self.channels_url) | |||
self.channel.refresh_from_db() | |||
self.assertEqual(self.channel.name, "Call foo.com / bar.com") | |||
down_spec = self.channel.down_webhook_spec | |||
self.assertEqual(down_spec["method"], "POST") | |||
self.assertEqual(down_spec["url"], "http://foo.com") | |||
self.assertEqual(down_spec["body"], "going down") | |||
self.assertEqual(down_spec["headers"], {"X-Foo": "1", "X-Bar": "2"}) | |||
up_spec = self.channel.up_webhook_spec | |||
self.assertEqual(up_spec["method"], "POST") | |||
self.assertEqual(up_spec["url"], "https://bar.com") | |||
self.assertEqual(up_spec["body"], "going up") | |||
self.assertEqual(up_spec["headers"], {"Content-Type": "text/plain"}) | |||
def test_it_requires_kind_webhook(self): | |||
self.channel.kind = "email" | |||
self.channel.value = "[email protected]" | |||
self.channel.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 400) |
@ -0,0 +1,31 @@ | |||
from hc.api.models import Check | |||
from hc.test import BaseTestCase | |||
class FilteringRulesTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(FilteringRulesTestCase, self).setUp() | |||
self.check = Check.objects.create(project=self.project) | |||
self.url = "/checks/%s/filtering_rules/" % self.check.code | |||
self.redirect_url = "/checks/%s/details/" % self.check.code | |||
def test_it_works(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, data={"subject": "SUCCESS", "methods": "POST"}) | |||
self.assertRedirects(r, self.redirect_url) | |||
self.check.refresh_from_db() | |||
self.assertEqual(self.check.subject, "SUCCESS") | |||
self.assertEqual(self.check.methods, "POST") | |||
def test_it_clears_method(self): | |||
self.check.method = "POST" | |||
self.check.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, data={"subject": "SUCCESS", "methods": ""}) | |||
self.assertRedirects(r, self.redirect_url) | |||
self.check.refresh_from_db() | |||
self.assertEqual(self.check.methods, "") |
@ -61,7 +61,7 @@ class LogTestCase(BaseTestCase): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Sent email alert to [email protected]", status_code=200) | |||
self.assertContains(r, "Sent email to [email protected]", status_code=200) | |||
def test_it_shows_pushover_notification(self): | |||
ch = Channel.objects.create(kind="po", project=self.project) | |||
@ -74,7 +74,14 @@ class LogTestCase(BaseTestCase): | |||
def test_it_shows_webhook_notification(self): | |||
ch = Channel(kind="webhook", project=self.project) | |||
ch.value = "foo/$NAME" | |||
ch.value = json.dumps( | |||
{ | |||
"method_down": "GET", | |||
"url_down": "foo/$NAME", | |||
"body_down": "", | |||
"headers_down": {}, | |||
} | |||
) | |||
ch.save() | |||
Notification(owner=self.check, channel=ch, check_status="down").save() | |||
@ -84,9 +91,6 @@ class LogTestCase(BaseTestCase): | |||
self.assertContains(r, "Called webhook foo/$NAME", status_code=200) | |||
def test_it_allows_cross_team_access(self): | |||
self.bobs_profile.current_project = None | |||
self.bobs_profile.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 200) |
@ -0,0 +1,43 @@ | |||
from hc.api.models import Check | |||
from hc.test import BaseTestCase | |||
class MetricsTestCase(BaseTestCase): | |||
def setUp(self): | |||
super(MetricsTestCase, self).setUp() | |||
self.project.api_key_readonly = "R" * 32 | |||
self.project.save() | |||
self.check = Check(project=self.project, name="Alice Was Here") | |||
self.check.tags = "foo" | |||
self.check.save() | |||
key = "R" * 32 | |||
self.url = "/projects/%s/checks/metrics/%s" % (self.project.code, key) | |||
def test_it_works(self): | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, 'name="Alice Was Here"') | |||
self.assertContains(r, 'tags="foo"') | |||
self.assertContains(r, 'tag="foo"') | |||
self.assertContains(r, "hc_checks_total 1") | |||
def test_it_escapes_newline(self): | |||
self.check.name = "Line 1\nLine2" | |||
self.check.tags = "A\\C" | |||
self.check.save() | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 200) | |||
self.assertContains(r, "Line 1\\nLine2") | |||
self.assertContains(r, "A\\\\C") | |||
def test_it_checks_api_key_length(self): | |||
r = self.client.get(self.url + "R") | |||
self.assertEqual(r.status_code, 400) | |||
def test_it_checks_api_key(self): | |||
url = "/projects/%s/checks/metrics/%s" % (self.project.code, "X" * 32) | |||
r = self.client.get(url) | |||
self.assertEqual(r.status_code, 403) |
@ -18,16 +18,28 @@ class MyChecksTestCase(BaseTestCase): | |||
r = self.client.get(self.url) | |||
self.assertContains(r, "Alice Was Here", status_code=200) | |||
def test_it_updates_current_project(self): | |||
self.profile.current_project = None | |||
# last_active_date should have been set | |||
self.profile.refresh_from_db() | |||
self.assertTrue(self.profile.last_active_date) | |||
def test_it_bumps_last_active_date(self): | |||
self.profile.last_active_date = timezone.now() - td(days=10) | |||
self.profile.save() | |||
self.client.login(username="[email protected]", password="password") | |||
self.client.get(self.url) | |||
# last_active_date should have been bumped | |||
self.profile.refresh_from_db() | |||
delta = timezone.now() - self.profile.last_active_date | |||
self.assertTrue(delta.total_seconds() < 1) | |||
def test_it_updates_session(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.get(self.url) | |||
self.assertEqual(r.status_code, 200) | |||
self.profile.refresh_from_db() | |||
self.assertEqual(self.profile.current_project, self.project) | |||
self.assertEqual(self.client.session["last_project_id"], self.project.id) | |||
def test_it_checks_access(self): | |||
self.client.login(username="[email protected]", password="password") | |||
@ -26,9 +26,6 @@ class PauseTestCase(BaseTestCase): | |||
self.assertEqual(r.status_code, 405) | |||
def test_it_allows_cross_team_access(self): | |||
self.bobs_profile.current_project = None | |||
self.bobs_profile.save() | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url) | |||
self.assertRedirects(r, self.redirect_url) | |||
@ -44,3 +41,8 @@ class PauseTestCase(BaseTestCase): | |||
self.check.refresh_from_db() | |||
self.assertEqual(self.check.last_start, None) | |||
self.assertEqual(self.check.alert_after, None) | |||
def test_it_does_not_redirect_ajax(self): | |||
self.client.login(username="[email protected]", password="password") | |||
r = self.client.post(self.url, HTTP_X_REQUESTED_WITH="XMLHttpRequest") | |||
self.assertEqual(r.status_code, 200) |