Kod źródłowy modułu poradnia.cases.models

import itertools
import logging
import re
from datetime import datetime

import requests
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import (
    ImproperlyConfigured,
    ObjectDoesNotExist,
    PermissionDenied,
)
from django.db import models
from django.db.models import (
    BooleanField,
    CharField,
    Count,
    F,
    Func,
    IntegerField,
    Prefetch,
    Q,
    expressions,
)
from django.db.models.functions import Cast
from django.db.models.query import QuerySet
from django.db.models.signals import post_save, pre_delete
from django.template import Context, Template
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
from guardian.models import GroupObjectPermissionBase, UserObjectPermissionBase
from guardian.shortcuts import assign_perm, get_objects_for_user, get_users_with_perms
from model_utils import Choices
from model_utils.fields import MonitorField, StatusField

from poradnia.template_mail.utils import TemplateKey, TemplateMailManager
from poradnia.utils.constants import NAME_MAX_LENGTH
from poradnia.utils.mixins import FormattedDatetimeMixin, UserPrettyNameMixin
from poradnia.utils.utils import get_numeric_param

# TODO: move to settings and fix for DEV and DEMO modes
CASE_PK_RE = r"sprawa-(?P<pk>\d+)@porady.siecobywatelska.pl"


logger = logging.getLogger(__name__)


def delete_files_for_cases(cases):
    from poradnia.letters.models import Attachment, Letter

    def delete_qs(qs, field):
        for x in qs:
            if not getattr(x, field):
                continue
            getattr(x, field).delete()

    delete_qs(Attachment.objects.filter(letter__case__in=cases), "attachment")
    delete_qs(Letter.objects.filter(case__in=cases), "eml")


[dokumentacja] class CaseQuerySet(FormattedDatetimeMixin, UserPrettyNameMixin, QuerySet): def for_assign(self, user): return self.filter( caseuserobjectpermission__user=user, caseuserobjectpermission__permission__codename="can_view", ) def for_user(self, user): return get_objects_for_user(user, "can_view", self) def with_perm(self): return self.prefetch_related("caseuserobjectpermission_set") def with_record_count(self): return self.annotate(record_count=Count("record")) def with_involved_staff(self): qs = ( CaseUserObjectPermission.objects.filter(user__is_staff=True) .select_related("permission", "user") .all() ) return self.prefetch_related( Prefetch("caseuserobjectpermission_set", queryset=qs) ) def involved_staff(self): involved_staff_ids = set( CaseUserObjectPermission.objects.filter(user__is_staff=True) .select_related("permission", "user") .values_list("user", flat=True) .all() ) qs = ( get_user_model() .objects.filter(id__in=involved_staff_ids) .order_by("nicename") ) return qs def by_involved_in(self, user, by_user=True, by_group=False): condition = Q() if by_user: condition = condition | Q(caseuserobjectpermission__user=user) if by_group: condition = condition | Q(casegroupobjectpermission__group__user=user) return self.filter(condition) def by_msg(self, message): envelope = message.get_email_object().get( "Envelope-To" ) or message.get_email_object().get("To") if not envelope: return self.none() result = re.search(CASE_PK_RE, envelope) if not result: return self.none() return self.filter(pk=result.group("pk")) def by_addresses(self, addresses): pks = [ re.match(CASE_PK_RE, address).group("pk") for address in addresses if re.match(CASE_PK_RE, address) ] return self.filter(pk__in=pks) def order_for_user(self, user, is_next): order = "" if is_next else "-" if user.is_staff: field_name = self.model.STAFF_ORDER_DEFAULT_FIELD else: field_name = self.model.USER_ORDER_DEFAULT_FIELD return self.order_by( "{}{}".format(order, field_name), "{}{}".format(order, "pk") ) def with_month_year(self): return self.annotate( month=Func(F("created_on"), function="month", output_field=IntegerField()) ).annotate( year=Func(F("created_on"), function="year", output_field=IntegerField()) ) def with_advice_status(self): return self.annotate(advice_count=Count("advice")).annotate( has_advice=expressions.Case( expressions.When(advice_count=0, then=False), default=True, output_field=BooleanField(), ) ) def with_formatted_deadline(self): return self.annotate( # TODO add explicit datetime formatting with TZ for MySql deadline_str=Cast("deadline__time", output_field=CharField()) ) def area_filter(self, jst): return self.filter( advice__jst__tree_id=jst.tree_id, advice__jst__lft__range=(jst.lft, jst.rght), ) def ajax_boolean_filter(self, request, prefix, field): filter_values = [] for choice in [("yes", True), ("no", False)]: filter_name = prefix + choice[0] if get_numeric_param(request, filter_name): filter_values.append(choice[1]) if filter_values: return self.filter(**{field + "__in": filter_values}) else: return self.filter(**{field + "__isnull": True}) def ajax_status_filter(self, request): choices = Case.STATUS._identifier_map filter_values = [] for choice in choices.keys(): filter_name = "status_" + choice.lower() if get_numeric_param(request, filter_name): filter_values.append(choices[choice]) if filter_values: return self.filter(status__in=filter_values) else: return self.filter(status__isnull=True) def ajax_involved_staff_filter(self, request): involved_staff_filter = get_numeric_param(request, "involved_staff_filter") if involved_staff_filter: involved = get_user_model().objects.filter(id=involved_staff_filter).first() return self.by_involved_in(user=involved).distinct() return self def ajax_has_deadline_filter(self, request): # to provide empty queryset when none of the options is selected deadline_query = Q(deadline=0) # build query for deadline according to user selection for choice in [("yes", False), ("no", True)]: filter_name = "has_deadline_" + choice[0] if get_numeric_param(request, filter_name): deadline_query |= Q(deadline__isnull=choice[1]) return self.filter(deadline_query) def ajax_has_tag_filter(self, request): # to provide empty queryset when none of the options is selected tag_query = Q(pk__in=[]) # build query for tag according to user selection for choice in [("yes", False), ("no", True)]: filter_name = "has_tag_" + choice[0] if get_numeric_param(request, filter_name): tag_query |= Q(advice__isnull=choice[1]) return self.filter(tag_query) def old_cases_to_delete(self): years_to_store = settings.YEARS_TO_STORE_CASES current_month = datetime.now().replace( day=1, hour=0, minute=0, second=0, microsecond=0 ) oldest_date = current_month + relativedelta(years=-years_to_store) return self.filter(last_action__lt=oldest_date)
[dokumentacja] class Case(models.Model): STAFF_ORDER_DEFAULT_FIELD = "last_action" USER_ORDER_DEFAULT_FIELD = "last_send" STATUS = Choices( ("0", "free", _("free")), ("3", "moderated", _("moderated")), ("1", "assigned", _("assigned")), ("2", "closed", _("closed")), ) STATUS_STYLE = { "0": "far fa-circle ", "1": "far fa-circle-dot", "3": "far fa-square-plus", "2": "fas fa-circle", } id = models.AutoField(verbose_name=_("Case number"), primary_key=True) name = models.CharField(max_length=NAME_MAX_LENGTH, verbose_name=_("Subject")) status = StatusField() status_changed = MonitorField(monitor="status") client = models.ForeignKey( to=settings.AUTH_USER_MODEL, related_name="case_client", on_delete=models.CASCADE, verbose_name=_("Client"), ) letter_count = models.IntegerField(default=0, verbose_name=_("Letter count")) last_send = models.DateTimeField(null=True, blank=True, verbose_name=_("Last send")) last_action = models.DateTimeField( null=True, blank=True, verbose_name=_("Last action") ) last_received = models.DateTimeField( null=True, blank=True, verbose_name=_("Last received") ) deadline = models.ForeignKey( to="events.Event", null=True, blank=True, related_name="event_deadline", on_delete=models.CASCADE, verbose_name=_("Dead-line"), ) objects = CaseQuerySet.as_manager() created_by = models.ForeignKey( to=settings.AUTH_USER_MODEL, related_name="case_created", verbose_name=_("Created by"), on_delete=models.CASCADE, ) created_on = models.DateTimeField(auto_now_add=True, verbose_name=_("Created on")) modified_by = models.ForeignKey( to=settings.AUTH_USER_MODEL, null=True, on_delete=models.CASCADE, related_name="case_modified", verbose_name=_("Modified by"), ) modified_on = models.DateTimeField( auto_now=True, null=True, blank=True, verbose_name=_("Modified on") ) handled = models.BooleanField(default=False, verbose_name=_("Handled")) has_project = models.BooleanField(default=False, verbose_name=_("Has project")) advice_classification_response_status = models.IntegerField( null=True, blank=True, verbose_name=_("Advice classification response status") ) advice_classification_request_sent_on = models.DateTimeField( null=True, blank=True, verbose_name=_("Advice classification request sent on") ) def status_display(self): return self.STATUS[self.status] def get_absolute_url(self): return reverse("cases:detail", kwargs={"pk": str(self.pk)}) def render_case_link(self): url = self.get_absolute_url() label = self.name bold_start = "" if self.handled else "<b>" bold_end = "" if self.handled else "</b>" return f'{bold_start}<a href="{url}">{label}</a>{bold_end}' def render_case_link_formatted(self, user): url = self.get_absolute_url() label = self.name template_string = """ {% load cases_tags %} <span class="{{ object.status|status2css }}"> {% if user.is_staff and not object.handled %}<b>{% endif %} <a href="{{ url }}">{{ label }}</a> {% if user.is_staff and not object.handled %}</b>{% endif %} </span>""" template = Template(template_string) context = Context({"object": self, "url": url, "label": label, "user": user}) return template.render(context=context) def render_status(self): status_icon = self.STATUS_STYLE[self.status] return f'<span class="{status_icon}"></span>' def render_project_badge(self): if self.has_project: title = _("Reply to client to remove badge") name = _("Project") return f""" <span class="label label-success" title="{title}"> <i class="fas fa-pencil"></i> {name} </span> """ else: return "" def render_handled(self): if not self.handled: return '<span class="fas fa-check"></span>' else: return "" def render_involved_staff(self): permissions = self.caseuserobjectpermission_set.all() grouped_permissions = itertools.groupby(permissions, lambda p: p.user) user_list = [ { "grouper": key, "title": ", ".join([p.permission.name for p in list(group)]), } for key, group in grouped_permissions if key.is_staff ] return "<br>".join([f""" <span class="label label-info" title="{user["title"]}"> {user["grouper"].get_nicename()}</span> """ for user in user_list]) def render_case_advice_link(self): try: return self.advice.render_advice_link() except ObjectDoesNotExist: return "" def get_edit_url(self): return reverse("cases:edit", kwargs={"pk": str(self.pk)}) def get_close_url(self): return reverse("cases:close", kwargs={"pk": str(self.pk)}) def get_users_with_perms(self, *args, **kwargs): return get_users_with_perms(self, with_group_users=False, *args, **kwargs) def __str__(self): return self.name def get_email(self): return settings.PORADNIA_EMAIL_OUTPUT % self.__dict__ # TODO: Remove def perm_check(self, user, perm): if not (user.has_perm("cases." + perm) or user.has_perm("cases." + perm, self)): raise PermissionDenied return True class Meta: ordering = ["last_send"] verbose_name = _("Case") verbose_name_plural = _("Cases") permissions = ( ("can_view", _("Can view")), ("can_assign", _("Can assign new permissions")), ("can_send_to_client", _("Can send text to client")), ("can_manage_permission", _("Can assign permission")), ("can_add_record", _("Can add record")), ("can_change_own_record", _("Can change own records")), ("can_change_all_record", _("Can change all records")), ("can_close_case", _("Can close case")), ("can_merge_case", _("Can merge case")), # Global permission ("can_select_client", _("Can select client")), ) def update_handled(self): from poradnia.letters.models import Letter try: obj = Letter.objects.case(self).filter(status="done").last() if obj.created_by.is_staff: self.handled = True else: self.handled = False except IndexError: self.handled = False self.save() def update_counters(self, save=True): from poradnia.letters.models import Letter letters_list = Letter.objects.case(self) self.letter_count = letters_list.count() try: last_action = letters_list.last() self.last_action = last_action.created_on except IndexError: pass try: last_send = letters_list.last_staff_send() self.last_send = last_send.status_changed or last_send.created_on except IndexError: self.last_send = None try: last_received = letters_list.last_received() self.last_received = last_received.created_on except IndexError: self.last_received = None try: self.deadline = ( self.event_set.filter(deadline=True) .filter(time__gte=timezone.now()) .order_by("time") .all()[0] ) except IndexError: self.deadline = None if save: self.save() def update_status(self, reopen=False, save=True): if reopen or (self.status != self.STATUS.closed): if self.has_assignees(): self.status = self.STATUS.assigned elif self.has_team(): self.status = self.STATUS.moderated else: self.status = self.STATUS.free if save: self.save()
[dokumentacja] def has_team(self): """ Checks if there exists a staff member who has a permission to handle the case. """ content_type = ContentType.objects.get_for_model(Case) qs = CaseUserObjectPermission.objects.filter( permission__content_type=content_type, content_object=self, user__is_staff=True, ) return qs.exists()
[dokumentacja] def has_assignees(self): """ Checks if there exists a staff member who has a permission to handle the case. """ content_type = ContentType.objects.get_for_model(Case) qs = CaseUserObjectPermission.objects.filter( permission__codename="can_send_to_client", permission__content_type=content_type, content_object=self, user__is_staff=True, ) return qs.exists()
def assign_perm(self): assign_perm("can_view", self.created_by, self) # assign creator assign_perm("can_add_record", self.created_by, self) # assign creator if self.created_by.has_perm("cases.can_send_to_client"): assign_perm("can_send_to_client", self.created_by, self) if self.created_by != self.client: assign_perm("can_view", self.client, self) # assign client assign_perm("can_add_record", self.client, self) # assign client # TODO: Remove def send_notification(self, actor, user_qs, target=None, **context): if target is None: target = self users_to_notify = user_qs User = get_user_model() if not settings.NOTIFY_AUTHOR: users_to_notify = User.objects.exclude(pk=actor.pk).distinct() & user_qs logger.info( f"Case: {self.id} - sending notification " f"to author: {settings.NOTIFY_AUTHOR}" ) for user in users_to_notify: user.notify( actor=actor, target=target, from_email=self.get_email(), **context ) logger.info(f"Notification sent to {user} with {context}") def close(self, actor, notify=True): self.modified_by = actor self.status = self.STATUS.closed if notify: self.send_notification( actor=actor, user_qs=self.get_users_with_perms(), verb="closed" ) def get_next_for_user(self, user, **kwargs): return self.get_next_or_prev_for_user(is_next=True, user=user) def get_prev_for_user(self, user, **kwargs): return self.get_next_or_prev_for_user(is_next=False, user=user) def get_next_or_prev_for_user(self, is_next, user, **kwargs): op = "gt" if is_next else "lt" if user.is_staff: field_name = self.STAFF_ORDER_DEFAULT_FIELD else: field_name = self.USER_ORDER_DEFAULT_FIELD param = getattr(self, field_name) q = Q() if param: q = q | Q(**{"{}__{}".format(field_name, op): param}) if self.pk: q = q | Q(**{field_name: param, "pk__%s" % op: self.pk}) manager = self.__class__._default_manager.using(self._state.db).filter(**kwargs) qs = manager.filter(q) qs = qs.order_for_user(user=user, is_next=is_next) qs = qs.for_user(user) try: return qs[0] except IndexError: raise self.DoesNotExist( "%s matching query does not exist." % self.__class__._meta.object_name )
[dokumentacja] def request_n8n_advice_classification(self): """ Send this case to the configured n8n advice-classification webhook. The webhook is expected to create or update the Advice object related to this Case. The request is authenticated with a Bearer token and contains case context, the first two non-staff letters with attachment OCR text, active advicer taxonomy tags, client data, and the first involved staff user as advisor candidate. Required settings: - N8N_ADVICE_WEBHOOK_URL - N8N_ADVICE_WEBHOOK_TOKEN Returns the decoded JSON response when n8n returns JSON; otherwise returns a dict with the response status code and raw response text. Raises requests.HTTPError for non-2xx responses. """ webhook_url = getattr(settings, "N8N_ADVICE_WEBHOOK_URL", None) webhook_token = getattr(settings, "N8N_ADVICE_WEBHOOK_TOKEN", None) if not webhook_url or not webhook_token: raise ImproperlyConfigured( "N8N_ADVICE_WEBHOOK_URL and N8N_ADVICE_WEBHOOK_TOKEN must be set." ) response = requests.post( webhook_url, json=self._build_n8n_advice_payload(), headers={ "Authorization": f"Bearer {webhook_token}", "Content-Type": "application/json", }, timeout=getattr(settings, "N8N_ADVICE_WEBHOOK_TIMEOUT", 30), ) response.raise_for_status() self.advice_classification_request_sent_on = timezone.now() self.advice_classification_response_status = response.status_code self.save( update_fields=[ "advice_classification_request_sent_on", "advice_classification_response_status", ] ) try: return response.json() except ValueError: return { "status_code": response.status_code, "text": response.text, }
def _build_n8n_advice_payload(self): from poradnia.advicer.models import ( Area, InstitutionKind, Issue, PersonKind, ) from poradnia.letters.models import Letter def serialize_user(user): if not user: return None return { "id": user.pk, "name": ( user.get_nicename() if hasattr(user, "get_nicename") else user.get_full_name() or user.get_username() ), } def serialize_tags(model): return list( model.objects.filter(active=True) .order_by("id") .values("id", "name", "tag_helper") ) letters = ( Letter.objects.filter( record__case=self, created_by_is_staff=False, ) .prefetch_related("attachment_set") .order_by("id")[:2] ) advisor = ( self.caseuserobjectpermission_set.filter(user__is_staff=True) .select_related("user") .order_by("user__nicename", "user_id") .first() ) return { "case": { "id": self.pk, "name": self.name, }, "letters": [ { "id": letter.pk, "text": letter.text, "attachments": [ { "id": attachment.pk, "text_content": attachment.text_content, } for attachment in letter.attachment_set.all().order_by("id") ], } for letter in letters ], "issues": serialize_tags(Issue), "areas": serialize_tags(Area), "person_kinds": serialize_tags(PersonKind), "institution_kinds": serialize_tags(InstitutionKind), "client": serialize_user(self.client), "advisor": serialize_user(advisor.user if advisor else None), }
[dokumentacja] class DeleteCaseProxy(Case): class Meta: proxy = True verbose_name = _("Cases to delete") verbose_name_plural = _("Cases to delete")
[dokumentacja] class CaseUserObjectPermission(UserObjectPermissionBase): content_object = models.ForeignKey(Case, on_delete=models.CASCADE)
[dokumentacja] def save(self, *args, **kwargs): super().save(*args, **kwargs) if self.permission.codename == "can_send_to_client": self.content_object.update_status()
[dokumentacja] def delete(self, *args, **kwargs): """ Note: this method is not invoked in usual circumstances (`remove_perm` call). """ super().delete(*args, **kwargs) self.content_object.update_status()
[dokumentacja] class CaseGroupObjectPermission(GroupObjectPermissionBase): content_object = models.ForeignKey(Case, on_delete=models.CASCADE)
limit = {"content_type__app_label": "cases", "content_type__model": "case"}
[dokumentacja] class PermissionGroup(models.Model): name = models.CharField(max_length=25, verbose_name=_("Name")) permissions = models.ManyToManyField( to=Permission, verbose_name=_("Permissions"), limit_choices_to=limit ) def __str__(self): return self.name @property def group_help_text(self): perm_name_list = [gettext(p.name) for p in self.permissions.all()] return f"\n{self.name}:\n" + "\n".join( [f"- {n}" for n in sorted(perm_name_list)] )
def notify_new_case(sender, instance, created, **kwargs): if created: User = get_user_model() users = User.objects.filter(notify_new_case=True).all() email = [x.email for x in users] TemplateMailManager.send( template_key=TemplateKey.CASE_NEW, recipient_list=email, context={"case": instance}, ) post_save.connect(receiver=notify_new_case, sender=Case, dispatch_uid="new_case_notify") def assign_perm_new_case(sender, instance, created, **kwargs): if created: instance.assign_perm() post_save.connect( receiver=assign_perm_new_case, sender=Case, dispatch_uid="assign_perm_new_case" ) def delete_files(sender, instance, **kwargs): delete_files_for_cases([instance]) pre_delete.connect( receiver=delete_files, sender=Case, dispatch_uid="delete_files_for_case" )