Source code for ls.joyous.models.event_base

# ------------------------------------------------------------------------------
# Joyous events models
# ------------------------------------------------------------------------------
import datetime as dt
import calendar
from collections import OrderedDict
from uuid import uuid4
from django.conf import settings
from django.db import models
from django.db.models import Q
from django.db.models.query import ModelIterable
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext
from timezone_field import TimeZoneField
from wagtail.core.models import Page, PageManager, PageViewRestriction
from wagtail.core.query import PageQuerySet
from wagtail.core.fields import RichTextField
from wagtail.admin.edit_handlers import (FieldPanel,
        PageChooserPanel, BaseCompositeEditHandler)
from wagtail.images import get_image_model_string
from wagtail.search import index
from wagtail.admin.forms import WagtailAdminPageForm
from ..utils.telltime import getLocalDateAndTime
from ..utils.telltime import getTimeFrom, getTimeTo
from ..utils.telltime import timeFormat, dateFormat
from ..edit_handlers import MapFieldPanel
from ..forms import BorgPageForm
from .groups import get_group_model_string, get_group_model


# ------------------------------------------------------------------------------
# Private
# ------------------------------------------------------------------------------
def _filterContentPanels(panels, remove):
    retval = []
    for panel in panels:
        if isinstance(panel, FieldPanel) and panel.field_name in remove:
            continue
        elif isinstance(panel, BaseCompositeEditHandler):
            panel.children = _filterContentPanels(panel.children, remove)
        retval.append(panel)
    return retval

# ------------------------------------------------------------------------------
# Helper types and constants
# ------------------------------------------------------------------------------
class ThisEvent:
    _fields = ('title', 'page', 'url')

    def __init__(self, *args, **kwargs):
        super().__init__()
        argCount = len(args)
        if argCount == 1:
            # new ctor
            self.page = args[0]
        elif argCount == 3:
            # old ctor
            self.title, self.page, self.url = args
        else:
            raise TypeError("Expected 1 or 3 args not {}".format(argCount))
        for kw, arg in kwargs.items():
            setattr(self, kw, arg)

    def __getattr__(self, attr):
        return getattr(self.page, attr)

    def _asdict(self):
        return OrderedDict(zip(self._fields, self))

    def _astuple(self):
        return (self.title, self.page, self.url)

    def __repr__(self):
        return "ThisEvent (title=%r, page=%r, url=%r)" % self._astuple()

    def __len__(self):
        return 3

    def __getitem__(self, key):
        return self._astuple()[key]

[docs]class EventsOnDay: # TODO support __add__ """ The events that occur on a certain day. Both events that start on that day and events that are still continuing. """ def __init__(self, date, holiday=None, days_events=None, continuing_events=None): if days_events is None: days_events = [] if continuing_events is None: continuing_events = [] self.date = date self.holiday = holiday self.days_events = days_events self.continuing_events = continuing_events @property def all_events(self): """ All the events that occur on this day, ``days_events + continuing_events``. """ return self.days_events + self.continuing_events @property def preview(self): """ A short description of some of the events on this day (limited to 100 characters). """ return ", ".join(event.title for event in self.all_events)[:100] @property def weekday(self): """ The weekday abbreviation for this days (e.g. "mon"). """ return calendar.day_abbr[self.date.weekday()].lower()
class EventsByDayList(list): def __init__(self, fromDate, toDate, holidays=None): if holidays is None: holidays = {} self.fromOrd = fromDate.toordinal() self.toOrd = toDate.toordinal() days = [dt.date.fromordinal(ord) for ord in range(self.fromOrd, self.toOrd+1)] super().__init__(EventsOnDay(day, holidays.get(day)) for day in days) def add(self, thisEvent, pageFromDate, pageToDate): pageFromOrd = pageFromDate.toordinal() pageToOrd = pageToDate.toordinal() dayNum = pageFromOrd - self.fromOrd if 0 <= dayNum <= self.toOrd - self.fromOrd: self[dayNum].days_events.append(thisEvent) for pageOrd in range(pageFromOrd + 1, pageToOrd + 1): dayNum = pageOrd - self.fromOrd if 0 <= dayNum <= self.toOrd - self.fromOrd: self[dayNum].continuing_events.append(thisEvent) # ------------------------------------------------------------------------------ # Event models # ------------------------------------------------------------------------------
[docs]class EventCategory(models.Model): """The category type of an event.""" class Meta: ordering = ["name"] verbose_name = _("event category") verbose_name_plural = _("event categories") code = models.CharField(_("code"), max_length=4, unique=True) name = models.CharField(_("name"), max_length=80) # Anything inheriting from models.Model needs its own __init__ or # modeltranslation patch_constructor may break it def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def __str__(self): return self.name
# ------------------------------------------------------------------------------ class EventManager(PageManager): def get_queryset(self): return self._queryset_class(self.model).live() def __call__(self, request): # a shortcut return self.get_queryset().auth(request) class EventQuerySet(PageQuerySet): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.request = None self.postFilter = None def _clone(self): qs = super()._clone() qs.request = self.request qs.postFilter = self.postFilter return qs def _fetch_all(self): self._fetchResults() self._filterResults() def _fetchResults(self): super()._fetch_all() def _filterResults(self): if self.postFilter: self._result_cache[:] = filter(self.postFilter, self._result_cache) def count(self): if self.postFilter and self._result_cache is None: # if we have a postFilter then force a call to _fetch_all self._fetch_all() return super().count() def upcoming(self): if getattr(settings, "JOYOUS_UPCOMING_INCLUDES_STARTED", False): return self.current() else: return self.future() def current(self): qs = self._clone() qs.postFilter = self.__predicateBasedOn('_current_datetime_from') return qs def future(self): qs = self._clone() qs.postFilter = self.__predicateBasedOn('_future_datetime_from') return qs def past(self): qs = self._clone() qs.postFilter = self.__predicateBasedOn('_past_datetime_from') return qs def __predicateBasedOn(self, attribute): def predicate(item): # If used after byDay [ e.g. qry.byDay(from, to).upcoming() ] then # this will reject the whole days_events if just one event does not # match the predicate. for event in getattr(item, 'days_events', [item]): page = getattr(event, 'page', event) if not getattr(page, attribute, False): return False return True return predicate def this(self): request = self.request class ThisIterable(ModelIterable): def __iter__(self): for page in super().__iter__(): yield ThisEvent(page.title, page, page.get_url(request)) qs = self._clone() qs._iterable_class = ThisIterable return qs def authorized_q(self, request): PASSWORD = PageViewRestriction.PASSWORD LOGIN = PageViewRestriction.LOGIN GROUPS = PageViewRestriction.GROUPS KEY = PageViewRestriction.passed_view_restrictions_session_key restrictions = PageViewRestriction.objects.all() passed = request.session.get(KEY, []) if passed: restrictions = restrictions.exclude(id__in=passed, restriction_type=PASSWORD) if request.user.is_authenticated: restrictions = restrictions.exclude(restriction_type=LOGIN) if request.user.is_superuser: restrictions = restrictions.exclude(restriction_type=GROUPS) else: membership = request.user.groups.all() if membership: restrictions = restrictions.exclude(groups__in=membership, restriction_type=GROUPS) q = Q() for restriction in restrictions: q &= ~self.descendant_of_q(restriction.page, inclusive=True) return q def auth(self, request): self.request = request if request is None: return self else: return self.filter(self.authorized_q(request)) # Possible Future feature redact unauthorized events?? #def redact(self, request) class EventPageForm(BorgPageForm): def clean(self): cleaned_data = super().clean() self._checkStartBeforeEnd(cleaned_data) return cleaned_data def _checkStartBeforeEnd(self, cleaned_data): startTime = getTimeFrom(cleaned_data.get('time_from')) endTime = getTimeTo(cleaned_data.get('time_to')) if startTime > endTime: self.add_error('time_to', _("Event cannot end before it starts")) # Cannot serialize: functools._lru_cache_wrapper object # There are some values Django cannot serialize into migration files. def _get_default_timezone(): return timezone.get_default_timezone()
[docs]class EventBase(models.Model): class Meta: abstract = True uid = models.CharField(max_length=255, db_index=True, editable=False, default=uuid4) category = models.ForeignKey(EventCategory, related_name="+", verbose_name=_("category"), on_delete=models.SET_NULL, blank=True, null=True) image = models.ForeignKey(get_image_model_string(), null=True, blank=True, related_name='+', verbose_name=_("image"), on_delete=models.SET_NULL) time_from = models.TimeField(_("start time"), null=True, blank=True) time_to = models.TimeField(_("end time"), null=True, blank=True) tz = TimeZoneField(verbose_name=_("time zone"), default=_get_default_timezone) group_page = models.ForeignKey(get_group_model_string(), null=True, blank=True, verbose_name=_("group page"), on_delete=models.SET_NULL) details = RichTextField(_("details"), blank=True) location = models.CharField(_("location"), max_length=255, blank=True) website = models.URLField(_("website"), blank=True) # Init these variables to prevent template DEBUG messages # Yes, this is very ugly. An alternative solution would be welcome. cancellation_details = None extra_information = None postponed_from_when = None search_fields = Page.search_fields + [ index.SearchField('location'), index.SearchField('details'), ] content_panels1 = [ FieldPanel('details', classname="full"), MapFieldPanel('location'), FieldPanel('website'), ] if getattr(settings, "JOYOUS_GROUP_SELECTABLE", False): content_panels1.append(PageChooserPanel('group_page')) # Anything inheriting from models.Model needs its own __init__ or # modeltranslation patch_constructor may break it def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @property def group(self): """ The group this event belongs to. Adding the event as a child of a group automatically assigns the event to that group. """ retval = None parent = self.get_parent() Group = get_group_model() if issubclass(parent.specific_class, Group): retval = parent.specific if retval is None: retval = self.group_page return retval @property def _current_datetime_from(self): """ The datetime this event will start or did start in the local timezone, or None if it is finished. """ fromDt = self._getFromDt() toDt = self._getToDt() return fromDt if toDt >= timezone.localtime() else None @property def _future_datetime_from(self): """ The datetime this event next starts in the local timezone, or None if in the past. """ fromDt = self._getFromDt() return fromDt if fromDt >= timezone.localtime() else None @property def _past_datetime_from(self): """ The datetime this event previously started in the local timezone, or None if it never did. """ fromDt = self._getFromDt() return fromDt if fromDt < timezone.localtime() else None @property def _first_datetime_from(self): """ The datetime this event first started in the local time zone, or None if it never did. """ return self._getFromDt() @property def status(self): """ The current status of the event (started, finished or pending). """ now = timezone.localtime() if self._getToDt() < now: return "finished" elif self._getFromDt() < now: return "started" @property def status_text(self): """ A text description of the current status of the event. """ status = self.status if status == "finished": return _("This event has finished.") elif status == "started": return _("This event has started.") else: return "" @property def at(self): """ A string describing what time the event starts (in the local time zone). """ return timeFormat(self._getFromTime())
[docs] @classmethod def _removeContentPanels(cls, *args): """ Remove the panels and so hide the fields named. """ remove = [] for arg in args: if type(arg) is str: remove.append(arg) else: remove.extend(arg) cls.content_panels = _filterContentPanels(cls.content_panels, remove)
[docs] def isAuthorized(self, request): """ Is the user authorized for the requested action with this event? """ restrictions = self.get_view_restrictions() if restrictions and request is None: return False else: return all(restriction.accept_request(request) for restriction in restrictions)
[docs] def get_context(self, request, *args, **kwargs): retval = super().get_context(request, *args, **kwargs) retval['themeCSS'] = getattr(settings, "JOYOUS_THEME_CSS", "") return retval
[docs] def _getLocalWhen(self, date_from, date_to=None): """ Returns a string describing when the event occurs (in the local time zone). """ dateFrom, timeFrom = getLocalDateAndTime(date_from, self.time_from, self.tz, dt.time.min) if date_to is not None: dateTo, timeTo = getLocalDateAndTime(date_to, self.time_to, self.tz) else: if self.time_to is not None: dateTo, timeTo = getLocalDateAndTime(date_from, self.time_to, self.tz) else: dateTo = dateFrom timeTo = None if dateFrom == dateTo: retval = _("{date} {atTime}").format(date=dateFormat(dateFrom), atTime=timeFormat(timeFrom, timeTo, gettext("at "))) else: retval = _("{date} {atTime}").format(date=dateFormat(dateFrom), atTime=timeFormat(timeFrom, prefix=gettext("at "))) retval = _("{dateTimeFrom} to {dateTo} {atTimeTo}").format( dateTimeFrom=retval.strip(), dateTo=dateFormat(dateTo), atTimeTo=timeFormat(timeTo, prefix=gettext("at "))) return retval.strip()
[docs] def _getFromTime(self, atDate=None): """ Time that the event starts (in the local time zone) for the given date. """ raise NotImplementedError()
[docs] def _getFromDt(self): """ Datetime that the event starts (in the local time zone). """ # This is used by the default implementations of # _current_datetime_from, _future_datetime_from, _past_datetime_from, # and _first_datetime_from raise NotImplementedError()
[docs] def _getToDt(self): """ Datetime that the event ends (in the local time zone). """ # This is used by the default implementation of # _current_datetime_from raise NotImplementedError()
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------