- from datetime import timedelta
- import random
- from secrets import token_urlsafe
- from urllib.parse import quote, urlencode
- import uuid
-
- from django.conf import settings
- from django.contrib.auth.hashers import check_password, make_password
- from django.contrib.auth.models import User
- from django.core.signing import TimestampSigner
- from django.db import models
- from django.db.models import Count, Q
- from django.urls import reverse
- from django.utils import timezone
- from fido2.ctap2 import AttestedCredentialData
- from hc.lib import emails
- from hc.lib.date import month_boundaries
- import pytz
-
- NO_NAG = timedelta()
- NAG_PERIODS = (
- (NO_NAG, "Disabled"),
- (timedelta(hours=1), "Hourly"),
- (timedelta(days=1), "Daily"),
- )
-
- REPORT_CHOICES = (("off", "Off"), ("weekly", "Weekly"), ("monthly", "Monthly"))
-
-
- def month(dt):
- """ For a given datetime, return the matching first-day-of-month date. """
- return dt.date().replace(day=1)
-
-
- class ProfileManager(models.Manager):
- def for_user(self, user):
- try:
- return user.profile
- except Profile.DoesNotExist:
- profile = Profile(user=user)
- if not settings.USE_PAYMENTS:
- # If not using payments, set high limits
- profile.check_limit = 500
- profile.sms_limit = 500
- profile.call_limit = 500
- profile.team_limit = 500
-
- profile.save()
- return profile
-
-
- class Profile(models.Model):
- user = models.OneToOneField(User, models.CASCADE, blank=True, null=True)
- next_report_date = models.DateTimeField(null=True, blank=True)
- reports = models.CharField(max_length=10, default="monthly", choices=REPORT_CHOICES)
- nag_period = models.DurationField(default=NO_NAG, choices=NAG_PERIODS)
- next_nag_date = models.DateTimeField(null=True, blank=True)
- ping_log_limit = models.IntegerField(default=100)
- check_limit = models.IntegerField(default=20)
- token = models.CharField(max_length=128, blank=True)
-
- last_sms_date = models.DateTimeField(null=True, blank=True)
- sms_limit = models.IntegerField(default=5)
- sms_sent = models.IntegerField(default=0)
-
- last_call_date = models.DateTimeField(null=True, blank=True)
- call_limit = models.IntegerField(default=0)
- calls_sent = models.IntegerField(default=0)
-
- team_limit = models.IntegerField(default=2)
- sort = models.CharField(max_length=20, default="created")
- deletion_notice_date = models.DateTimeField(null=True, blank=True)
- last_active_date = models.DateTimeField(null=True, blank=True)
- tz = models.CharField(max_length=36, default="UTC")
- theme = models.CharField(max_length=10, null=True, blank=True)
-
- totp = models.CharField(max_length=32, null=True, blank=True)
- totp_created = models.DateTimeField(null=True, blank=True)
-
- objects = ProfileManager()
-
- def __str__(self):
- return "Profile for %s" % self.user.email
-
- def notifications_url(self):
- return settings.SITE_ROOT + reverse("hc-notifications")
-
- def reports_unsub_url(self):
- signer = TimestampSigner(salt="reports")
- signed_username = signer.sign(self.user.username)
- path = reverse("hc-unsubscribe-reports", args=[signed_username])
- return settings.SITE_ROOT + path
-
- def prepare_token(self, salt):
- token = token_urlsafe(24)
- self.token = make_password(token, salt)
- self.save()
- return token
-
- def check_token(self, token, salt):
- return salt in self.token and check_password(token, self.token)
-
- def send_instant_login_link(self, inviting_project=None, redirect_url=None):
- token = self.prepare_token("login")
- path = reverse("hc-check-token", args=[self.user.username, token])
- if redirect_url:
- path += "?next=%s" % redirect_url
-
- ctx = {
- "button_text": "Sign In",
- "button_url": settings.SITE_ROOT + path,
- "inviting_project": inviting_project,
- }
- emails.login(self.user.email, ctx)
-
- def send_transfer_request(self, project):
- token = self.prepare_token("login")
- settings_path = reverse("hc-project-settings", args=[project.code])
- path = reverse("hc-check-token", args=[self.user.username, token])
- path += "?next=%s" % settings_path
-
- ctx = {
- "button_text": "Project Settings",
- "button_url": settings.SITE_ROOT + path,
- "project": project,
- }
- emails.transfer_request(self.user.email, ctx)
-
- def send_sms_limit_notice(self, transport):
- ctx = {"transport": transport, "limit": self.sms_limit}
- if self.sms_limit != 500 and settings.USE_PAYMENTS:
- ctx["url"] = settings.SITE_ROOT + reverse("hc-pricing")
-
- emails.sms_limit(self.user.email, ctx)
-
- def send_call_limit_notice(self):
- ctx = {"limit": self.call_limit}
- if self.call_limit != 500 and settings.USE_PAYMENTS:
- ctx["url"] = settings.SITE_ROOT + reverse("hc-pricing")
-
- emails.call_limit(self.user.email, ctx)
-
- def projects(self):
- """ Return a queryset of all projects we have access to. """
-
- is_owner = Q(owner_id=self.user_id)
- is_member = Q(member__user_id=self.user_id)
- q = Project.objects.filter(is_owner | is_member)
- return q.distinct().order_by("name")
-
- def annotated_projects(self):
- """ Return all projects, annotated with 'n_down'. """
-
- # Subquery for getting project ids
- project_ids = self.projects().values("id")
-
- # Main query with the n_down annotation.
- # Must use the subquery, otherwise ORM gets confused by
- # joins and group by's
- q = Project.objects.filter(id__in=project_ids)
- n_down = Count("check", filter=Q(check__status="down"))
- q = q.annotate(n_down=n_down)
- return q.order_by("name")
-
- def checks_from_all_projects(self):
- """ Return a queryset of checks from projects we have access to. """
-
- project_ids = self.projects().values("id")
-
- from hc.api.models import Check
-
- return Check.objects.filter(project_id__in=project_ids)
-
- def send_report(self, nag=False):
- checks = self.checks_from_all_projects()
-
- # Has there been a ping in last 6 months?
- result = checks.aggregate(models.Max("last_ping"))
- last_ping = result["last_ping__max"]
-
- six_months_ago = timezone.now() - timedelta(days=180)
- if last_ping is None or last_ping < six_months_ago:
- return False
-
- # Is there at least one check that is down?
- num_down = checks.filter(status="down").count()
- if nag and num_down == 0:
- return False
-
- # Sort checks by project. Need this because will group by project in
- # template.
- checks = checks.select_related("project")
- checks = checks.order_by("project_id")
- # list() executes the query, to avoid DB access while
- # rendering the template
- checks = list(checks)
-
- unsub_url = self.reports_unsub_url()
-
- headers = {
- "List-Unsubscribe": "<%s>" % unsub_url,
- "X-Bounce-Url": unsub_url,
- "List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
- }
-
- boundaries = month_boundaries(months=3)
- # throw away the current month, keep two previous months
- boundaries.pop()
-
- ctx = {
- "checks": checks,
- "sort": self.sort,
- "now": timezone.now(),
- "unsub_link": unsub_url,
- "notifications_url": self.notifications_url(),
- "nag": nag,
- "nag_period": self.nag_period.total_seconds(),
- "num_down": num_down,
- "month_boundaries": boundaries,
- "monthly_or_weekly": self.reports,
- }
-
- emails.report(self.user.email, ctx, headers)
- return True
-
- def sms_sent_this_month(self):
- # IF last_sms_date was never set, we have not sent any messages yet.
- if not self.last_sms_date:
- return 0
-
- # If last sent date is not from this month, we've sent 0 this month.
- if month(timezone.now()) > month(self.last_sms_date):
- return 0
-
- return self.sms_sent
-
- def authorize_sms(self):
- """ If monthly limit not exceeded, increase counter and return True """
-
- sent_this_month = self.sms_sent_this_month()
- if sent_this_month >= self.sms_limit:
- return False
-
- self.sms_sent = sent_this_month + 1
- self.last_sms_date = timezone.now()
- self.save()
- return True
-
- def calls_sent_this_month(self):
- # IF last_call_date was never set, we have not made any phone calls yet.
- if not self.last_call_date:
- return 0
-
- # If last sent date is not from this month, we've made 0 calls this month.
- if month(timezone.now()) > month(self.last_call_date):
- return 0
-
- return self.calls_sent
-
- def authorize_call(self):
- """ If monthly limit not exceeded, increase counter and return True """
-
- sent_this_month = self.calls_sent_this_month()
- if sent_this_month >= self.call_limit:
- return False
-
- self.calls_sent = sent_this_month + 1
- self.last_call_date = timezone.now()
- self.save()
- return True
-
- def num_checks_used(self):
- from hc.api.models import Check
-
- return Check.objects.filter(project__owner_id=self.user_id).count()
-
- def num_checks_available(self):
- return self.check_limit - self.num_checks_used()
-
- def can_accept(self, project):
- return project.num_checks() <= self.num_checks_available()
-
- def update_next_nag_date(self):
- any_down = self.checks_from_all_projects().filter(status="down").exists()
- if any_down and self.next_nag_date is None and self.nag_period:
- self.next_nag_date = timezone.now() + self.nag_period
- self.save(update_fields=["next_nag_date"])
- elif not any_down and self.next_nag_date:
- self.next_nag_date = None
- self.save(update_fields=["next_nag_date"])
-
- def choose_next_report_date(self):
- """ Calculate the target date for the next monthly/weekly report.
-
- Monthly reports should get sent on 1st of each month, between
- 9AM and 11AM in user's timezone.
-
- Weekly reports should get sent on Mondays, between
- 9AM and 11AM in user's timezone.
-
- """
-
- if self.reports == "off":
- return None
-
- tz = pytz.timezone(self.tz)
- dt = timezone.now().astimezone(tz)
- dt = dt.replace(hour=9, minute=0) + timedelta(minutes=random.randrange(0, 120))
-
- while True:
- dt += timedelta(days=1)
- if self.reports == "monthly" and dt.day == 1:
- return dt
- elif self.reports == "weekly" and dt.weekday() == 0:
- return dt
-
-
- class Project(models.Model):
- code = models.UUIDField(default=uuid.uuid4, unique=True)
- name = models.CharField(max_length=200, blank=True)
- owner = models.ForeignKey(User, models.CASCADE)
- api_key = models.CharField(max_length=128, blank=True, db_index=True)
- api_key_readonly = models.CharField(max_length=128, blank=True, db_index=True)
- badge_key = models.CharField(max_length=150, unique=True)
-
- def __str__(self):
- return self.name or self.owner.email
-
- @property
- def owner_profile(self):
- return Profile.objects.for_user(self.owner)
-
- def num_checks(self):
- return self.check_set.count()
-
- def num_checks_available(self):
- return self.owner_profile.num_checks_available()
-
- def set_api_keys(self):
- self.api_key = token_urlsafe(nbytes=24)
- self.api_key_readonly = token_urlsafe(nbytes=24)
- self.save()
-
- def invite_suggestions(self):
- q = User.objects.filter(memberships__project__owner_id=self.owner_id)
- q = q.exclude(memberships__project=self)
- return q.distinct().order_by("email")
-
- def can_invite_new_users(self):
- q = User.objects.filter(memberships__project__owner_id=self.owner_id)
- used = q.distinct().count()
- return used < self.owner_profile.team_limit
-
- def invite(self, user, role):
- if Member.objects.filter(user=user, project=self).exists():
- return False
-
- if self.owner_id == user.id:
- return False
-
- Member.objects.create(user=user, project=self, role=role)
- checks_url = reverse("hc-checks", args=[self.code])
- user.profile.send_instant_login_link(self, redirect_url=checks_url)
- return True
-
- def update_next_nag_dates(self):
- """ Update next_nag_date on profiles of all members of this project. """
-
- is_owner = Q(user_id=self.owner_id)
- is_member = Q(user__memberships__project=self)
- q = Profile.objects.filter(is_owner | is_member).exclude(nag_period=NO_NAG)
-
- for profile in q:
- profile.update_next_nag_date()
-
- def overall_status(self):
- if not hasattr(self, "_overall_status"):
- self._overall_status = "up"
- for check in self.check_set.all():
- check_status = check.get_status()
- if check_status == "grace" and self._overall_status == "up":
- self._overall_status = "grace"
- elif check_status == "down":
- self._overall_status = "down"
- break
-
- return self._overall_status
-
- def get_n_down(self):
- result = 0
- for check in self.check_set.all():
- if check.get_status() == "down":
- result += 1
-
- return result
-
- def have_channel_issues(self):
- errors = list(self.channel_set.values_list("last_error", flat=True))
-
- # It's a problem if a project has no integrations at all
- if len(errors) == 0:
- return True
-
- # It's a problem if any integration has a logged error
- return True if max(errors) else False
-
- def transfer_request(self):
- return self.member_set.filter(transfer_request_date__isnull=False).first()
-
- def dashboard_url(self):
- if not self.api_key_readonly:
- return None
-
- frag = urlencode({self.api_key_readonly: str(self)}, quote_via=quote)
- return reverse("hc-dashboard") + "#" + frag
-
- def checks_url(self):
- return settings.SITE_ROOT + reverse("hc-checks", args=[self.code])
-
-
- class Member(models.Model):
- class Role(models.TextChoices):
- READONLY = "r", "Read-only"
- REGULAR = "w", "Member"
- MANAGER = "m", "Manager"
-
- user = models.ForeignKey(User, models.CASCADE, related_name="memberships")
- project = models.ForeignKey(Project, models.CASCADE)
- transfer_request_date = models.DateTimeField(null=True, blank=True)
- role = models.CharField(max_length=1, default=Role.REGULAR, choices=Role.choices)
-
- class Meta:
- constraints = [
- models.UniqueConstraint(
- fields=["user", "project"], name="accounts_member_no_duplicates"
- )
- ]
-
- def can_accept(self):
- return self.user.profile.can_accept(self.project)
-
- @property
- def is_rw(self):
- return self.role in (Member.Role.REGULAR, Member.Role.MANAGER)
-
-
- class Credential(models.Model):
- code = models.UUIDField(default=uuid.uuid4, unique=True)
- name = models.CharField(max_length=100)
- user = models.ForeignKey(User, models.CASCADE, related_name="credentials")
- created = models.DateTimeField(auto_now_add=True)
- data = models.BinaryField()
-
- def unpack(self):
- unpacked, remaining_data = AttestedCredentialData.unpack_from(self.data)
- return unpacked
|