import hashlib
import random
import string
import warnings
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import Q
from django.utils.encoding import force_text
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from enumfields import Enum
from parler.models import TranslatableModel, TranslatedFields
from shuup.campaigns.consts import (
CAMPAIGNS_CACHE_NAMESPACE,
CATALOG_FILTER_CACHE_NAMESPACE,
CONTEXT_CONDITION_CACHE_NAMESPACE,
)
from shuup.campaigns.models.basket_conditions import CategoryProductsBasketCondition, ProductsInBasketCondition
from shuup.campaigns.utils.campaigns import get_lines_suppliers, get_product_ids_and_quantities
from shuup.campaigns.utils.matcher import get_matching_for_product
from shuup.core import cache
from shuup.core.fields import InternalIdentifierField
from shuup.core.models import Category, Order, Shop
from shuup.core.utils import context_cache
from shuup.utils.analog import define_log_model
from shuup.utils.properties import MoneyPropped
[docs]
class CampaignType(Enum):
CATALOG = 1
BASKET = 2
[docs]
class CampaignQueryset(models.QuerySet):
[docs]
def available(self, shop=None):
query = Q(
Q(active=True)
& (Q(start_datetime__isnull=True) | Q(start_datetime__lte=now()))
& (Q(end_datetime__isnull=True) | Q(end_datetime__gte=now()))
)
if shop:
query &= Q(shop=shop)
return self.filter(query)
[docs]
class Campaign(MoneyPropped, TranslatableModel):
admin_url_suffix = None
shop = models.ForeignKey(
on_delete=models.CASCADE,
to=Shop,
verbose_name=_("shop"),
help_text=_("The shop where the campaign is active."),
)
name = models.CharField(
max_length=120,
verbose_name=_("name"),
help_text=_("The name for this campaign."),
)
# translations in subclass
identifier = InternalIdentifierField(unique=True)
active = models.BooleanField(
default=False,
verbose_name=_("active"),
help_text=_("Enable this if the campaign is currently active. Please also set a start and an end date."),
)
start_datetime = models.DateTimeField(
null=True,
blank=True,
verbose_name=_("start date and time"),
help_text=_(
"The date and time the campaign starts. This is only applicable if the campaign is marked as active."
),
)
end_datetime = models.DateTimeField(
null=True,
blank=True,
verbose_name=_("end date and time"),
help_text=_(
"The date and time the campaign ends. This is only applicable if the campaign is marked as active."
),
)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
blank=True,
null=True,
related_name="+",
on_delete=models.SET_NULL,
verbose_name=_("created by"),
)
modified_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
blank=True,
null=True,
related_name="+",
on_delete=models.SET_NULL,
verbose_name=_("modified by"),
)
created_on = models.DateTimeField(auto_now_add=True, editable=False, verbose_name=_("created on"))
modified_on = models.DateTimeField(auto_now=True, editable=False, verbose_name=_("modified on"))
objects = CampaignQueryset.as_manager()
[docs]
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
cache.bump_version(CAMPAIGNS_CACHE_NAMESPACE)
cache.bump_version(CONTEXT_CONDITION_CACHE_NAMESPACE)
cache.bump_version(CATALOG_FILTER_CACHE_NAMESPACE)
[docs]
def is_available(self):
if not self.active: # move to manager?
return False
if self.start_datetime and self.end_datetime:
if self.start_datetime <= now() <= self.end_datetime:
return True
return False
elif self.start_datetime and not self.end_datetime:
if self.start_datetime > now():
return False
elif not self.start_datetime and self.end_datetime:
if self.end_datetime < now():
return False
return True
@property
def type(self):
return CampaignType.BASKET if isinstance(self, BasketCampaign) else CampaignType.CATALOG
[docs]
class CatalogCampaign(Campaign):
_queryset = None
admin_url_suffix = "catalog_campaign"
conditions = models.ManyToManyField("ContextCondition", blank=True, related_name="campaign")
filters = models.ManyToManyField("CatalogFilter", blank=True, related_name="campaign")
translations = TranslatedFields(
public_name=models.CharField(
max_length=120,
blank=True,
help_text=_("The campaign name to show in the store front."),
)
)
def __str__(self):
return force_text(_("Catalog Campaign: {name}").format(name=self.name))
[docs]
def save(self, *args, **kwargs):
warnings.warn(
"The CatalogCampaign discount module will be removed on next major version.",
DeprecationWarning,
stacklevel=2,
)
super().save(*args, **kwargs)
self.filters.update(active=self.active)
for f in self.filters.all():
for matching_product in f.get_matching_shop_products():
context_cache.bump_cache_for_shop_product(matching_product)
self.conditions.update(active=self.active)
[docs]
def rules_match(
self,
context,
shop_product,
matching_catalog_filters,
matching_context_conditions,
):
if not self.is_available():
return False
# If rule has filters, all of them has to match
for filter_pk in self.filters.values_list("pk", flat=True):
if filter_pk not in matching_catalog_filters:
return False
# All filters match so let's check that also all the conditions match
for condition_pk in self.conditions.values_list("pk", flat=True):
if condition_pk not in matching_context_conditions:
return False
return True
[docs]
@classmethod
def get_for_product(cls, shop_product):
matching_filters = get_matching_for_product(shop_product, provide_category="campaign_catalog_filter")
matching_conditions = get_matching_for_product(shop_product, provide_category="campaign_context_condition")
query_filter = Q(Q(filters__in=matching_filters) | Q(conditions__in=matching_conditions))
return cls.objects.available(shop=shop_product.shop).filter(query_filter).distinct()
[docs]
@classmethod
def get_matching(cls, context, shop_product):
prod_ctx_cache_elements = {
"customer": context.customer.pk or 0,
"shop": context.shop.pk,
"product_id": shop_product.pk,
}
namespace = CAMPAIGNS_CACHE_NAMESPACE
sorted_items = dict(sorted(prod_ctx_cache_elements.items(), key=lambda item: item[0]))
key = "{}:{}".format(
namespace,
hashlib.sha1(str(sorted_items).encode("utf-8")).hexdigest(),
)
cached_matching = cache.get(key, None)
if cached_matching is not None:
return cached_matching
from shuup.campaigns.models.matching import get_matching_catalog_filters, get_matching_context_conditions
matching_context_conditions = get_matching_context_conditions(context)
matching_catalog_filters = get_matching_catalog_filters(shop_product)
if not (matching_context_conditions or matching_catalog_filters):
return []
# Get all possible campaign id's for matching context_conditions
campaigns_based_on_conditions = set(
cls.objects.filter(
active=True,
shop=context.shop,
conditions__id__in=matching_context_conditions,
).values_list("pk", flat=True)
)
campaigns_based_on_catalog_filters = set()
if hasattr(cls, "filters"):
# Get all possible campaigns for matching catalog_filters
campaigns_based_on_catalog_filters = set(
cls.objects.filter(
active=True,
shop=context.shop,
filters__id__in=matching_catalog_filters,
).values_list("pk", flat=True)
)
all_possible_campaigns_ids = campaigns_based_on_conditions | campaigns_based_on_catalog_filters
matching = []
for campaign in cls.objects.filter(id__in=all_possible_campaigns_ids):
if campaign.rules_match(
context,
shop_product,
matching_catalog_filters,
matching_context_conditions,
):
matching.append(campaign)
cache.set(key, matching, timeout=None)
return matching
[docs]
class BasketCampaign(Campaign):
admin_url_suffix = "basket_campaign"
basket_line_text = models.CharField(
max_length=120,
verbose_name=_("basket line text"),
help_text=_("This text will be shown in a basket."),
)
conditions = models.ManyToManyField("BasketCondition", blank=True, related_name="campaign")
coupon = models.OneToOneField(
"Coupon",
null=True,
blank=True,
related_name="campaign",
verbose_name=_("coupon"),
on_delete=models.CASCADE,
)
supplier = models.ForeignKey(
"shuup.Supplier",
null=True,
blank=True,
related_name="basket_campaigns",
verbose_name=_("supplier"),
help_text=_(
"When set, this campaign will match only products from the selected supplier. "
"Rules and Effects will also be restricted to include only the products of this supplier."
),
on_delete=models.CASCADE,
)
translations = TranslatedFields(
public_name=models.CharField(
max_length=120,
verbose_name=_("public name"),
help_text=_("The campaign name to show in the store front."),
)
)
def __str__(self):
return force_text(_("Basket Campaign: {name}").format(name=self.name))
[docs]
def save(self, *args, **kwargs):
if self.coupon:
code_count_for_shop = BasketCampaign.objects.filter(
active=True, shop_id=self.shop.id, coupon__code=self.coupon.code
)
if not self.id and code_count_for_shop.exists():
raise ValidationError(_("Can't have multiple active campaigns with same code."))
if self.id and code_count_for_shop.exclude(coupon_id=self.coupon.id).exists():
raise ValidationError(_("Can't have multiple active campaigns with same code."))
super().save(*args, **kwargs)
self.conditions.update(active=self.active)
[docs]
@classmethod
def get_for_product(cls, shop_product):
matching_conditions = get_matching_for_product(shop_product, provide_category="campaign_basket_condition")
matching_effects = get_matching_for_product(
shop_product, provide_category="campaign_basket_discount_effect_form"
)
matching_line_effects = get_matching_for_product(
shop_product, provide_category="campaign_basket_line_effect_form"
)
effects_q = Q(Q(line_effects__id__in=matching_line_effects) | Q(discount_effects__id__in=matching_effects))
matching_q = Q(Q(conditions__in=matching_conditions) | effects_q)
return cls.objects.available(shop=shop_product.shop).filter(matching_q).distinct()
[docs]
@classmethod
def get_matching(cls, basket, lines):
matching = []
exclude_condition_ids = set()
product_id_to_qty = get_product_ids_and_quantities(basket)
lines_suppliers = get_lines_suppliers(basket)
# Get ProductsInBasketCondition's that can't match with the basket
products_in_basket_conditions_to_check = set(
ProductsInBasketCondition.objects.filter(products__id__in=product_id_to_qty.keys()).values_list(
"id", flat=True
)
)
exclude_condition_ids |= set(
ProductsInBasketCondition.objects.exclude(id__in=products_in_basket_conditions_to_check).values_list(
"id", flat=True
)
)
# Get CategoryProductsBasketCondition's that can't match with the basket
categories = set(
Category.objects.filter(shop_products__product_id__in=product_id_to_qty.keys()).values_list("id", flat=True)
)
category_products_in_basket_to_check = set(
CategoryProductsBasketCondition.objects.filter(categories__in=categories).values_list("id", flat=True)
)
exclude_condition_ids |= set(
CategoryProductsBasketCondition.objects.exclude(id__in=category_products_in_basket_to_check).values_list(
"id", flat=True
)
)
queryset = cls.objects.filter(active=True, shop=basket.shop)
if exclude_condition_ids:
queryset = queryset.exclude(conditions__id__in=exclude_condition_ids)
# exclude the campaigns that have supplier set and are not included the supplier list
if lines_suppliers:
queryset = queryset.filter(Q(supplier__isnull=True) | Q(supplier__in=lines_suppliers))
for campaign in queryset.prefetch_related("conditions").distinct():
if campaign.rules_match(basket, lines):
matching.append(campaign)
return matching
[docs]
def rules_match(self, basket, lines):
"""
Check if basket rules match.
They will not match if:
1) The campaign is not active
2) The campaign has attached coupon,
which doesn't match or is not active
3) Any of the attached rules don't match
"""
if not self.is_available():
return False
if self.coupon and not (self.coupon.active and self.coupon.code.upper() in [c.upper() for c in basket.codes]):
return False
for rule in self.conditions.all():
if not rule.matches(basket, lines):
return False
return True
[docs]
class CouponUsage(models.Model):
coupon = models.ForeignKey(on_delete=models.CASCADE, to="Coupon", related_name="usages")
order = models.ForeignKey(on_delete=models.CASCADE, to=Order, related_name="coupon_usages")
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
blank=True,
null=True,
related_name="+",
on_delete=models.SET_NULL,
verbose_name=_("created by"),
)
modified_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
blank=True,
null=True,
related_name="+",
on_delete=models.SET_NULL,
verbose_name=_("modified by"),
)
created_on = models.DateTimeField(auto_now_add=True, editable=False, verbose_name=_("created on"))
modified_on = models.DateTimeField(auto_now=True, editable=False, verbose_name=_("modified on"))
[docs]
@classmethod
def add_usage(cls, order, coupon):
return cls.objects.create(order=order, coupon=coupon)
[docs]
class Coupon(models.Model):
admin_url_suffix = "coupon"
name_field = "code" # TODO: Document me
search_fields = ["code"] # used by Select2Multiple to know which fields use to search by
code = models.CharField(max_length=12)
usage_limit_customer = models.PositiveIntegerField(
blank=True,
null=True,
verbose_name=_("usage limit per customer"),
help_text=_("Limit the amount of usages per a single customer."),
)
usage_limit = models.PositiveIntegerField(
blank=True,
null=True,
verbose_name=_("usage limit"),
help_text=_(
"Set the absolute limit of usages for this coupon. If the limit is zero (0), coupon can't be used."
),
)
active = models.BooleanField(default=False, verbose_name=_("is active"))
shop = models.ForeignKey(
on_delete=models.CASCADE,
to="shuup.Shop",
verbose_name=_("shop"),
related_name="campaign_coupons",
null=True,
help_text=_("The shop where the coupon is active."),
)
supplier = models.ForeignKey(
"shuup.Supplier",
null=True,
blank=True,
related_name="campaign_coupons",
verbose_name=_("supplier"),
on_delete=models.CASCADE,
)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
blank=True,
null=True,
related_name="+",
on_delete=models.SET_NULL,
verbose_name=_("created by"),
)
modified_by = models.ForeignKey(
settings.AUTH_USER_MODEL,
blank=True,
null=True,
related_name="+",
on_delete=models.SET_NULL,
verbose_name=_("modified by"),
)
created_on = models.DateTimeField(auto_now_add=True, editable=False, verbose_name=_("created on"))
modified_on = models.DateTimeField(auto_now=True, editable=False, verbose_name=_("modified on"))
[docs]
def save(self, **kwargs):
campaign = BasketCampaign.objects.filter(active=True, coupon_id=self.id).first()
if (
campaign
and BasketCampaign.objects.filter(active=True, shop_id=campaign.shop.id, coupon__code=self.code)
.exclude(id=campaign.id)
.exists()
):
raise ValidationError(_("Can not have multiple active campaigns with the same code."))
return super().save(**kwargs)
[docs]
@classmethod
def generate_code(cls, length=6):
if length > 12:
length = 12
return "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
@property
def exhausted(self):
val = bool(self.usage_limit and self.usages.count() >= self.usage_limit)
return val
@property
def attached(self):
return BasketCampaign.objects.filter(coupon=self).exists()
[docs]
def attach_to_campaign(self, campaign):
if not self.attached:
self.campaign = campaign
[docs]
@classmethod
def is_usable(cls, code, customer):
try:
code = cls.objects.get(code__iexact=code, active=True)
return code.can_use_code(customer)
except cls.DoesNotExist:
return False
[docs]
def can_use_code(self, customer):
"""
Check if customer can use the code.
:param customer:
:type customer: `Contact` or None
:rtype: True|False
"""
if not self.active:
return False
if not self.attached:
return False
if self.usage_limit_customer:
if not customer or customer.is_anonymous:
return False
if self.usages.filter(order__customer=customer, coupon=self).count() >= self.usage_limit_customer:
return False
return not self.exhausted
[docs]
def use(self, order):
return CouponUsage.add_usage(order=order, coupon=self)
[docs]
def increase_customer_usage_limit_by(self, amount):
if self.usage_limit_customer:
new_limit = self.usage_limit_customer + amount
else:
new_limit = self.usages.count() + amount
self.usage_limit_customer = new_limit
[docs]
def increase_usage_limit_by(self, amount):
self.usage_limit = self.usage_limit + amount if self.usage_limit else (self.usages.count() + amount)
[docs]
def has_been_used(self, usage_count=1):
"""See if code was already used the number of maximum times given"""
return CouponUsage.objects.filter(coupon=self).count() >= usage_count
def __str__(self):
return self.code
CatalogCampaignLogEntry = define_log_model(CatalogCampaign)
BasketCampaignLogEntry = define_log_model(BasketCampaign)
CouponLogEntry = define_log_model(Coupon)
CouponUsageLogEntry = define_log_model(CouponUsage)