Source code for shuup.core.utils.product_statistics

import datetime

from django.db.models import Case, F, Sum, When
from django.utils.translation import get_language

from shuup.core import cache
from shuup.core.models import OrderLine, OrderLineType, Product, ProductCatalogPrice
from shuup.utils.dates import to_aware


def get_best_selling_product_info(shop_ids, cutoff_days=30, supplier=None, orderable_only=True, quantity=100):
    """
    Return the summed sold quantities of the best-selling products.

    Order lines of type ``PRODUCT`` placed on or after ``today - cutoff_days``
    in the given shops are aggregated per product. Lines of a product that has
    a variation parent are counted under the parent's id.

    The result is cached for three hours under a key built from the sorted
    shop ids, the cutoff date, the supplier pk, the ``orderable_only`` flag
    and ``quantity``.

    :param shop_ids: iterable of shop ids; each one is coerced with ``int``.
    :param cutoff_days: how many days back from today sales are counted.
    :param supplier: optional supplier; when truthy, both the catalog prices
        and the order lines are restricted to it.
    :param orderable_only: when true, only products having a catalog price
        row with ``is_available=True`` are considered.
    :param quantity: maximum number of rows to return.
    :return: ``(group_product_id, total)`` rows ordered by ``total``
        descending. On a cache miss this is the ``values_list`` queryset
        itself; on a hit it is whatever the cache returns for the key.
    """
    shop_ids = sorted(map(int, shop_ids))
    cutoff_date = datetime.date.today() - datetime.timedelta(days=cutoff_days)
    supplier_token = supplier.pk if supplier else ""
    orderable_token = 1 if orderable_only else 0
    cache_key = "best_sellers:{!r}_{}_{}_{}_{}".format(
        shop_ids, cutoff_date, supplier_token, orderable_token, quantity
    )

    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    # Catalog price rows decide which products are eligible at all.
    catalog_prices = ProductCatalogPrice.objects.filter(shop__in=shop_ids)
    if orderable_only:
        catalog_prices = catalog_prices.filter(is_available=True)
    if supplier:
        catalog_prices = catalog_prices.filter(supplier=supplier)
    eligible_product_ids = catalog_prices.values_list("product_id", flat=True).distinct()

    sold_lines = OrderLine.objects.filter(
        order__shop_id__in=shop_ids,
        order__order_date__gte=to_aware(cutoff_date),
        type=OrderLineType.PRODUCT,
        product_id__in=eligible_product_ids,
    )
    if supplier:
        sold_lines = sold_lines.filter(supplier=supplier)

    # Variation children are grouped under their parent; everything else
    # is grouped under its own product id.
    grouping_id = Case(
        When(
            product__variation_parent_id__isnull=False,
            then=F("product__variation_parent_id"),
        ),
        default=F("product_id"),
    )
    totals_per_group = (
        sold_lines.annotate(group_product_id=grouping_id)
        .values("group_product_id")
        .annotate(total=Sum("quantity"))
    )
    top_sellers = totals_per_group.order_by("-total")[:quantity]
    sales_data = top_sellers.values_list("group_product_id", "total")

    cache.set(cache_key, sales_data, 3 * 60 * 60)  # three hours
    return sales_data
def get_products_ordered_with(prod, count=20, request=None, language=None):
    """
    Return visible products that appear on the same orders as ``prod``.

    The ids of the products found on orders containing ``prod`` (excluding
    ``prod`` itself) are cached for four hours under ``ordered_with:<pk>``.
    The returned products are then picked from those ids in random order.

    :param prod: the product whose co-ordered products are wanted.
    :param count: maximum number of products in the returned queryset.
    :param request: passed through to ``Product.objects.all_visible``.
    :param language: passed through to ``Product.objects.all_visible``.
    :return: a randomly ordered, sliced ``Product`` queryset.
    """
    cache_key = f"ordered_with:{prod.pk}"
    product_ids = cache.get(cache_key)
    if product_ids is None:
        # XXX: could this be optimized more? (and does it matter?)
        order_ids = OrderLine.objects.filter(product=prod, type=OrderLineType.PRODUCT).values_list(
            "order__id", flat=True
        )
        # Evaluate the id query exactly once. The same set is stored in the
        # cache and used for the filter below, so a cache miss no longer
        # re-runs the query as a subquery and both paths filter identically.
        product_ids = set(
            OrderLine.objects.filter(order_id__in=order_ids)
            .exclude(product=prod)
            .distinct()
            .values_list("product", flat=True)
        )
        cache.set(cache_key, product_ids, 4 * 60 * 60)  # four hours
    return (
        Product.objects.all_visible(request, language=language)  # type: ignore
        .filter(id__in=product_ids)
        .order_by("?")[:count]
    )
def get_products_by_brand(prod, count=6, request=None, language=None):
    """
    Return up to ``count`` random visible products sharing ``prod``'s manufacturer.

    :param prod: the reference product; it is excluded from the result.
    :param count: maximum number of products to return.
    :param request: passed through to ``Product.objects.all_visible``.
    :param language: language passed to ``all_visible``; when falsy, the
        currently active language from ``get_language()`` is used.
    :return: a list of ``Product`` instances in random order.
    """
    if not language:
        language = get_language()

    visible_products = Product.objects.all_visible(request, language)
    # Keep the ``__in`` lookup with a one-element list rather than an exact
    # match, so the lookup stays the same when ``manufacturer_id`` is None.
    same_brand = visible_products.filter(manufacturer_id__in=[prod.manufacturer_id])
    others = same_brand.exclude(pk=prod.id)
    return list(others.order_by("?")[:count])
def get_products_by_same_categories(prod, count=6, request=None, language=None):
    """
    Return up to ``count`` random visible products from ``prod``'s categories.

    :param prod: the reference product; it is excluded from the result.
    :param count: maximum number of products to return.
    :param request: passed through to ``Product.objects.all_visible``.
    :param language: language passed to ``all_visible``; when falsy, the
        currently active language from ``get_language()`` is used.
    :return: a list of ``Product`` instances in random order.
    """
    if not language:
        language = get_language()

    # Lazy queryset of the reference product's category pks, used in ``__in``.
    category_ids = prod.categories.values_list("pk", flat=True)
    in_same_categories = Product.objects.all_visible(request, language).filter(
        categories__id__in=category_ids
    )
    # NOTE(review): a product sharing several categories with ``prod`` might
    # be returned more than once unless ``all_visible`` de-duplicates - confirm.
    shuffled = in_same_categories.exclude(pk=prod.id).order_by("?")
    return list(shuffled[:count])