Source code for shuup.discounts.utils

from typing import TYPE_CHECKING, Iterable

import six
from django.conf import settings
from django.db.models import Q

from shuup.core.models import (
    AnonymousContact,
    ProductCatalogDiscountedPrice,
    ProductCatalogDiscountedPriceRule,
    ShopProduct,
    Supplier,
)
from shuup.core.pricing import PricingContext
from shuup.core.utils import context_cache
from shuup.utils.dates import to_timestamp

if TYPE_CHECKING:  # pragma: no cover
    from shuup.discounts.models import Discount


def _get_price_expiration_cache_key(shop_id):
    return f"price_expiration_{shop_id}"


[docs] def get_potential_discounts_for_product(context, product, available_only=True, groups_ids=None, all_contacts=False): """ Get a queryset of all possible discounts for a given context and product If `available_only` is True, only the discounts which match happy hours, start/end dates will be returned If `available_only` is False, all discounts that match with the context and product, that are active will be returned. """ shop = context.shop product_id = product if isinstance(product, six.integer_types) else product.pk category_ids = { category_id for category_id in ShopProduct.objects.filter(product_id=product_id, shop=context.shop).values_list( "categories__id", flat=True ) if category_id } group_ids = groups_ids if groups_ids else list(context.customer.groups_ids) # Product condition is always applied condition_query = Q(product__isnull=True) | Q(product_id=product_id) # Supplier condition is always applied supplier = context.supplier if supplier: condition_query &= Q(supplier=supplier) | Q(supplier__isnull=True) else: # No supplier in context means no discounts limited to specific # suppliers condition_query &= Q(supplier__isnull=True) # Apply category conditions if category_ids: condition_query &= ( Q(category__isnull=True) | (Q(exclude_selected_category=False) & Q(category__id__in=category_ids)) | (Q(exclude_selected_category=True) & ~Q(category__id__in=category_ids)) ) else: condition_query &= Q(category__isnull=True) | Q(exclude_selected_category=True, category__isnull=False) # Apply contact conditions if not all_contacts: if context.customer: condition_query &= Q(contact__isnull=True) | Q(contact=context.customer) else: condition_query &= Q(contact__isnull=True) if group_ids: # Apply contact group conditions condition_query &= Q(Q(contact_group__isnull=True) | Q(contact_group__id__in=group_ids)) else: condition_query &= Q(contact_group__isnull=True) from shuup.discounts.models import Discount if available_only: base_queryset = Discount.objects.available(shop) else: base_queryset = Discount.objects.filter(shop=shop, active=True) # Get all possible discounts for the current product and context return base_queryset.filter(condition_query).distinct()
[docs] def get_active_discount_for_code(order_or_order_source, code): from shuup.discounts.models import Discount shop = order_or_order_source.shop return Discount.objects.available(shop).first()
[docs] def get_next_dates_for_range(weekday, from_hour, to_hour): """ Get datetime ranges for the next weekday Example: Give me the date ranges for the next Sunday from 1pm to 10pm It will return a tuple of datetimes. If the requested weekday is the same of today, it will return both the ranges for today and also for the next week. :rtype list[datetime.datetime] """ import datetime from django.utils.timezone import now now_datetime = now() next_date = now_datetime + datetime.timedelta(days=(abs(weekday - now_datetime.weekday()) % 7)) ranges = [ next_date.replace(hour=from_hour.hour, minute=from_hour.minute), next_date.replace(hour=to_hour.hour, minute=to_hour.minute), ] # the next date is the same as today, let's return also the next week ranges if next_date.date() == now().date(): next_week_date = next_date + datetime.timedelta(days=7) ranges.extend( [ next_week_date.replace(hour=from_hour.hour, minute=from_hour.minute), next_week_date.replace(hour=to_hour.hour, minute=to_hour.minute), ] ) return ranges
[docs] def bump_price_expiration(shop_ids: Iterable[int]): """ Bump price expiration cache for shop ids """ for shop_id in shop_ids: context_cache.bump_cache_for_item(_get_price_expiration_cache_key(shop_id))
[docs] def get_price_expiration(context, product): """ Returns the price expiration for the product through a UNIX timestamp This routine loads all dates that can possibly affect the price of the product in the future. After fetching all the event dates, the expiration time will be the minimum datetime that is greater than now: expire_on = min( event_date for event_dates in [ next_discount_start, next_discount_ends, next_happy_hour_start, next_happy_hour_end, ] if event_date > now ) :rtype numbers.Number|None :returns the price expiration time timestamp """ cache_params = { "identifier": "price_expiration", "item": _get_price_expiration_cache_key(context.shop.pk), "context": {}, } if settings.SHUUP_DISCOUNTS_PER_PRODUCT_EXPIRATION_DATES: cache_params["customer"] = getattr(context, "customer", None) cache_params["product"] = product key, value = context_cache.get_cached_value(**cache_params) if value is not None: return value context_cache_key = "price_expiration_{shop_id}".format(**{"shop_id": context.shop.pk}) if hasattr(context, "context_cache_key"): return getattr(context, context_cache_key) from shuup.discounts.models import Discount, TimeRange if settings.SHUUP_DISCOUNTS_PER_PRODUCT_EXPIRATION_DATES: potential_discounts = get_potential_discounts_for_product(context, product, available_only=False) else: potential_discounts = Discount.objects.active(context.shop) event_dates = [] time_ranges = TimeRange.objects.filter(happy_hour__discounts__in=potential_discounts).distinct() for weekday, from_hour, to_hour in time_ranges.values_list("weekday", "from_hour", "to_hour"): event_dates.extend(get_next_dates_for_range(weekday, from_hour, to_hour)) from django.utils.timezone import now now_datetime = now() if event_dates: min_event_date = min(event_date for event_date in event_dates if event_date > now_datetime) min_event_date_timestamp = to_timestamp(min_event_date) # cache the value in the context cache, setting the timeout as the price expiration time cache_timeout = max((min_event_date - now_datetime).total_seconds(), 0) context_cache.set_cached_value(key, min_event_date_timestamp, timeout=cache_timeout) # cache the context in the context, so if it is used again it will contain the calculated value setattr(context, context_cache_key, min_event_date_timestamp) return min_event_date_timestamp
[docs] def index_linked_shop_products(discount, discounts_groups_ids=None, ignore_shop_products_ids=None): """ Reindex all shop products previously linked this is required when a shop product shouldn't be linked anymore """ from shuup.discounts.models import ShopProductCatalogDiscountsLink from shuup.discounts.modules import ProductDiscountModule if ignore_shop_products_ids is None: ignore_shop_products_ids = [] if discounts_groups_ids is None: discounts_groups_ids = [] discounts_links = ( ShopProductCatalogDiscountsLink.objects.select_related("shop_product") .filter(Q(discounts=discount), ~Q(shop_product__in=ignore_shop_products_ids)) .distinct() ) for discounts_link in discounts_links: if discounts_link.shop_product_id in ignore_shop_products_ids: continue ignore_shop_products_ids.add(discounts_link.shop_product_id) ProductCatalogDiscountedPrice.objects.filter( catalog_rule__module_identifier=ProductDiscountModule.identifier, shop=discounts_link.shop_product.shop_id, product=discounts_link.shop_product.product_id, ).delete() for supplier in discounts_link.shop_product.suppliers.all(): index_shop_product_price(discounts_link.shop_product, supplier, discounts_groups_ids)
[docs] def index_shop_product_price( shop_product: ShopProduct, supplier: Supplier, contact_groups_ids: Iterable[int] = [], ): """ Index discounts for the shop product """ from shuup.discounts.models import ShopProductCatalogDiscountsLink from shuup.discounts.modules import ProductDiscountModule default_price = shop_product.default_price_value context = PricingContext(shop=shop_product.shop, customer=AnonymousContact(), supplier=supplier) discounts = get_potential_discounts_for_product( context, shop_product.product, available_only=False, groups_ids=contact_groups_ids, all_contacts=True, ) # link this shop product to the potencial discounts discounts_link = ShopProductCatalogDiscountsLink.objects.get_or_create(shop_product=shop_product)[0] discounts_link.discounts.set(discounts) if not discounts.exists(): # delete all discounted prices ProductCatalogDiscountedPrice.objects.filter( product=shop_product.product, shop=shop_product.shop, catalog_rule__module_identifier=ProductDiscountModule.identifier, ).delete() for discount in discounts: discount_options = [default_price] if discount.discounted_price_value is not None: discount_options.append(discount.discounted_price_value) if discount.discount_amount_value is not None: discount_options.append(default_price - discount.discount_amount_value) if discount.discount_percentage is not None: discount_options.append(default_price - (default_price * discount.discount_percentage)) best_discounted_price = max(min(discount_options), 0) happy_hours_times = list( discount.happy_hours.values_list( "time_ranges__from_hour", "time_ranges__to_hour", "time_ranges__weekday", ) ) # if ther is no happy hour configured, # let's create one rule without time constraints if not happy_hours_times: happy_hours_times.append((None, None, None)) for from_hour, to_hour, weekday in happy_hours_times: catalog_rule = ProductCatalogDiscountedPriceRule.objects.get_or_create( module_identifier=ProductDiscountModule.identifier, contact_group=discount.contact_group, contact=discount.contact, valid_start_date=discount.start_datetime, valid_end_date=discount.end_datetime, valid_start_hour=from_hour, valid_end_hour=to_hour, valid_weekday=weekday, )[0] ProductCatalogDiscountedPrice.objects.update_or_create( product=shop_product.product, shop=shop_product.shop, supplier=supplier, catalog_rule=catalog_rule, defaults={"discounted_price_value": best_discounted_price}, )