You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

125 lines
4.6 KiB

  1. from hc.api.models import Check, Ping
  2. from hc.test import BaseTestCase
  3. from hc.api.management.commands.smtpd import _process_message
  4. PAYLOAD_TMPL = """
  5. From: "User Name" <username@gmail.com>
  6. To: "John Smith" <john@example.com>
  7. Subject: %s
  8. ...
  9. """.strip()
  10. class SmtpdTestCase(BaseTestCase):
  11. def setUp(self):
  12. super().setUp()
  13. self.check = Check.objects.create(project=self.project)
  14. self.email = "%s@does.not.matter" % self.check.code
  15. def test_it_works(self):
  16. _process_message("1.2.3.4", "[email protected]", self.email, b"hello world")
  17. ping = Ping.objects.latest("id")
  18. self.assertEqual(ping.scheme, "email")
  19. self.assertEqual(ping.ua, "Email from [email protected]")
  20. self.assertEqual(ping.body, "hello world")
  21. self.assertEqual(ping.kind, None)
  22. def test_it_handles_subject_filter_match(self):
  23. self.check.subject = "SUCCESS"
  24. self.check.save()
  25. body = PAYLOAD_TMPL % "[SUCCESS] Backup completed"
  26. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  27. ping = Ping.objects.latest("id")
  28. self.assertEqual(ping.scheme, "email")
  29. self.assertEqual(ping.ua, "Email from [email protected]")
  30. self.assertEqual(ping.kind, None)
  31. def test_it_handles_subject_filter_miss(self):
  32. self.check.subject = "SUCCESS"
  33. self.check.save()
  34. body = PAYLOAD_TMPL % "[FAIL] Backup did not complete"
  35. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  36. ping = Ping.objects.latest("id")
  37. self.assertEqual(ping.scheme, "email")
  38. self.assertEqual(ping.ua, "Email from [email protected]")
  39. self.assertEqual(ping.kind, "ign")
  40. def test_it_handles_subject_fail_filter_match(self):
  41. self.check.subject_fail = "FAIL"
  42. self.check.save()
  43. body = PAYLOAD_TMPL % "[FAIL] Backup did not complete"
  44. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  45. ping = Ping.objects.latest("id")
  46. self.assertEqual(ping.scheme, "email")
  47. self.assertEqual(ping.ua, "Email from [email protected]")
  48. self.assertEqual(ping.kind, "fail")
  49. def test_it_handles_subject_fail_filter_miss(self):
  50. self.check.subject_fail = "FAIL"
  51. self.check.save()
  52. body = PAYLOAD_TMPL % "[SUCCESS] Backup completed"
  53. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  54. ping = Ping.objects.latest("id")
  55. self.assertEqual(ping.scheme, "email")
  56. self.assertEqual(ping.ua, "Email from [email protected]")
  57. self.assertEqual(ping.kind, "ign")
  58. def test_it_handles_multiple_subject_keywords(self):
  59. self.check.subject = "SUCCESS, OK"
  60. self.check.save()
  61. body = PAYLOAD_TMPL % "[OK] Backup completed"
  62. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  63. ping = Ping.objects.latest("id")
  64. self.assertEqual(ping.scheme, "email")
  65. self.assertEqual(ping.ua, "Email from [email protected]")
  66. self.assertEqual(ping.kind, None)
  67. def test_it_handles_multiple_subject_fail_keywords(self):
  68. self.check.subject_fail = "FAIL, WARNING"
  69. self.check.save()
  70. body = PAYLOAD_TMPL % "[WARNING] Backup did not complete"
  71. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  72. ping = Ping.objects.latest("id")
  73. self.assertEqual(ping.scheme, "email")
  74. self.assertEqual(ping.ua, "Email from [email protected]")
  75. self.assertEqual(ping.kind, "fail")
  76. def test_it_handles_subject_fail_before_success(self):
  77. self.check.subject = "SUCCESS"
  78. self.check.subject_fail = "FAIL"
  79. self.check.save()
  80. body = PAYLOAD_TMPL % "[SUCCESS] 1 Backup completed, [FAIL] 1 Backup did not complete"
  81. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  82. ping = Ping.objects.latest("id")
  83. self.assertEqual(ping.scheme, "email")
  84. self.assertEqual(ping.ua, "Email from [email protected]")
  85. self.assertEqual(ping.kind, "fail")
  86. def test_it_handles_encoded_subject(self):
  87. self.check.subject = "SUCCESS"
  88. self.check.save()
  89. body = PAYLOAD_TMPL % "=?US-ASCII?B?W1NVQ0NFU1NdIEJhY2t1cCBjb21wbGV0ZWQ=?="
  90. _process_message("1.2.3.4", "[email protected]", self.email, body.encode("utf8"))
  91. ping = Ping.objects.latest("id")
  92. self.assertEqual(ping.scheme, "email")
  93. self.assertEqual(ping.ua, "Email from [email protected]")
  94. self.assertEqual(ping.kind, None)