diff --git a/CHANGELOG.md b/CHANGELOG.md index 188134fc..4195dbe3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file. - Fix after-login redirects (the "?next=" query parameter) - Update Check.status field when user edits timeout & grace settings - Use timezone-aware datetimes with croniter, avoid ambiguities around DST +- Validate and reject cron schedules with six components ## 1.3.0 - 2018-11-21 diff --git a/hc/front/tests/test_cron_preview.py b/hc/front/tests/test_cron_preview.py index 571a3aeb..5ed68d0e 100644 --- a/hc/front/tests/test_cron_preview.py +++ b/hc/front/tests/test_cron_preview.py @@ -15,8 +15,8 @@ class CronPreviewTestCase(BaseTestCase): r = self.client.post("/checks/cron_preview/", payload) self.assertContains(r, "cron-preview-title", status_code=200) - def test_it_handles_invalid_cron_expression(self): - for schedule in [None, "", "*", "100 100 100 100 100"]: + def test_it_rejects_invalid_cron_expression(self): + for schedule in [None, "", "*", "100 100 100 100 100", "* * * * * *"]: payload = {"schedule": schedule, "tz": "UTC"} r = self.client.post("/checks/cron_preview/", payload) self.assertContains(r, "Invalid cron expression", status_code=200) diff --git a/hc/front/tests/test_update_timeout.py b/hc/front/tests/test_update_timeout.py index 186b8937..c4a03933 100644 --- a/hc/front/tests/test_update_timeout.py +++ b/hc/front/tests/test_update_timeout.py @@ -13,12 +13,13 @@ class UpdateTimeoutTestCase(BaseTestCase): self.check.last_ping = timezone.now() self.check.save() + self.url = "/checks/%s/timeout/" % self.check.code + def test_it_works(self): - url = "/checks/%s/timeout/" % self.check.code payload = {"kind": "simple", "timeout": 3600, "grace": 60} self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertRedirects(r, "/checks/") self.check.refresh_from_db() @@ -34,12 +35,11 @@ class UpdateTimeoutTestCase(BaseTestCase): self.check.status = "down" self.check.save() - url = "/checks/%s/timeout/" % self.check.code # 1 week: payload = {"kind": "simple", "timeout": 3600 * 24 * 7, "grace": 60} self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertRedirects(r, "/checks/") self.check.refresh_from_db() @@ -51,7 +51,6 @@ class UpdateTimeoutTestCase(BaseTestCase): self.assertEqual(flip.new_status, "up") def test_it_saves_cron_expression(self): - url = "/checks/%s/timeout/" % self.check.code payload = { "kind": "cron", "schedule": "5 * * * *", @@ -60,7 +59,7 @@ class UpdateTimeoutTestCase(BaseTestCase): } self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertRedirects(r, "/checks/") self.check.refresh_from_db() @@ -68,10 +67,6 @@ class UpdateTimeoutTestCase(BaseTestCase): self.assertEqual(self.check.schedule, "5 * * * *") def test_it_validates_cron_expression(self): - self.check.last_ping = None - self.check.save() - - url = "/checks/%s/timeout/" % self.check.code payload = { "kind": "cron", "schedule": "* invalid *", @@ -80,18 +75,30 @@ class UpdateTimeoutTestCase(BaseTestCase): } self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertEqual(r.status_code, 400) # Check should still have its original data: self.check.refresh_from_db() self.assertEqual(self.check.kind, "simple") - def test_it_validates_tz(self): - self.check.last_ping = None - self.check.save() + def test_it_rejects_six_field_cron_expression(self): + payload = { + "kind": "cron", + "schedule": "* * * * * *", # six fields instead of five + "tz": "UTC", + "grace": 60 + } - url = "/checks/%s/timeout/" % self.check.code + self.client.login(username="alice@example.org", password="password") + r = self.client.post(self.url, data=payload) + self.assertEqual(r.status_code, 400) + + # Check should still have its original data: + self.check.refresh_from_db() + self.assertEqual(self.check.kind, "simple") + + def test_it_validates_tz(self): payload = { "kind": "cron", "schedule": "* * * * *", @@ -100,7 +107,7 @@ class UpdateTimeoutTestCase(BaseTestCase): } self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertEqual(r.status_code, 400) # Check should still have its original data: @@ -108,7 +115,6 @@ class UpdateTimeoutTestCase(BaseTestCase): self.assertEqual(self.check.kind, "simple") def test_it_rejects_missing_schedule(self): - url = "/checks/%s/timeout/" % self.check.code # tz field is omitted so this should fail: payload = { "kind": "cron", @@ -117,11 +123,10 @@ class UpdateTimeoutTestCase(BaseTestCase): } self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertEqual(r.status_code, 400) def test_it_rejects_missing_tz(self): - url = "/checks/%s/timeout/" % self.check.code # tz field is omitted so this should fail: payload = { "kind": "cron", @@ -130,17 +135,16 @@ class UpdateTimeoutTestCase(BaseTestCase): } self.client.login(username="alice@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertEqual(r.status_code, 400) def test_team_access_works(self): - url = "/checks/%s/timeout/" % self.check.code payload = {"kind": "simple", "timeout": 7200, "grace": 60} # Logging in as bob, not alice. Bob has team access so this # should work. self.client.login(username="bob@example.org", password="password") - self.client.post(url, data=payload) + self.client.post(self.url, data=payload) check = Check.objects.get(code=self.check.code) assert check.timeout.total_seconds() == 7200 @@ -163,26 +167,23 @@ class UpdateTimeoutTestCase(BaseTestCase): assert r.status_code == 404 def test_it_checks_ownership(self): - url = "/checks/%s/timeout/" % self.check.code payload = {"timeout": 3600, "grace": 60} self.client.login(username="charlie@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertEqual(r.status_code, 404) def test_it_rejects_get(self): - url = "/checks/%s/timeout/" % self.check.code self.client.login(username="alice@example.org", password="password") - r = self.client.get(url) + r = self.client.get(self.url) self.assertEqual(r.status_code, 405) def test_it_allows_cross_team_access(self): self.bobs_profile.current_team = None self.bobs_profile.save() - url = "/checks/%s/timeout/" % self.check.code payload = {"kind": "simple", "timeout": 3600, "grace": 60} self.client.login(username="bob@example.org", password="password") - r = self.client.post(url, data=payload) + r = self.client.post(self.url, data=payload) self.assertRedirects(r, "/checks/") diff --git a/hc/front/validators.py b/hc/front/validators.py index 77eabb7e..89750ff9 100644 --- a/hc/front/validators.py +++ b/hc/front/validators.py @@ -20,9 +20,11 @@ class CronExpressionValidator(object): message = "Not a valid cron expression." def __call__(self, value): - try: - croniter(value) - except: + # Expect 5 components- + if len(value.split()) != 5: + raise ValidationError(message=self.message) + + if not croniter.is_valid(value): raise ValidationError(message=self.message) diff --git a/hc/front/views.py b/hc/front/views.py index d465a2f5..29700479 100644 --- a/hc/front/views.py +++ b/hc/front/views.py @@ -321,20 +321,25 @@ def update_timeout(request, code): @require_POST def cron_preview(request): - schedule = request.POST.get("schedule") + schedule = request.POST.get("schedule", "") tz = request.POST.get("tz") ctx = {"tz": tz, "dates": []} - try: - zone = pytz.timezone(tz) - now_local = timezone.localtime(timezone.now(), zone) - it = croniter(schedule, now_local) - for i in range(0, 6): - ctx["dates"].append(it.get_next(datetime)) - except UnknownTimeZoneError: - ctx["bad_tz"] = True - except: + + if len(schedule.split()) != 5: + ctx["bad_schedule"] = True + elif not croniter.is_valid(schedule): ctx["bad_schedule"] = True + if "bad_schedule" not in ctx: + try: + zone = pytz.timezone(tz) + now_local = timezone.localtime(timezone.now(), zone) + it = croniter(schedule, now_local) + for i in range(0, 6): + ctx["dates"].append(it.get_next(datetime)) + except UnknownTimeZoneError: + ctx["bad_tz"] = True + return render(request, "front/cron_preview.html", ctx)