From b5a0ff35387ae6e6cfafe93eeb4d2a9aae023ae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C4=93teris=20Caune?= Date: Thu, 15 Sep 2016 22:52:48 +0300 Subject: [PATCH] Handle concurrent sendalerts using QuerySet.update(). Fixes #39 --- hc/api/management/commands/sendalerts.py | 88 ++++++++++++------------ hc/api/tests/test_sendalerts.py | 23 +------ requirements.txt | 1 - 3 files changed, 44 insertions(+), 68 deletions(-) diff --git a/hc/api/management/commands/sendalerts.py b/hc/api/management/commands/sendalerts.py index 33d9906d..420d5c9f 100644 --- a/hc/api/management/commands/sendalerts.py +++ b/hc/api/management/commands/sendalerts.py @@ -1,70 +1,68 @@ -import logging import time +from threading import Thread -from concurrent.futures import ThreadPoolExecutor from django.core.management.base import BaseCommand from django.db import connection from django.utils import timezone from hc.api.models import Check -executor = ThreadPoolExecutor(max_workers=10) -logger = logging.getLogger(__name__) + +def notify(check_id, stdout): + check = Check.objects.get(id=check_id) + + tmpl = "\nSending alert, status=%s, code=%s\n" + stdout.write(tmpl % (check.status, check.code)) + errors = check.send_alert() + for ch, error in errors: + stdout.write("ERROR: %s %s %s\n" % (ch.kind, ch.value, error)) class Command(BaseCommand): help = 'Sends UP/DOWN email alerts' + owned = Check.objects.filter(user__isnull=False) - def handle_many(self): - """ Send alerts for many checks simultaneously. """ - query = Check.objects.filter(user__isnull=False).select_related("user") + def handle_one(self): + """ Process a single check. """ now = timezone.now() - going_down = query.filter(alert_after__lt=now, status="up") - going_up = query.filter(alert_after__gt=now, status="down") - # Don't combine this in one query so Postgres can query using index: - checks = list(going_down.iterator()) + list(going_up.iterator()) - if not checks: - return False - - futures = [executor.submit(self.handle_one, check) for check in checks] - for future in futures: - future.result() - - return True - - def handle_one(self, check): - """ Send an alert for a single check. - - Return True if an appropriate check was selected and processed. - Return False if no checks need to be processed. - """ - - # Save the new status. If sendalerts crashes, - # it won't process this check again. - check.status = check.get_status() - check.save() - - tmpl = "\nSending alert, status=%s, code=%s\n" - self.stdout.write(tmpl % (check.status, check.code)) - errors = check.send_alert() - for ch, error in errors: - self.stdout.write("ERROR: %s %s %s\n" % (ch.kind, ch.value, error)) - - connection.close() - return True + # Look for checks that are going down + flipped = "down" + q = self.owned.filter(alert_after__lt=now, status="up") + check = q.first() + + if not check: + # If none found, look for checks that are going up + flipped = "up" + q = self.owned.filter(alert_after__gt=now, status="down") + check = q.first() + + if check: + # Atomically update status to the opposite + q = Check.objects.filter(id=check.id, status=check.status) + num_updated = q.update(status=flipped) + if num_updated == 1: + # Send notifications only if status update succeeded + # (no other sendalerts process got there first) + t = Thread(target=notify, args=(check.id, self.stdout)) + t.start() + return True + + return False def handle(self, *args, **options): self.stdout.write("sendalerts is now running") ticks = 0 while True: - if self.handle_many(): - ticks = 1 - else: - ticks += 1 - time.sleep(1) + while self.handle_one(): + ticks = 0 + + ticks += 1 + time.sleep(2) if ticks % 60 == 0: formatted = timezone.now().isoformat() self.stdout.write("-- MARK %s --" % formatted) + + connection.close() diff --git a/hc/api/tests/test_sendalerts.py b/hc/api/tests/test_sendalerts.py index b4c17e94..5ee29d2a 100644 --- a/hc/api/tests/test_sendalerts.py +++ b/hc/api/tests/test_sendalerts.py @@ -4,31 +4,10 @@ from django.utils import timezone from hc.api.management.commands.sendalerts import Command from hc.api.models import Check from hc.test import BaseTestCase -from mock import patch class SendAlertsTestCase(BaseTestCase): - @patch("hc.api.management.commands.sendalerts.Command.handle_one") - def test_it_handles_few(self, mock): - yesterday = timezone.now() - timedelta(days=1) - names = ["Check %d" % d for d in range(0, 10)] - - for name in names: - check = Check(user=self.alice, name=name) - check.alert_after = yesterday - check.status = "up" - check.save() - - result = Command().handle_many() - assert result, "handle_many should return True" - - handled_names = [] - for args, kwargs in mock.call_args_list: - handled_names.append(args[0].name) - - assert set(names) == set(handled_names) - def test_it_handles_grace_period(self): check = Check(user=self.alice, status="up") # 1 day 30 minutes after ping the check is in grace period: @@ -36,4 +15,4 @@ class SendAlertsTestCase(BaseTestCase): check.save() # Expect no exceptions-- - Command().handle_one(check) + Command().handle_one() diff --git a/requirements.txt b/requirements.txt index c21bee1e..1282ec4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ django-ses-backend==0.1.1 Django==1.10 django_compressor==2.1 djmail==0.11.0 -futures==3.0.3 premailer==2.9.6 psycopg2==2.6.1 requests==2.9.1