import itertools
import logging
import re
from datetime import datetime

from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.db import models
from django.db.models import (
    BooleanField,
    CharField,
    Count,
    F,
    Func,
    IntegerField,
    Prefetch,
    Q,
    expressions,
)
from django.db.models.functions import Cast
from django.db.models.query import QuerySet
from django.db.models.signals import post_save, pre_delete
from django.template import Context, Template
from django.urls import reverse
from django.utils import timezone
from django.utils.html import escape
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from guardian.models import GroupObjectPermissionBase, UserObjectPermissionBase
from guardian.shortcuts import assign_perm, get_objects_for_user, get_users_with_perms
from model_utils import Choices
from model_utils.fields import MonitorField, StatusField

from poradnia.template_mail.utils import TemplateKey, TemplateMailManager
from poradnia.utils.constants import NAME_MAX_LENGTH
from poradnia.utils.mixins import FormattedDatetimeMixin, UserPrettyNameMixin
from poradnia.utils.utils import get_numeric_param
# TODO: move to settings and fix for DEV and DEMO modes
# Pattern of a per-case mailbox address; the case primary key is captured as
# the named group ``pk``. The dots of the domain are escaped so that they match
# a literal "." only (an unescaped "." would accept any character there).
CASE_PK_RE = r"sprawa-(?P<pk>\d+)@porady\.siecobywatelska\.pl"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def delete_files_for_cases(cases):
    """Delete the files stored for letters and attachments of the given cases.

    For every ``Attachment`` whose letter belongs to one of ``cases`` the
    ``attachment`` file is deleted, and for every ``Letter`` of those cases the
    ``eml`` file is deleted. Records whose file field is empty are skipped.

    :param cases: collection of cases usable in an ``__in`` lookup.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import - TODO confirm).
    from poradnia.letters.models import Attachment, Letter

    targets = (
        (Attachment.objects.filter(letter__case__in=cases), "attachment"),
        (Letter.objects.filter(case__in=cases), "eml"),
    )
    for queryset, field_name in targets:
        for record in queryset:
            if getattr(record, field_name):
                getattr(record, field_name).delete()
[dokumentacja]
class CaseQuerySet(FormattedDatetimeMixin, UserPrettyNameMixin, QuerySet):
    """Query helpers for ``Case``: permission scoping, annotations, AJAX filters."""

    def for_assign(self, user):
        """Return cases on which ``user`` holds a direct ``can_view`` permission."""
        return self.filter(
            caseuserobjectpermission__user=user,
            caseuserobjectpermission__permission__codename="can_view",
        )

    def for_user(self, user):
        """Return cases for which guardian grants ``user`` the ``can_view`` permission."""
        return get_objects_for_user(user, "can_view", self)

    def with_perm(self):
        """Prefetch the per-user object permissions of every case."""
        return self.prefetch_related("caseuserobjectpermission_set")

    def with_record_count(self):
        """Annotate every case with ``record_count``, the number of its records."""
        return self.annotate(record_count=Count("record"))

    def with_involved_staff(self):
        """Prefetch only the object permissions held by staff users.

        The permission and the user of each prefetched row are loaded in the
        same query (``select_related``).
        """
        staff_permissions = CaseUserObjectPermission.objects.filter(
            user__is_staff=True
        ).select_related("permission", "user")
        return self.prefetch_related(
            Prefetch("caseuserobjectpermission_set", queryset=staff_permissions)
        )

    def involved_staff(self):
        """Return staff users holding an object permission on any case.

        NOTE(review): the result is not restricted to the cases of this
        queryset - every ``CaseUserObjectPermission`` row is considered.
        """
        involved_staff_ids = set(
            CaseUserObjectPermission.objects.filter(user__is_staff=True).values_list(
                "user", flat=True
            )
        )
        return (
            get_user_model()
            .objects.filter(id__in=involved_staff_ids)
            .order_by("nicename")
        )

    def by_involved_in(self, user, by_user=True, by_group=False):
        """Return cases ``user`` is involved in.

        :param by_user: match cases with a per-user object permission of ``user``.
        :param by_group: match cases with an object permission of a group that
            ``user`` belongs to.

        When both flags are false no condition is applied at all.
        """
        condition = Q()
        if by_user:
            condition |= Q(caseuserobjectpermission__user=user)
        if by_group:
            condition |= Q(casegroupobjectpermission__group__user=user)
        return self.filter(condition)

    def by_msg(self, message):
        """Return the case addressed by an incoming e-mail ``message``.

        The case number is read from the ``Envelope-To`` header, falling back
        to ``To``. An empty queryset is returned when neither header is set or
        the header holds no case address.
        """
        # Build the e-mail object once instead of once per header lookup.
        email = message.get_email_object()
        envelope = email.get("Envelope-To") or email.get("To")
        if not envelope:
            return self.none()
        result = re.search(CASE_PK_RE, envelope)
        if not result:
            return self.none()
        return self.filter(pk=result.group("pk"))

    def by_addresses(self, addresses):
        """Return the cases whose mailbox address starts one of ``addresses``."""
        # Match every address exactly once and keep only the successful matches.
        matches = (re.match(CASE_PK_RE, address) for address in addresses)
        pks = [match.group("pk") for match in matches if match]
        return self.filter(pk__in=pks)

    def order_for_user(self, user, is_next):
        """Order by the default field for ``user`` (staff or not), then by ``pk``.

        Ordering is ascending when ``is_next`` is true and descending otherwise.
        """
        order = "" if is_next else "-"
        if user.is_staff:
            field_name = self.model.STAFF_ORDER_DEFAULT_FIELD
        else:
            field_name = self.model.USER_ORDER_DEFAULT_FIELD
        return self.order_by(f"{order}{field_name}", f"{order}pk")

    def with_month_year(self):
        """Annotate ``month`` and ``year`` using the SQL functions of the same name
        applied to ``created_on``."""
        return self.annotate(
            month=Func(F("created_on"), function="month", output_field=IntegerField())
        ).annotate(
            year=Func(F("created_on"), function="year", output_field=IntegerField())
        )

    def with_advice_status(self):
        """Annotate ``advice_count`` and the boolean ``has_advice`` (count != 0)."""
        return self.annotate(advice_count=Count("advice")).annotate(
            has_advice=expressions.Case(
                expressions.When(advice_count=0, then=False),
                default=True,
                output_field=BooleanField(),
            )
        )

    def with_formatted_deadline(self):
        """Annotate ``deadline_str``: the deadline event time cast to a string."""
        return self.annotate(
            # TODO add explicit datetime formatting with TZ for MySql
            deadline_str=Cast("deadline__time", output_field=CharField())
        )

    def area_filter(self, jst):
        """Return cases whose advice ``jst`` lies in the same tree as ``jst`` and
        within its ``lft``..``rght`` range (presumably an MPTT subtree - TODO confirm).
        """
        return self.filter(
            advice__jst__tree_id=jst.tree_id,
            advice__jst__lft__range=(jst.lft, jst.rght),
        )

    def ajax_boolean_filter(self, request, prefix, field):
        """Filter a boolean ``field`` by the ``<prefix>yes`` / ``<prefix>no`` params.

        A value is selected when ``get_numeric_param`` returns something truthy
        for its parameter. When nothing is selected the field is required to be
        NULL.
        """
        selected = [
            value
            for suffix, value in (("yes", True), ("no", False))
            if get_numeric_param(request, prefix + suffix)
        ]
        if selected:
            return self.filter(**{field + "__in": selected})
        return self.filter(**{field + "__isnull": True})

    def ajax_status_filter(self, request):
        """Filter by the statuses selected through ``status_<identifier>`` params.

        When nothing is selected the status is required to be NULL.
        """
        selected = [
            value
            for identifier, value in Case.STATUS._identifier_map.items()
            if get_numeric_param(request, "status_" + identifier.lower())
        ]
        if selected:
            return self.filter(status__in=selected)
        return self.filter(status__isnull=True)

    def ajax_involved_staff_filter(self, request):
        """Filter by the user whose id is given in ``involved_staff_filter``.

        Without the parameter the queryset is returned unchanged. When the id
        matches no user an empty queryset is returned.
        """
        involved_staff_filter = get_numeric_param(request, "involved_staff_filter")
        if not involved_staff_filter:
            return self
        involved = get_user_model().objects.filter(id=involved_staff_filter).first()
        if involved is None:
            # Filtering by ``user=None`` would be turned into an IS NULL lookup
            # and match unrelated cases instead of nothing.
            return self.none()
        return self.by_involved_in(user=involved).distinct()

    def ajax_has_deadline_filter(self, request):
        """Filter by the ``has_deadline_yes`` / ``has_deadline_no`` params."""
        # to provide empty queryset when none of the options is selected
        deadline_query = Q(deadline=0)
        # build query for deadline according to user selection
        for suffix, is_null in (("yes", False), ("no", True)):
            if get_numeric_param(request, "has_deadline_" + suffix):
                deadline_query |= Q(deadline__isnull=is_null)
        return self.filter(deadline_query)

    def old_cases_to_delete(self):
        """Return cases whose last action is older than the retention period.

        The cut-off is the start of the current month moved back by
        ``settings.YEARS_TO_STORE_CASES`` years.
        """
        # ``timezone.now()`` is aware when USE_TZ is on and naive otherwise, so
        # the comparison value always matches what the database field expects.
        now = timezone.now()
        if timezone.is_aware(now):
            # Compute "start of month" in the current time zone, not in UTC.
            now = timezone.localtime(now)
        current_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        oldest_date = current_month + relativedelta(
            years=-settings.YEARS_TO_STORE_CASES
        )
        return self.filter(last_action__lt=oldest_date)
[dokumentacja]
class Case(models.Model):
    """A case raised by a client, with its status, counters and permissions."""

    # Field used for ordering / next-previous navigation, for staff and for
    # other users respectively.
    STAFF_ORDER_DEFAULT_FIELD = "last_action"
    USER_ORDER_DEFAULT_FIELD = "last_send"
    STATUS = Choices(
        ("0", "free", _("free")),
        ("3", "moderated", _("moderated")),
        ("1", "assigned", _("assigned")),
        ("2", "closed", _("closed")),
    )
    id = models.AutoField(verbose_name=_("Case number"), primary_key=True)
    name = models.CharField(max_length=NAME_MAX_LENGTH, verbose_name=_("Subject"))
    status = StatusField()
    status_changed = MonitorField(monitor="status")
    client = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        related_name="case_client",
        on_delete=models.CASCADE,
        verbose_name=_("Client"),
    )
    # Counters and timestamps below are recomputed by ``update_counters``.
    letter_count = models.IntegerField(default=0, verbose_name=_("Letter count"))
    last_send = models.DateTimeField(null=True, blank=True, verbose_name=_("Last send"))
    last_action = models.DateTimeField(
        null=True, blank=True, verbose_name=_("Last action")
    )
    last_received = models.DateTimeField(
        null=True, blank=True, verbose_name=_("Last received")
    )
    deadline = models.ForeignKey(
        to="events.Event",
        null=True,
        blank=True,
        related_name="event_deadline",
        on_delete=models.CASCADE,
        verbose_name=_("Dead-line"),
    )
    objects = CaseQuerySet.as_manager()
    created_by = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        related_name="case_created",
        verbose_name=_("Created by"),
        on_delete=models.CASCADE,
    )
    created_on = models.DateTimeField(auto_now_add=True, verbose_name=_("Created on"))
    modified_by = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        null=True,
        on_delete=models.CASCADE,
        related_name="case_modified",
        verbose_name=_("Modified by"),
    )
    modified_on = models.DateTimeField(
        auto_now=True, null=True, blank=True, verbose_name=_("Modified on")
    )
    # Set by ``update_handled``.
    handled = models.BooleanField(default=False, verbose_name=_("Handled"))
    has_project = models.BooleanField(default=False, verbose_name=_("Has project"))

    def status_display(self):
        """Return the human-readable label of the current status."""
        return self.STATUS[self.status]

    def get_absolute_url(self):
        """Return the URL of the case detail view."""
        return reverse("cases:detail", kwargs={"pk": str(self.pk)})

    def render_case_link(self):
        """Return an HTML link to the case, in bold when it is not handled.

        The case subject is HTML-escaped before being put into the markup.
        """
        url = self.get_absolute_url()
        label = escape(self.name)
        bold_start = "" if self.handled else "<b>"
        bold_end = "" if self.handled else "</b>"
        return f'{bold_start}<a href="{url}">{label}</a>{bold_end}'

    def render_case_link_formatted(self, user):
        """Return an HTML link to the case, styled by status.

        The link is wrapped in bold only for staff ``user`` and a case that is
        not handled. Rendering goes through a Django template.
        """
        url = self.get_absolute_url()
        label = self.name
        template_string = """
{% load cases_tags %}
<span class="{{ object.status|status2css }}">
{% if user.is_staff and not object.handled %}<b>{% endif %}
<a href="{{ url }}">{{ label }}</a>
{% if user.is_staff and not object.handled %}</b>{% endif %}
</span>"""
        template = Template(template_string)
        context = Context({"object": self, "url": url, "label": label, "user": user})
        return template.render(context=context)

    def render_status(self):
        """Return an HTML icon for the current status."""
        STATUS_STYLE = {
            "0": "fa fa-circle-o ",
            "1": "fa fa-dot-circle-o",
            "3": "fa fa-plus-square-o",
            "2": "fa fa-circle",
        }
        status_icon = STATUS_STYLE[self.status]
        return f'<span class="{status_icon}"></span>'

    def render_project_badge(self):
        """Return the HTML "Project" badge, or an empty string without a project."""
        if not self.has_project:
            return ""
        title = _("Reply to client to remove badge")
        name = _("Project")
        return f"""
<span class="label label-success" title="{title}">
<i class="fa fa-pencil"></i> {name}
</span>
"""

    def render_handled(self):
        """Return a check icon when the case is NOT handled, else an empty string.

        NOTE(review): showing the check mark for unhandled cases looks inverted;
        kept as is - confirm the intent against the templates using it.
        """
        if not self.handled:
            return '<span class="fa fa-check"></span>'
        return ""

    def render_involved_staff(self):
        """Return HTML labels of the staff users holding permissions on the case.

        One label is produced per staff user; its ``title`` lists the names of
        that user's permissions. User names and titles are HTML-escaped.
        """
        # Group by user with a dict: unlike ``itertools.groupby`` this does not
        # require the permissions to arrive sorted by user, so a user is never
        # listed twice. Users keep the order of their first appearance.
        permission_names = {}
        for perm in self.caseuserobjectpermission_set.all():
            if perm.user.is_staff:
                permission_names.setdefault(perm.user, []).append(
                    perm.permission.name
                )
        labels = []
        for staff, names in permission_names.items():
            title = escape(", ".join(names))
            nicename = escape(staff.get_nicename())
            labels.append(
                f"""
<span class="label label-info" title="{title}">
{nicename}</span>
"""
            )
        return "<br>".join(labels)

    def render_case_advice_link(self):
        """Return the rendered link of the related advice, or "" when there is none."""
        try:
            return self.advice.render_advice_link()
        except ObjectDoesNotExist:
            return ""

    def get_edit_url(self):
        """Return the URL of the case edit view."""
        return reverse("cases:edit", kwargs={"pk": str(self.pk)})

    def get_close_url(self):
        """Return the URL of the case close view."""
        return reverse("cases:close", kwargs={"pk": str(self.pk)})

    def get_users_with_perms(self, *args, **kwargs):
        """Return users with object permissions on this case.

        Delegates to guardian's ``get_users_with_perms``; users that only have
        permissions through a group are excluded unless the caller passes
        ``with_group_users`` explicitly.
        """
        # ``setdefault`` keeps the default while avoiding a "multiple values
        # for keyword argument" TypeError when the caller supplies the option.
        kwargs.setdefault("with_group_users", False)
        return get_users_with_perms(self, *args, **kwargs)

    def __str__(self):
        return self.name

    def get_email(self):
        """Return the case e-mail address built from ``PORADNIA_EMAIL_OUTPUT``.

        The setting is %-formatted with the instance ``__dict__``.
        """
        return settings.PORADNIA_EMAIL_OUTPUT % self.__dict__

    # TODO: Remove
    def perm_check(self, user, perm):
        """Return True when ``user`` has ``perm`` globally or on this case.

        :raises PermissionDenied: when the user has neither.
        """
        if not (user.has_perm("cases." + perm) or user.has_perm("cases." + perm, self)):
            raise PermissionDenied
        return True

    class Meta:
        ordering = ["last_send"]
        verbose_name = _("Case")
        verbose_name_plural = _("Cases")
        permissions = (
            ("can_view", _("Can view")),
            ("can_assign", _("Can assign new permissions")),
            ("can_send_to_client", _("Can send text to client")),
            ("can_manage_permission", _("Can assign permission")),
            ("can_add_record", _("Can add record")),
            ("can_change_own_record", _("Can change own records")),
            ("can_change_all_record", _("Can change all records")),
            ("can_close_case", _("Can close case")),
            ("can_merge_case", _("Can merge case")),
            # Global permission
            ("can_select_client", _("Can select client")),
        )

    def update_handled(self):
        """Set ``handled`` from the author of the last "done" letter and save.

        The case is handled when that letter was created by a staff user; it is
        not handled when the author is not staff or when there is no such letter.
        """
        from poradnia.letters.models import Letter

        # ``last()`` may signal "no letter" either by raising IndexError (custom
        # queryset) or by returning None (stock Django) - both are handled.
        try:
            last_done = Letter.objects.case(self).filter(status="done").last()
        except IndexError:
            last_done = None
        self.handled = last_done is not None and bool(last_done.created_by.is_staff)
        self.save()

    def update_counters(self, save=True):
        """Recompute the letter counter, activity timestamps and the deadline.

        :param save: when true (default) the case is saved afterwards.
        """
        from poradnia.letters.models import Letter

        letters_list = Letter.objects.case(self)
        self.letter_count = letters_list.count()

        # A missing letter is reported by IndexError or by None, see
        # ``update_handled``; ``last_action`` is left untouched in that case.
        try:
            last_action = letters_list.last()
        except IndexError:
            last_action = None
        if last_action is not None:
            self.last_action = last_action.created_on

        try:
            last_send = letters_list.last_staff_send()
        except IndexError:
            last_send = None
        if last_send is None:
            self.last_send = None
        else:
            self.last_send = last_send.status_changed or last_send.created_on

        try:
            last_received = letters_list.last_received()
        except IndexError:
            last_received = None
        self.last_received = (
            None if last_received is None else last_received.created_on
        )

        # Earliest deadline event that is not in the past, or None.
        self.deadline = (
            self.event_set.filter(deadline=True)
            .filter(time__gte=timezone.now())
            .order_by("time")
            .first()
        )
        if save:
            self.save()

    def update_status(self, reopen=False, save=True):
        """Derive the status from the staff permissions on the case.

        A closed case is left alone unless ``reopen`` is true. Otherwise the
        status becomes ``assigned`` when ``has_assignees()``, ``moderated`` when
        ``has_team()`` and ``free`` in any other case.

        :param save: when true (default) the case is saved afterwards, also
            when the status was not touched.
        """
        if reopen or (self.status != self.STATUS.closed):
            if self.has_assignees():
                self.status = self.STATUS.assigned
            elif self.has_team():
                self.status = self.STATUS.moderated
            else:
                self.status = self.STATUS.free
        if save:
            self.save()

    def has_team(self):
        """
        Checks if there exists a staff member who holds any object permission
        on the case.
        """
        content_type = ContentType.objects.get_for_model(Case)
        qs = CaseUserObjectPermission.objects.filter(
            permission__content_type=content_type,
            content_object=self,
            user__is_staff=True,
        )
        return qs.exists()

    def has_assignees(self):
        """
        Checks if there exists a staff member who holds the
        ``can_send_to_client`` object permission on the case.
        """
        content_type = ContentType.objects.get_for_model(Case)
        qs = CaseUserObjectPermission.objects.filter(
            permission__codename="can_send_to_client",
            permission__content_type=content_type,
            content_object=self,
            user__is_staff=True,
        )
        return qs.exists()

    def assign_perm(self):
        """Grant the initial object permissions to the creator and the client.

        Both get ``can_view`` and ``can_add_record``; the creator additionally
        gets ``can_send_to_client`` when holding that permission globally.
        """
        assign_perm("can_view", self.created_by, self)  # assign creator
        assign_perm("can_add_record", self.created_by, self)  # assign creator
        if self.created_by.has_perm("cases.can_send_to_client"):
            assign_perm("can_send_to_client", self.created_by, self)
        if self.created_by != self.client:
            assign_perm("can_view", self.client, self)  # assign client
            assign_perm("can_add_record", self.client, self)  # assign client

    # TODO: Remove
    def send_notification(self, actor, user_qs, target=None, **context):
        """Notify every user of ``user_qs`` about an action of ``actor``.

        :param actor: user who performed the action; excluded from the
            recipients unless ``settings.NOTIFY_AUTHOR`` is true.
        :param user_qs: queryset of users to notify.
        :param target: object of the notification, this case by default.
        :param context: extra keyword arguments passed on to ``user.notify``.
        """
        if target is None:
            target = self
        users_to_notify = user_qs
        User = get_user_model()
        if not settings.NOTIFY_AUTHOR:
            users_to_notify = User.objects.exclude(pk=actor.pk).distinct() & user_qs
        logger.info(
            f"Case: {self.id} - sending notification "
            f"to author: {settings.NOTIFY_AUTHOR}"
        )
        for user in users_to_notify:
            user.notify(
                actor=actor, target=target, from_email=self.get_email(), **context
            )
            logger.info(f"Notification sent to {user} with {context}")

    def close(self, actor, notify=True):
        """Mark the case as closed by ``actor`` and optionally notify its users.

        The instance is not saved here; saving is left to the caller.
        """
        self.modified_by = actor
        self.status = self.STATUS.closed
        if notify:
            self.send_notification(
                actor=actor, user_qs=self.get_users_with_perms(), verb="closed"
            )

    def get_next_for_user(self, user, **kwargs):
        """Return the case following this one for ``user``.

        Extra keyword arguments are used as additional filter conditions.

        :raises Case.DoesNotExist: when there is no such case.
        """
        return self.get_next_or_prev_for_user(is_next=True, user=user, **kwargs)

    def get_prev_for_user(self, user, **kwargs):
        """Return the case preceding this one for ``user``.

        Extra keyword arguments are used as additional filter conditions.

        :raises Case.DoesNotExist: when there is no such case.
        """
        return self.get_next_or_prev_for_user(is_next=False, user=user, **kwargs)

    def get_next_or_prev_for_user(self, is_next, user, **kwargs):
        """Return the neighbouring case visible to ``user``.

        Cases are ordered by ``STAFF_ORDER_DEFAULT_FIELD`` for staff and by
        ``USER_ORDER_DEFAULT_FIELD`` otherwise, with ``pk`` as the tie-breaker.

        :param is_next: look forwards when true, backwards otherwise.
        :param kwargs: additional filter conditions.
        :raises Case.DoesNotExist: when there is no such case.
        """
        op = "gt" if is_next else "lt"
        if user.is_staff:
            field_name = self.STAFF_ORDER_DEFAULT_FIELD
        else:
            field_name = self.USER_ORDER_DEFAULT_FIELD
        param = getattr(self, field_name)
        q = Q()
        if param:
            q = q | Q(**{f"{field_name}__{op}": param})
        if self.pk:
            # Same value of the ordering field: fall back to comparing ``pk``.
            q = q | Q(**{field_name: param, f"pk__{op}": self.pk})
        manager = self.__class__._default_manager.using(self._state.db).filter(**kwargs)
        qs = manager.filter(q)
        qs = qs.order_for_user(user=user, is_next=is_next)
        qs = qs.for_user(user)
        try:
            return qs[0]
        except IndexError:
            raise self.DoesNotExist(
                "%s matching query does not exist." % self.__class__._meta.object_name
            ) from None
[dokumentacja]
class DeleteCaseProxy(Case):
    """Proxy of ``Case`` that only differs by its "Cases to delete" labels."""

    class Meta:
        proxy = True
        # Singular and plural labels are the same text.
        verbose_name = verbose_name_plural = _("Cases to delete")
[dokumentacja]
class CaseUserObjectPermission(UserObjectPermissionBase):
    """Per-user object permission on a ``Case``, linked by a direct foreign key."""

    content_object = models.ForeignKey(Case, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Save the permission and refresh the case status when it matters.

        The status is refreshed only for the ``can_send_to_client`` permission.
        NOTE(review): ``Case.update_status`` also depends on ``has_team()``,
        which counts any staff permission - confirm other codenames are
        deliberately ignored here.
        """
        super().save(*args, **kwargs)
        if self.permission.codename == "can_send_to_client":
            self.content_object.update_status()

    def delete(self, *args, **kwargs):
        """
        Delete the permission, refresh the case status and return the result of
        the parent ``delete()``.

        Note: this method is not invoked in usual circumstances (`remove_perm` call).
        """
        result = super().delete(*args, **kwargs)
        self.content_object.update_status()
        # Pass the parent's return value on to the caller instead of dropping it.
        return result
[dokumentacja]
class CaseGroupObjectPermission(GroupObjectPermissionBase):
    """Per-group object permission on a ``Case``, linked by a direct foreign key."""

    content_object = models.ForeignKey(to=Case, on_delete=models.CASCADE)
# Lookup restricting a choice of permissions to those of the ``cases.case``
# content type; used as ``limit_choices_to`` by ``PermissionGroup.permissions``.
limit = dict(content_type__app_label="cases", content_type__model="case")
[dokumentacja]
class PermissionGroup(models.Model):
    """A named set of case permissions."""

    name = models.CharField(max_length=25, verbose_name=_("Name"))
    permissions = models.ManyToManyField(
        to=Permission, verbose_name=_("Permissions"), limit_choices_to=limit
    )

    def __str__(self):
        return self.name

    @property
    def group_help_text(self):
        """Help text: the group name followed by its sorted, translated
        permission names, one ``- name`` entry per line."""
        translated_names = sorted(
            gettext(permission.name) for permission in self.permissions.all()
        )
        header = f"\n{self.name}:\n"
        entries = "\n".join(f"- {n}" for n in translated_names)
        return header + entries
def notify_new_case(sender, instance, created, **kwargs):
    """``post_save`` receiver: e-mail a new case to the users subscribed to it.

    Nothing happens when an existing case is saved. Recipients are the users
    with ``notify_new_case`` set; the ``CASE_NEW`` template is rendered with
    the case as ``case``.
    """
    if not created:
        return
    subscribers = get_user_model().objects.filter(notify_new_case=True).all()
    recipients = [subscriber.email for subscriber in subscribers]
    TemplateMailManager.send(
        template_key=TemplateKey.CASE_NEW,
        recipient_list=recipients,
        context={"case": instance},
    )


post_save.connect(receiver=notify_new_case, sender=Case, dispatch_uid="new_case_notify")
def assign_perm_new_case(sender, instance, created, **kwargs):
    """``post_save`` receiver: grant the initial permissions on a new case."""
    if not created:
        return
    instance.assign_perm()


post_save.connect(
    receiver=assign_perm_new_case, sender=Case, dispatch_uid="assign_perm_new_case"
)
def delete_files(sender, instance, **kwargs):
    """``pre_delete`` receiver: remove the stored files of the case being deleted."""
    doomed_cases = [instance]
    delete_files_for_cases(doomed_cases)


pre_delete.connect(
    receiver=delete_files, sender=Case, dispatch_uid="delete_files_for_case"
)