Browse Source

Validate and reject cron schedules with six components

pull/211/head
Pēteris Caune 6 years ago
parent
commit
a402dce293
No known key found for this signature in database GPG Key ID: E28D7679E9A9EDE2
5 changed files with 52 additions and 43 deletions
  1. +1
    -0
      CHANGELOG.md
  2. +2
    -2
      hc/front/tests/test_cron_preview.py
  3. +29
    -28
      hc/front/tests/test_update_timeout.py
  4. +5
    -3
      hc/front/validators.py
  5. +15
    -10
      hc/front/views.py

+ 1
- 0
CHANGELOG.md View File

@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
- Fix after-login redirects (the "?next=" query parameter) - Fix after-login redirects (the "?next=" query parameter)
- Update Check.status field when user edits timeout & grace settings - Update Check.status field when user edits timeout & grace settings
- Use timezone-aware datetimes with croniter, avoid ambiguities around DST - Use timezone-aware datetimes with croniter, avoid ambiguities around DST
- Validate and reject cron schedules with six components
## 1.3.0 - 2018-11-21 ## 1.3.0 - 2018-11-21


+ 2
- 2
hc/front/tests/test_cron_preview.py View File

@ -15,8 +15,8 @@ class CronPreviewTestCase(BaseTestCase):
r = self.client.post("/checks/cron_preview/", payload) r = self.client.post("/checks/cron_preview/", payload)
self.assertContains(r, "cron-preview-title", status_code=200) self.assertContains(r, "cron-preview-title", status_code=200)
def test_it_handles_invalid_cron_expression(self):
for schedule in [None, "", "*", "100 100 100 100 100"]:
def test_it_rejects_invalid_cron_expression(self):
for schedule in [None, "", "*", "100 100 100 100 100", "* * * * * *"]:
payload = {"schedule": schedule, "tz": "UTC"} payload = {"schedule": schedule, "tz": "UTC"}
r = self.client.post("/checks/cron_preview/", payload) r = self.client.post("/checks/cron_preview/", payload)
self.assertContains(r, "Invalid cron expression", status_code=200) self.assertContains(r, "Invalid cron expression", status_code=200)


+ 29
- 28
hc/front/tests/test_update_timeout.py View File

@ -13,12 +13,13 @@ class UpdateTimeoutTestCase(BaseTestCase):
self.check.last_ping = timezone.now() self.check.last_ping = timezone.now()
self.check.save() self.check.save()
self.url = "/checks/%s/timeout/" % self.check.code
def test_it_works(self): def test_it_works(self):
url = "/checks/%s/timeout/" % self.check.code
payload = {"kind": "simple", "timeout": 3600, "grace": 60} payload = {"kind": "simple", "timeout": 3600, "grace": 60}
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertRedirects(r, "/checks/") self.assertRedirects(r, "/checks/")
self.check.refresh_from_db() self.check.refresh_from_db()
@ -34,12 +35,11 @@ class UpdateTimeoutTestCase(BaseTestCase):
self.check.status = "down" self.check.status = "down"
self.check.save() self.check.save()
url = "/checks/%s/timeout/" % self.check.code
# 1 week: # 1 week:
payload = {"kind": "simple", "timeout": 3600 * 24 * 7, "grace": 60} payload = {"kind": "simple", "timeout": 3600 * 24 * 7, "grace": 60}
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertRedirects(r, "/checks/") self.assertRedirects(r, "/checks/")
self.check.refresh_from_db() self.check.refresh_from_db()
@ -51,7 +51,6 @@ class UpdateTimeoutTestCase(BaseTestCase):
self.assertEqual(flip.new_status, "up") self.assertEqual(flip.new_status, "up")
def test_it_saves_cron_expression(self): def test_it_saves_cron_expression(self):
url = "/checks/%s/timeout/" % self.check.code
payload = { payload = {
"kind": "cron", "kind": "cron",
"schedule": "5 * * * *", "schedule": "5 * * * *",
@ -60,7 +59,7 @@ class UpdateTimeoutTestCase(BaseTestCase):
} }
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertRedirects(r, "/checks/") self.assertRedirects(r, "/checks/")
self.check.refresh_from_db() self.check.refresh_from_db()
@ -68,10 +67,6 @@ class UpdateTimeoutTestCase(BaseTestCase):
self.assertEqual(self.check.schedule, "5 * * * *") self.assertEqual(self.check.schedule, "5 * * * *")
def test_it_validates_cron_expression(self): def test_it_validates_cron_expression(self):
self.check.last_ping = None
self.check.save()
url = "/checks/%s/timeout/" % self.check.code
payload = { payload = {
"kind": "cron", "kind": "cron",
"schedule": "* invalid *", "schedule": "* invalid *",
@ -80,18 +75,30 @@ class UpdateTimeoutTestCase(BaseTestCase):
} }
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertEqual(r.status_code, 400) self.assertEqual(r.status_code, 400)
# Check should still have its original data: # Check should still have its original data:
self.check.refresh_from_db() self.check.refresh_from_db()
self.assertEqual(self.check.kind, "simple") self.assertEqual(self.check.kind, "simple")
def test_it_validates_tz(self):
self.check.last_ping = None
self.check.save()
def test_it_rejects_six_field_cron_expression(self):
payload = {
"kind": "cron",
"schedule": "* * * * * *", # six fields instead of five
"tz": "UTC",
"grace": 60
}
url = "/checks/%s/timeout/" % self.check.code
self.client.login(username="[email protected]", password="password")
r = self.client.post(self.url, data=payload)
self.assertEqual(r.status_code, 400)
# Check should still have its original data:
self.check.refresh_from_db()
self.assertEqual(self.check.kind, "simple")
def test_it_validates_tz(self):
payload = { payload = {
"kind": "cron", "kind": "cron",
"schedule": "* * * * *", "schedule": "* * * * *",
@ -100,7 +107,7 @@ class UpdateTimeoutTestCase(BaseTestCase):
} }
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertEqual(r.status_code, 400) self.assertEqual(r.status_code, 400)
# Check should still have its original data: # Check should still have its original data:
@ -108,7 +115,6 @@ class UpdateTimeoutTestCase(BaseTestCase):
self.assertEqual(self.check.kind, "simple") self.assertEqual(self.check.kind, "simple")
def test_it_rejects_missing_schedule(self): def test_it_rejects_missing_schedule(self):
url = "/checks/%s/timeout/" % self.check.code
# tz field is omitted so this should fail: # tz field is omitted so this should fail:
payload = { payload = {
"kind": "cron", "kind": "cron",
@ -117,11 +123,10 @@ class UpdateTimeoutTestCase(BaseTestCase):
} }
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertEqual(r.status_code, 400) self.assertEqual(r.status_code, 400)
def test_it_rejects_missing_tz(self): def test_it_rejects_missing_tz(self):
url = "/checks/%s/timeout/" % self.check.code
# tz field is omitted so this should fail: # tz field is omitted so this should fail:
payload = { payload = {
"kind": "cron", "kind": "cron",
@ -130,17 +135,16 @@ class UpdateTimeoutTestCase(BaseTestCase):
} }
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertEqual(r.status_code, 400) self.assertEqual(r.status_code, 400)
def test_team_access_works(self): def test_team_access_works(self):
url = "/checks/%s/timeout/" % self.check.code
payload = {"kind": "simple", "timeout": 7200, "grace": 60} payload = {"kind": "simple", "timeout": 7200, "grace": 60}
# Logging in as bob, not alice. Bob has team access so this # Logging in as bob, not alice. Bob has team access so this
# should work. # should work.
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
self.client.post(url, data=payload)
self.client.post(self.url, data=payload)
check = Check.objects.get(code=self.check.code) check = Check.objects.get(code=self.check.code)
assert check.timeout.total_seconds() == 7200 assert check.timeout.total_seconds() == 7200
@ -163,26 +167,23 @@ class UpdateTimeoutTestCase(BaseTestCase):
assert r.status_code == 404 assert r.status_code == 404
def test_it_checks_ownership(self): def test_it_checks_ownership(self):
url = "/checks/%s/timeout/" % self.check.code
payload = {"timeout": 3600, "grace": 60} payload = {"timeout": 3600, "grace": 60}
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertEqual(r.status_code, 404) self.assertEqual(r.status_code, 404)
def test_it_rejects_get(self): def test_it_rejects_get(self):
url = "/checks/%s/timeout/" % self.check.code
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.get(url)
r = self.client.get(self.url)
self.assertEqual(r.status_code, 405) self.assertEqual(r.status_code, 405)
def test_it_allows_cross_team_access(self): def test_it_allows_cross_team_access(self):
self.bobs_profile.current_team = None self.bobs_profile.current_team = None
self.bobs_profile.save() self.bobs_profile.save()
url = "/checks/%s/timeout/" % self.check.code
payload = {"kind": "simple", "timeout": 3600, "grace": 60} payload = {"kind": "simple", "timeout": 3600, "grace": 60}
self.client.login(username="[email protected]", password="password") self.client.login(username="[email protected]", password="password")
r = self.client.post(url, data=payload)
r = self.client.post(self.url, data=payload)
self.assertRedirects(r, "/checks/") self.assertRedirects(r, "/checks/")

+ 5
- 3
hc/front/validators.py View File

@ -20,9 +20,11 @@ class CronExpressionValidator(object):
message = "Not a valid cron expression." message = "Not a valid cron expression."
def __call__(self, value): def __call__(self, value):
try:
croniter(value)
except:
# Expect 5 components-
if len(value.split()) != 5:
raise ValidationError(message=self.message)
if not croniter.is_valid(value):
raise ValidationError(message=self.message) raise ValidationError(message=self.message)


+ 15
- 10
hc/front/views.py View File

@ -321,20 +321,25 @@ def update_timeout(request, code):
@require_POST @require_POST
def cron_preview(request): def cron_preview(request):
schedule = request.POST.get("schedule")
schedule = request.POST.get("schedule", "")
tz = request.POST.get("tz") tz = request.POST.get("tz")
ctx = {"tz": tz, "dates": []} ctx = {"tz": tz, "dates": []}
try:
zone = pytz.timezone(tz)
now_local = timezone.localtime(timezone.now(), zone)
it = croniter(schedule, now_local)
for i in range(0, 6):
ctx["dates"].append(it.get_next(datetime))
except UnknownTimeZoneError:
ctx["bad_tz"] = True
except:
if len(schedule.split()) != 5:
ctx["bad_schedule"] = True
elif not croniter.is_valid(schedule):
ctx["bad_schedule"] = True ctx["bad_schedule"] = True
if "bad_schedule" not in ctx:
try:
zone = pytz.timezone(tz)
now_local = timezone.localtime(timezone.now(), zone)
it = croniter(schedule, now_local)
for i in range(0, 6):
ctx["dates"].append(it.get_next(datetime))
except UnknownTimeZoneError:
ctx["bad_tz"] = True
return render(request, "front/cron_preview.html", ctx) return render(request, "front/cron_preview.html", ctx)


Loading…
Cancel
Save