from django.core.exceptions import ValidationError
from django.db.models import F
from django.utils.translation import gettext_lazy as _
from shuup.core.excs import NoProductsToShipException
from shuup.core.models import Product
from shuup.core.signals import stocks_updated
from shuup.core.stocks import ProductStockStatus
from shuup.core.suppliers import BaseSupplierModule
from shuup.core.suppliers.enums import StockAdjustmentType
from shuup.core.tasks import run_task
from shuup.core.utils import context_cache
from shuup.simple_supplier.utils import get_current_stock_value
from shuup.utils.django_compat import force_text
from shuup.utils.djangoenv import has_installed
from .models import StockAdjustment, StockCount
[docs]
class SimpleSupplierModule(BaseSupplierModule):
identifier = "simple_supplier"
name = _("Simple Supplier")
[docs]
def get_orderability_errors(self, shop_product, quantity, customer, *args, **kwargs):
"""
:param shop_product: Shop Product.
:type shop_product: shuup.core.models.ShopProduct
:param quantity: Quantity to order.
:type quantity: decimal.Decimal
:param customer: Contact.
:type user: django.contrib.auth.models.AbstractUser
:rtype: iterable[ValidationError]
"""
if shop_product.product.kind not in self.get_supported_product_kinds_values():
return
stock_status = self.get_stock_status(shop_product.product_id)
backorder_maximum = shop_product.backorder_maximum
if stock_status.error:
yield ValidationError(stock_status.error, code="stock_error")
if self.supplier.stock_managed and stock_status.stock_managed:
# Check stock availability
available_quantity = stock_status.logical_count
if backorder_maximum is not None:
# Backorders allowed up to the maximum
available_quantity += backorder_maximum
if quantity > available_quantity:
yield ValidationError(
stock_status.message or _("Error! Insufficient quantity in stock."),
code="stock_insufficient",
)
[docs]
def get_stock_statuses(self, product_ids, *args, **kwargs):
stock_counts = (
Product.objects.filter(
pk__in=product_ids,
simple_supplier_stock_count__supplier=self.supplier,
kind__in=self.get_supported_product_kinds_values(),
)
.annotate(
physical_count=F("simple_supplier_stock_count__physical_count"),
logical_count=F("simple_supplier_stock_count__logical_count"),
stock_managed=F("simple_supplier_stock_count__stock_managed"),
)
.values_list("pk", "physical_count", "logical_count", "stock_managed")
)
values = {
product_id: (physical_count or 0, logical_count or 0, stock_managed or False)
for (
product_id,
physical_count,
logical_count,
stock_managed,
) in stock_counts
}
null = (0, 0, self.supplier.stock_managed)
stati = []
for product_id in product_ids:
stock_managed = values.get(product_id, null)[2]
if stock_managed is None:
stock_managed = self.supplier.stock_manage
stati.append(
ProductStockStatus(
product_id=product_id,
physical_count=values.get(product_id, null)[0],
logical_count=values.get(product_id, null)[1],
stock_managed=stock_managed,
)
)
return {pss.product_id: pss for pss in stati}
[docs]
def adjust_stock(
self,
product_id,
delta,
purchase_price=0,
created_by=None,
type=StockAdjustmentType.INVENTORY,
*args,
**kwargs,
):
stock_count = StockCount.objects.select_related("product").get_or_create(
supplier=self.supplier,
product_id=product_id,
)[0]
if not stock_count.stock_managed or stock_count.product.kind not in self.get_supported_product_kinds_values():
# item doesn't manage stocks
return {}
adjustment = StockAdjustment.objects.create(
supplier=self.supplier,
product_id=product_id,
delta=delta,
purchase_price_value=purchase_price,
created_by=created_by,
type=type,
)
self.update_stock(product_id)
return adjustment
[docs]
def update_stock(self, product_id, *args, **kwargs):
"""
Supplier module update stock should always bump product
cache and send `shuup.core.signals.stocks_updated` signal.
"""
supplier_id = self.supplier.pk
sv, _ = StockCount.objects.select_related("product").get_or_create(
supplier_id=supplier_id, product_id=product_id
)
# kind not supported
if sv.product.kind not in self.get_supported_product_kinds_values():
return
# item doesn't manage stocks
if not sv.stock_managed:
# make sure to index products either way
run_task(
"shuup.simple_supplier.tasks.index_product",
product=product_id,
supplier=self.supplier.pk,
)
return
values = get_current_stock_value(supplier_id=supplier_id, product_id=product_id)
sv.logical_count = values["logical_count"]
sv.physical_count = values["physical_count"]
latest_event = StockAdjustment.objects.filter(
supplier=supplier_id, product=product_id, type=StockAdjustmentType.INVENTORY
).last()
if latest_event:
sv.stock_value_value = latest_event.purchase_price_value * sv.logical_count
# TODO: get rid of this and move to shuup.notify app instead, through signals
if self.supplier.stock_managed and has_installed("shuup.notify"):
if sv.alert_limit and sv.physical_count < sv.alert_limit:
product = Product.objects.filter(id=product_id).first()
if product:
from .notify_events import AlertLimitReached
for shop in self.supplier.shops.all():
supplier_email = self.supplier.contact_address.email if self.supplier.contact_address else ""
shop_email = shop.contact_address.email if shop.contact_address else ""
AlertLimitReached(
supplier=self.supplier,
product=product,
shop_email=shop_email,
supplier_email=supplier_email,
).run(shop=shop)
sv.save(update_fields=("logical_count", "physical_count", "stock_value_value"))
context_cache.bump_cache_for_product(product_id)
stocks_updated.send(
type(self),
shops=self.supplier.shops.all(),
product_ids=[product_id],
supplier=self.supplier,
)
run_task(
"shuup.simple_supplier.tasks.index_product",
product=product_id,
supplier=self.supplier.pk,
)
[docs]
def ship_products(self, shipment, product_quantities, *args, **kwargs):
# stocks are managed, do stocks check
if self.supplier.stock_managed:
insufficient_stocks = {}
for product, quantity in product_quantities.items():
if quantity > 0 and product.kind in self.get_supported_product_kinds_values():
stock_status = self.get_stock_status(product.pk)
if stock_status.stock_managed and stock_status.physical_count < quantity:
insufficient_stocks[product] = stock_status.physical_count
if insufficient_stocks:
formatted_counts = [
_("%(name)s (physical stock: %(quantity)s)")
% {"name": force_text(name), "quantity": force_text(int(quantity))}
for (name, quantity) in insufficient_stocks.items()
]
raise NoProductsToShipException(
_("Insufficient physical stock count for the following products: `%(product_counts)s`.")
% {"product_counts": ", ".join(formatted_counts)}
)
for product, quantity in product_quantities.items():
if quantity == 0 or product.kind not in self.get_supported_product_kinds_values():
continue
sp = shipment.products.create(product=product, quantity=quantity)
sp.cache_values()
sp.save()
shipment.cache_values()
shipment.save()