Source code for shuup.core.models._orders

import datetime
from collections import defaultdict
from decimal import Decimal
from typing import TYPE_CHECKING

import six
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
from django.db.models import Q
from django.db.transaction import atomic
from django.utils.crypto import get_random_string
from django.utils.translation import gettext_lazy as _
from enumfields import Enum, EnumIntegerField
from jsonfield import JSONField
from parler.managers import TranslatableQuerySet
from parler.models import TranslatableModel, TranslatedFields

from shuup.core import taxing
from shuup.core.excs import (
    InvalidOrderStatusError,
    NoPaymentToCreateException,
    NoProductsToShipException,
    NoRefundToCreateException,
    NoShippingAddressException,
)
from shuup.core.fields import CurrencyField, InternalIdentifierField, LanguageField, MoneyValueField, UnsavedForeignKey
from shuup.core.pricing import TaxfulPrice, TaxlessPrice
from shuup.core.settings_provider import ShuupSettings
from shuup.core.signals import (
    order_changed,
    order_status_changed,
    payment_created,
    refund_created,
    shipment_created,
    shipment_created_and_processed,
)
from shuup.utils.analog import LogEntryKind, define_log_model
from shuup.utils.dates import local_now, to_aware
from shuup.utils.django_compat import force_text
from shuup.utils.money import Money
from shuup.utils.properties import MoneyPropped, TaxfulPriceProperty, TaxlessPriceProperty

from ._order_lines import OrderLineType
from ._order_utils import get_order_identifier, get_reference_number
from ._products import Product
from ._suppliers import Supplier

if TYPE_CHECKING:  # pragma: no cover
    from ._service_base import Service, ServiceBehaviorComponent  # noqa: F401

User = get_user_model()


class PaymentStatus(Enum):
    NOT_PAID = 0
    PARTIALLY_PAID = 1
    FULLY_PAID = 2
    CANCELED = 3
    DEFERRED = 4

    class Labels:
        NOT_PAID = _("not paid")
        PARTIALLY_PAID = _("partially paid")
        FULLY_PAID = _("fully paid")
        CANCELED = _("canceled")
        DEFERRED = _("deferred")


class ShippingStatus(Enum):
    NOT_SHIPPED = 0
    PARTIALLY_SHIPPED = 1
    FULLY_SHIPPED = 2

    class Labels:
        NOT_SHIPPED = _("not shipped")
        PARTIALLY_SHIPPED = _("partially shipped")
        FULLY_SHIPPED = _("fully shipped")


class OrderStatusRole(Enum):
    NONE = 0
    INITIAL = 1
    COMPLETE = 2
    CANCELED = 3
    PROCESSING = 4
    # TODO: Failed state?

    class Labels:
        NONE = _("none")
        INITIAL = _("Initial")
        COMPLETE = _("Complete")
        CANCELED = _("Canceled")
        PROCESSING = _("Processing")


class DefaultOrderStatus(Enum):
    NONE = "none"
    INITIAL = "initial"
    COMPLETE = "complete"
    CANCELED = "canceled"
    PROCESSING = "processing"
    # TODO: Failed state?

    class Labels:
        NONE = _("none")
        INITIAL = _("Received")
        COMPLETE = _("Complete")
        CANCELED = _("Canceled")
        PROCESSING = _("In Progress")


class OrderStatusQuerySet(TranslatableQuerySet):
    def _default_for_role(self, role):
        """
        Get the default order status for the given role.

        :param role: The role to look for.
        :type role: OrderStatusRole
        :return: The OrderStatus.
        :rtype: OrderStatus
        """
        try:
            return self.get(default=True, role=role)
        except ObjectDoesNotExist:
            raise ObjectDoesNotExist(
                "Error! No default `{}` OrderStatus exists.".format(getattr(role, "label", role))
            ) from None

    def get_default_initial(self):
        return self._default_for_role(OrderStatusRole.INITIAL)

    def get_default_processing(self):
        return self._default_for_role(OrderStatusRole.PROCESSING)

    def get_default_canceled(self):
        return self._default_for_role(OrderStatusRole.CANCELED)

    def get_default_complete(self):
        return self._default_for_role(OrderStatusRole.COMPLETE)


class OrderStatus(TranslatableModel):
    identifier = InternalIdentifierField(
        db_index=True,
        blank=False,
        editable=True,
        unique=True,
        help_text=_("Internal identifier for status. This is used to identify and distinguish the statuses in Shuup."),
    )
    ordering = models.IntegerField(
        db_index=True,
        default=0,
        verbose_name=_("ordering"),
        help_text=_("The processing order of statuses. Default is always processed first."),
    )
    role = EnumIntegerField(
        OrderStatusRole,
        default=OrderStatusRole.NONE,
        db_index=True,
        verbose_name=_("role"),
        help_text=_("The role of this status. One role can have multiple order statuses."),
    )  # type: ignore
    default = models.BooleanField(
        default=False,
        db_index=True,
        verbose_name=_("default"),
        help_text=_("Defines if the status should be considered as default. Default is always processed first."),
    )

    is_active = models.BooleanField(
        default=True,
        db_index=True,
        verbose_name=_("is active"),
        help_text=_("Defines if the status is usable."),
    )

    objects = OrderStatusQuerySet.as_manager()

    translations = TranslatedFields(
        name=models.CharField(
            verbose_name=_("name"),
            max_length=64,
            help_text=_("Name of the order status."),
        ),
        public_name=models.CharField(
            verbose_name=_("public name"),
            max_length=64,
            help_text=_("The name shown to the customers in shop front."),
        ),
    )

    allowed_next_statuses = models.ManyToManyField(
        "self", verbose_name=_("allowed next statuses"), blank=True, symmetrical=False
    )

    visible_for_customer = models.BooleanField(
        default=True,
        verbose_name=_("is visible for the user"),
        help_text=_("Indicates whether this status is visible for the customers"),
    )

    class Meta:
        unique_together = ("identifier", "role")
        verbose_name = _("order status")
        verbose_name_plural = _("order statuses")

    def __str__(self):
        return force_text(self.safe_translation_getter("name", default=self.identifier))

[docs] def save(self, *args, **kwargs): super().save(*args, **kwargs) if self.default and self.role != OrderStatusRole.NONE: # If this status is the default, make the others for this role non-default. OrderStatus.objects.filter(role=self.role).exclude(pk=self.pk).update(default=False)
class OrderStatusHistory(models.Model): order = models.ForeignKey( "Order", related_name="order_history", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("order id"), ) previous_order_status = models.ForeignKey( "OrderStatus", related_name="previous_order_status", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("previous order status"), ) next_order_status = models.ForeignKey( "OrderStatus", related_name="next_order_status", on_delete=models.PROTECT, verbose_name=_("next order status"), ) created_on = models.DateTimeField(auto_now_add=True, editable=False, verbose_name=_("created on")) description = models.TextField(blank=True, null=True, verbose_name=_("description")) creator = UnsavedForeignKey( settings.AUTH_USER_MODEL, related_name="order_status_history_created", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("creating user"), ) class OrderStatusManager:
[docs] def __init__(self): self.default_statuses = [ { "name": _("Received"), "public_name": _("Received"), "role": OrderStatusRole.INITIAL, "identifier": DefaultOrderStatus.INITIAL.value, "default": True, "is_active": True, }, { "name": _("In Progress"), "public_name": _("In Progress"), "role": OrderStatusRole.PROCESSING, "identifier": DefaultOrderStatus.PROCESSING.value, "default": True, "is_active": True, }, { "name": _("Complete"), "public_name": _("Complete"), "role": OrderStatusRole.COMPLETE, "identifier": DefaultOrderStatus.COMPLETE.value, "default": True, "is_active": True, }, { "name": _("Canceled"), "public_name": _("Canceled"), "role": OrderStatusRole.CANCELED, "identifier": DefaultOrderStatus.CANCELED.value, "default": True, "is_active": True, }, ]
[docs] def is_default(self, status_object): return any(s["identifier"] == status_object.identifier for s in self.default_statuses)
[docs] def ensure_default_statuses(self): """ Ensure Default Statuses. It is important to ensure that default statuses are always available. This method will ensure this. """ # These values are based on the old Shuup data update_map = { "none": DefaultOrderStatus.NONE.value, "recv": DefaultOrderStatus.INITIAL.value, "prog": DefaultOrderStatus.PROCESSING.value, "comp": DefaultOrderStatus.COMPLETE.value, "canc": DefaultOrderStatus.CANCELED.value, } for status in OrderStatus.objects.all(): if status.identifier not in update_map: continue status.identifier = update_map[status.identifier] status.save() for i, defaults in enumerate(self.default_statuses): defaults["ordering"] = i status = OrderStatus.objects.filter(identifier=defaults["identifier"]).first() if status: defaults.pop("name") defaults.pop("public_name") for k, v in six.iteritems(defaults): setattr(status, k, v) status.save() else: OrderStatus.objects.create(**defaults) self.ensure_allowed_next_statuses()
[docs] def ensure_allowed_next_statuses(self): """ Populates allowed_next_statuses with default values. """ # Ensure default status transitions are always set up # Set up INITIAL status transitions try: initial_status = OrderStatus.objects.filter(identifier=DefaultOrderStatus.INITIAL.value).first() if initial_status: allowed_statuses = OrderStatus.objects.filter( identifier__in=[ DefaultOrderStatus.PROCESSING.value, DefaultOrderStatus.COMPLETE.value, DefaultOrderStatus.CANCELED.value, ] ) # Clear existing and add new transitions initial_status.allowed_next_statuses.clear() initial_status.allowed_next_statuses.add(*allowed_statuses) # Set up PROCESSING status transitions processing_status = OrderStatus.objects.filter(identifier=DefaultOrderStatus.PROCESSING.value).first() if processing_status: allowed_statuses = OrderStatus.objects.filter( identifier__in=[ DefaultOrderStatus.COMPLETE.value, DefaultOrderStatus.CANCELED.value, ] ) # Clear existing and add new transitions processing_status.allowed_next_statuses.clear() processing_status.allowed_next_statuses.add(*allowed_statuses) except Exception: # Fallback in case of any database issues pass
class OrderQuerySet(models.QuerySet): def paid(self): return self.filter(payment_status=PaymentStatus.FULLY_PAID) def incomplete(self): return self.filter( status__role__in=( OrderStatusRole.NONE, OrderStatusRole.INITIAL, OrderStatusRole.PROCESSING, ) ) def complete(self): return self.filter(status__role=OrderStatusRole.COMPLETE) # TODO: read status def valid(self): return self.exclude(Q(deleted=True) | Q(status__role=OrderStatusRole.CANCELED)) # TODO: read status def since(self, days, tz=None): since_date = (local_now(tz) - datetime.timedelta(days=days)).date() since = to_aware(since_date, tz=tz) return self.in_date_range(since, None) def in_date_range(self, start, end): """ Limit to orders is given date range. :type start: datetime.datetime|None :param start: Start time, inclusive. :type end: datetime.datetime|None :param end: End time, inclusive. """ result = self if start: result = result.filter(order_date__gte=to_aware(start)) if end: result = result.filter(order_date__lte=to_aware(end)) return result class Order(MoneyPropped, models.Model): # Identification shop = UnsavedForeignKey("Shop", on_delete=models.PROTECT, verbose_name=_("shop")) created_on = models.DateTimeField(auto_now_add=True, editable=False, db_index=True, verbose_name=_("created on")) modified_on = models.DateTimeField(auto_now=True, editable=False, db_index=True, verbose_name=_("modified on")) identifier = InternalIdentifierField(unique=True, db_index=True, verbose_name=_("order identifier")) # TODO: label is actually a choice field, need to check migrations/choice deconstruction label = models.CharField(max_length=32, db_index=True, verbose_name=_("label")) # The key shouldn't be possible to deduce (i.e. it should be random), but it is # not a secret. (It could, however, be used as key material for an actual secret.) key = models.CharField(max_length=32, unique=True, blank=False, verbose_name=_("key")) reference_number = models.CharField( max_length=64, db_index=True, unique=True, blank=True, null=True, verbose_name=_("reference number"), ) # Contact information customer = UnsavedForeignKey( "Contact", related_name="customer_orders", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("customer"), ) orderer = UnsavedForeignKey( "PersonContact", related_name="orderer_orders", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("orderer"), ) billing_address = models.ForeignKey( "ImmutableAddress", related_name="billing_orders", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("billing address"), ) shipping_address = models.ForeignKey( "ImmutableAddress", related_name="shipping_orders", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("shipping address"), ) tax_number = models.CharField(max_length=64, blank=True, verbose_name=_("tax number")) phone = models.CharField(max_length=64, blank=True, verbose_name=_("phone")) email = models.EmailField(max_length=128, blank=True, verbose_name=_("email address")) # Customer related information that might change after order, but is important # for accounting and/or reports later. account_manager = models.ForeignKey( "PersonContact", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("account manager"), ) customer_groups = models.ManyToManyField( "ContactGroup", related_name="customer_group_orders", verbose_name=_("customer groups"), blank=True, ) tax_group = models.ForeignKey( "CustomerTaxGroup", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("tax group"), ) # Status creator = UnsavedForeignKey( settings.AUTH_USER_MODEL, related_name="orders_created", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("creating user"), ) modified_by = UnsavedForeignKey( settings.AUTH_USER_MODEL, related_name="orders_modified", blank=True, null=True, on_delete=models.PROTECT, verbose_name=_("modifier user"), ) deleted = models.BooleanField(db_index=True, default=False, verbose_name=_("deleted")) status = UnsavedForeignKey("OrderStatus", verbose_name=_("status"), on_delete=models.PROTECT) payment_status = EnumIntegerField( PaymentStatus, db_index=True, default=PaymentStatus.NOT_PAID, verbose_name=_("payment status") ) # type: ignore shipping_status = EnumIntegerField( ShippingStatus, db_index=True, default=ShippingStatus.NOT_SHIPPED, verbose_name=_("shipping status"), ) # type: ignore # Methods payment_method = UnsavedForeignKey( "PaymentMethod", related_name="payment_orders", blank=True, null=True, default=None, on_delete=models.PROTECT, verbose_name=_("payment method"), ) payment_method_name = models.CharField( max_length=100, blank=True, default="", verbose_name=_("payment method name") ) payment_data = JSONField(blank=True, null=True, verbose_name=_("payment data")) shipping_method = UnsavedForeignKey( "ShippingMethod", related_name="shipping_orders", blank=True, null=True, default=None, on_delete=models.PROTECT, verbose_name=_("shipping method"), ) shipping_method_name = models.CharField( max_length=100, blank=True, default="", verbose_name=_("shipping method name") ) shipping_data = JSONField(blank=True, null=True, verbose_name=_("shipping data")) extra_data = JSONField(blank=True, null=True, verbose_name=_("extra data")) # Money stuff taxful_total_price = TaxfulPriceProperty("taxful_total_price_value", "currency") taxless_total_price = TaxlessPriceProperty("taxless_total_price_value", "currency") taxful_total_price_value = MoneyValueField(editable=False, verbose_name=_("grand total"), default=0) taxless_total_price_value = MoneyValueField(editable=False, verbose_name=_("taxless total"), default=0) currency = CurrencyField(verbose_name=_("currency")) prices_include_tax = models.BooleanField(verbose_name=_("prices include tax")) display_currency = CurrencyField(blank=True, verbose_name=_("display currency")) display_currency_rate = models.DecimalField( max_digits=36, decimal_places=9, default=1, verbose_name=_("display currency rate"), ) # Other ip_address = models.GenericIPAddressField(null=True, blank=True, verbose_name=_("IP address")) # `order_date` is not `auto_now_add` for backdating purposes order_date = models.DateTimeField(editable=False, db_index=True, verbose_name=_("order date")) payment_date = models.DateTimeField(null=True, editable=False, verbose_name=_("payment date")) language = LanguageField(blank=True, verbose_name=_("language")) customer_comment = models.TextField(blank=True, verbose_name=_("customer comment")) admin_comment = models.TextField(blank=True, verbose_name=_("admin comment/notes")) require_verification = models.BooleanField(default=False, verbose_name=_("requires verification")) all_verified = models.BooleanField(default=False, verbose_name=_("all lines verified")) marketing_permission = models.BooleanField(default=False, verbose_name=_("marketing permission")) _codes = JSONField(blank=True, null=True, verbose_name=_("codes")) common_select_related = ("billing_address",) objects = OrderQuerySet.as_manager() class Meta: ordering = ("-id",) verbose_name = _("order") verbose_name_plural = _("orders") def __str__(self): # pragma: no cover if self.billing_address_id: name = self.billing_address.name else: name = "-" if ShuupSettings.get_setting("SHUUP_ENABLE_MULTIPLE_SHOPS"): return f"Order {self.identifier} ({self.shop.name}, {name})" else: return f"Order {self.identifier} ({name})" @property def codes(self): return list(self._codes or []) @codes.setter def codes(self, value): codes = [] for code in value: if not isinstance(code, six.text_type): raise TypeError("Error! `codes` must be a list of strings.") codes.append(code) self._codes = codes
[docs] def cache_prices(self): taxful_total = TaxfulPrice(0, self.currency) taxless_total = TaxlessPrice(0, self.currency) for line in self.lines.all().prefetch_related("taxes"): taxful_total += line.taxful_price taxless_total += line.taxless_price self.taxful_total_price = taxful_total self.taxless_total_price = taxless_total
def _cache_contact_values(self): sources = [ self.shipping_address, self.billing_address, self.customer, self.orderer, ] fields = ("tax_number", "email", "phone") for field in fields: if getattr(self, field, None): continue for source in sources: val = getattr(source, field, None) if val: setattr(self, field, val) break if not self.id and self.customer: # These fields are used for reporting and should not # change after create even if empty at the moment of ordering. self.account_manager = getattr(self.customer, "account_manager", None) self.tax_group = self.customer.tax_group def _cache_contact_values_post_create(self): if self.customer: # These fields are used for reporting and should not # change after create even if empty at the moment of ordering. self.customer_groups.set(self.customer.groups.all()) def _cache_values(self): self._cache_contact_values() if not self.label: self.label = settings.SHUUP_DEFAULT_ORDER_LABEL if not self.currency: self.currency = self.shop.currency if not self.prices_include_tax: self.prices_include_tax = self.shop.prices_include_tax if not self.display_currency: self.display_currency = self.currency self.display_currency_rate = 1 if self.shipping_method_id and not self.shipping_method_name: self.shipping_method_name = self.shipping_method.safe_translation_getter( "name", default=self.shipping_method.identifier, any_language=True ) if self.payment_method_id and not self.payment_method_name: self.payment_method_name = self.payment_method.safe_translation_getter( "name", default=self.payment_method.identifier, any_language=True ) if not self.key: self.key = get_random_string(32) if not self.modified_by: self.modified_by = self.creator def _save_identifiers(self): self.identifier = f"{get_order_identifier(self)}" self.reference_number = get_reference_number(self) super().save( update_fields=( "identifier", "reference_number", ) )
[docs] def full_clean(self, exclude=None, validate_unique=True): self._cache_values() return super().full_clean(exclude, validate_unique)
[docs] def save(self, *args, **kwargs): if not self.creator_id: if not settings.SHUUP_ALLOW_ANONYMOUS_ORDERS: raise ValidationError( "Error! Anonymous (userless) orders are not allowed " "when `SHUUP_ALLOW_ANONYMOUS_ORDERS` is not enabled." ) self._cache_values() first_save = not self.pk if self.status is None: self.status = OrderStatus.objects.get_default_initial() super().save(*args, **kwargs) if first_save: # Have to do a double save the first time around to be able to save identifiers self._save_identifiers() self._cache_contact_values_post_create() self.change_status(next_status=self.status, user=self.creator, first_save=True) order_changed.send(type(self), order=self)
[docs] def delete(self, using=None): if not self.deleted: self.deleted = True self.add_log_entry("Success! Deleted (soft).", kind=LogEntryKind.DELETION) # Bypassing local `save()` on purpose. super().save(update_fields=("deleted",), using=using)
[docs] def set_canceled(self): if self.status.role != OrderStatusRole.CANCELED: self.status = OrderStatus.objects.get_default_canceled() self.save()
def _set_paid(self): if self.payment_status != PaymentStatus.FULLY_PAID: # pragma: no branch self.add_log_entry(_("Order was marked as paid.")) self.payment_status = PaymentStatus.FULLY_PAID self.payment_date = local_now() self.save() def _set_partially_paid(self): if self.payment_status != PaymentStatus.PARTIALLY_PAID: self.add_log_entry(_("Order was marked as partially paid.")) self.payment_status = PaymentStatus.PARTIALLY_PAID self.save()
[docs] def is_paid(self): return self.payment_status == PaymentStatus.FULLY_PAID
[docs] def is_partially_paid(self): return self.payment_status == PaymentStatus.PARTIALLY_PAID
[docs] def is_deferred(self): return self.payment_status == PaymentStatus.DEFERRED
[docs] def is_not_paid(self): return self.payment_status == PaymentStatus.NOT_PAID
[docs] def get_total_paid_amount(self): amounts = self.payments.values_list("amount_value", flat=True) return Money(sum(amounts, Decimal(0)), self.currency)
[docs] def get_total_unpaid_amount(self): difference = self.taxful_total_price.amount - self.get_total_paid_amount() return max(difference, Money(0, self.currency))
[docs] def can_create_payment(self): zero = Money(0, self.currency) return not (self.is_paid() or self.is_canceled()) and self.get_total_unpaid_amount() > zero
[docs] def create_payment(self, amount, payment_identifier=None, description=""): """ Create a payment with a given amount for this order. If the order already has payments and sum of their amounts is equal or greater than `self.taxful_total_price` and the order is not a zero price order, an exception is raised. If the end sum of all payments is equal or greater than `self.taxful_total_price`, then the order is marked as paid. :param amount: Amount of the payment to be created. :type amount: Money :param payment_identifier: Identifier of the created payment. If not set, default value of `gateway_id:order_id:number` will be used (where `number` is a number of payments in the order). :type payment_identifier: str|None :param description: Description of the payment. Will be set to `method` property of the created payment. :type description: str :returns: The created Payment object :rtype: shuup.core.models.Payment """ assert isinstance(amount, Money) assert amount.currency == self.currency payments = self.payments.order_by("created_on") total_paid_amount = self.get_total_paid_amount() if total_paid_amount >= self.taxful_total_price.amount and self.taxful_total_price: raise NoPaymentToCreateException( f"Error! Order {self.pk} has already been fully paid ({total_paid_amount}" + f" >= {self.taxful_total_price})." ) if not payment_identifier: number = payments.count() + 1 payment_identifier = f"{self.id}:{number}" # type: ignore payment = self.payments.create( # type: ignore payment_identifier=payment_identifier, amount_value=amount.value, description=description, ) if self.get_total_paid_amount() >= self.taxful_total_price.amount: # type: ignore self._set_paid() # also calls save else: self._set_partially_paid() payment_created.send(sender=type(self), order=self, payment=payment) return payment
[docs] def can_create_shipment(self): return self.get_unshipped_products() and not self.is_canceled() and self.shipping_address
# TODO: Rethink either the usage of shipment parameter or renaming the method for 3.0
[docs] @atomic def create_shipment(self, product_quantities, supplier=None, shipment=None): """ Create a shipment for this order from `product_quantities`. `product_quantities` is expected to be a dict, which maps Product instances to quantities. Only quantities over 0 are taken into account, and if the mapping is empty or has no quantity value over 0, `NoProductsToShipException` will be raised. Orders without a shipping address defined, will raise `NoShippingAddressException`. :param product_quantities: a dict mapping Product instances to quantities to ship. :type product_quantities: dict[shuup.shop.models.Product, decimal.Decimal] :param supplier: Optional Supplier for this product. No validation is made. :param shipment: Optional unsaved Shipment for ShipmentProduct's. If not given Shipment is created based on supplier parameter. :raises: NoProductsToShipException, NoShippingAddressException :return: Saved, complete Shipment object. :rtype: shuup.core.models.Shipment """ if not product_quantities or not any(quantity > 0 for quantity in product_quantities.values()): raise NoProductsToShipException( "Error! No products to ship (`quantities` is empty or has no quantity over 0)." ) if self.shipping_address is None: raise NoShippingAddressException("Error! Shipping address is not defined for this order.") assert supplier or shipment if shipment: assert shipment.order == self else: from ._shipments import Shipment shipment = Shipment(order=self, supplier=supplier) shipment.save() if not supplier: supplier = shipment.supplier supplier.ship_products(shipment, product_quantities) self.add_log_entry(_("Success! Shipment #%d was created.") % shipment.id) self.update_shipping_status() shipment_created.send(sender=type(self), order=self, shipment=shipment) shipment_created_and_processed.send(sender=type(self), order=self, shipment=shipment) return shipment
[docs] def can_create_refund(self, supplier=None): unrefunded_amount = self.get_total_unrefunded_amount(supplier) unrefunded_quantity = self.get_total_unrefunded_quantity(supplier) return ( (unrefunded_amount.value > 0 or unrefunded_quantity > 0) and not self.is_canceled() and (self.payment_status not in (PaymentStatus.NOT_PAID, PaymentStatus.CANCELED)) )
[docs] @atomic def create_refund(self, refund_data, created_by=None, supplier=None): """ Create a refund if passed a list of refund line data. Refund line data is simply a list of dictionaries where each dictionary contains data for a particular refund line. Additionally, if the parent line is of `enum` type `OrderLineType.PRODUCT` and the `restock_products` boolean flag is set to `True`, the products will be restocked with the exact amount set in the order supplier's `quantity` field. :param refund_data: List of dicts containing refund data. :type refund_data: [dict] :param created_by: Refund creator's user instance, used for adjusting supplier stock. :type created_by: django.contrib.auth.User|None """ tax_module = taxing.get_tax_module() refund_lines = tax_module.create_refund_lines(self, supplier, created_by, refund_data) self.cache_prices() self.save() self.update_shipping_status() self.update_payment_status() refund_created.send(sender=type(self), order=self, refund_lines=refund_lines)
[docs] def create_full_refund(self, restock_products=False, created_by=None): """ Create a full refund for entire order content, with the option of restocking stocked products. :param restock_products: Boolean indicating whether to also restock the products. :param created_by: Refund creator's user instance, used for adjusting supplier stock. :type restock_products: bool|False """ if self.has_refunds(): raise NoRefundToCreateException self.cache_prices() line_data = [ { "line": line, "quantity": line.quantity, "amount": line.taxful_price.amount, "restock_products": restock_products, } for line in self.lines.filter(quantity__gt=0) if line.type != OrderLineType.REFUND ] self.create_refund(line_data, created_by)
[docs] def get_total_refunded_amount(self, supplier=None): refunds = self.lines.refunds() if supplier: refunds = refunds.filter(Q(parent_line__supplier=supplier) | Q(supplier=supplier)) total = sum([line.taxful_price.amount.value for line in refunds]) return Money(-total, self.currency)
[docs] def get_total_unrefunded_amount(self, supplier=None): if supplier: total_refund_amount = sum( [ line.max_refundable_amount.value for line in self.lines.filter(supplier=supplier).exclude(type=OrderLineType.REFUND) ] ) arbitrary_refunds = abs( sum( [ refund_line.taxful_price.value for refund_line in self.lines.filter( supplier=supplier, parent_line__isnull=True, type=OrderLineType.REFUND, ) ] ) ) return ( Money(max(total_refund_amount - arbitrary_refunds, 0), self.currency) if total_refund_amount else Money(0, self.currency) ) return max(self.taxful_total_price.amount, Money(0, self.currency))
[docs] def get_total_unrefunded_quantity(self, supplier=None): queryset = self.lines.all() if supplier: queryset = queryset.filter(supplier=supplier) return sum([line.max_refundable_quantity for line in queryset])
[docs] def get_total_tax_amount(self): return sum((line.tax_amount for line in self.lines.all()), Money(0, self.currency))
[docs] def has_refunds(self): return self.lines.refunds().exists()
[docs] def create_shipment_of_all_products(self, supplier=None): """ Create a shipment of all the products in this Order, no matter whether or not any have been previously marked as shipped or not. See the documentation for `create_shipment`. :param supplier: The Supplier to use. If `None`, the first supplier in the order is used. (If several are in the order, this fails.) :return: Saved, complete Shipment object. :rtype: shuup.shop.models.Shipment """ from ._products import ShippingMode suppliers_to_product_quantities = defaultdict(lambda: defaultdict(lambda: 0)) lines = self.lines.filter(type=OrderLineType.PRODUCT, product__shipping_mode=ShippingMode.SHIPPED).values_list( "supplier_id", "product_id", "quantity" ) for supplier_id, product_id, quantity in lines: if product_id: suppliers_to_product_quantities[supplier_id][product_id] += quantity if not suppliers_to_product_quantities: raise NoProductsToShipException("Error! Could not find any products to ship.") if supplier is None: if len(suppliers_to_product_quantities) > 1: # pragma: no cover raise ValueError( "Error! `create_shipment_of_all_products` can be used only when there is a single supplier." ) supplier_id, quantities = suppliers_to_product_quantities.popitem() supplier = Supplier.objects.get(pk=supplier_id) else: quantities = suppliers_to_product_quantities[supplier.id] products = {product.pk: product for product in Product.objects.filter(pk__in=quantities.keys())} quantities = {products[product_id]: quantity for (product_id, quantity) in quantities.items()} return self.create_shipment(quantities, supplier=supplier)
[docs] def check_all_verified(self): if not self.all_verified: new_all_verified = not self.lines.filter(verified=False).exists() if new_all_verified: self.all_verified = True if self.require_verification: self.add_log_entry(_("All rows requiring verification have been verified.")) self.require_verification = False self.save() return self.all_verified
[docs] def get_purchased_attachments(self): from ._product_media import ProductMedia if self.payment_status != PaymentStatus.FULLY_PAID: return ProductMedia.objects.none() prods = self.lines.exclude(product=None).values_list("product_id", flat=True) return ProductMedia.objects.filter(product__in=prods, enabled=True, purchased=True)
[docs] def get_tax_summary(self): """ :rtype: taxing.TaxSummary """ all_line_taxes = [] untaxed = TaxlessPrice(0, self.currency) for line in self.lines.all(): line_taxes = list(line.taxes.all()) all_line_taxes.extend(line_taxes) if not line_taxes: untaxed += line.taxless_price return taxing.TaxSummary.from_line_taxes(all_line_taxes, untaxed)
[docs] def get_product_ids_and_quantities(self, supplier=None): lines = self.lines.filter(type=OrderLineType.PRODUCT) if supplier: supplier_id = supplier if isinstance(supplier, six.integer_types) else supplier.pk lines = lines.filter(supplier_id=supplier_id) quantities = defaultdict(lambda: 0) for product_id, quantity in lines.values_list("product_id", "quantity"): quantities[product_id] += quantity return dict(quantities)
[docs] def has_products(self): return self.lines.products().exists()
[docs] def has_products_requiring_shipment(self, supplier=None): from ._products import ShippingMode lines = self.lines.products().filter(product__shipping_mode=ShippingMode.SHIPPED) if supplier: supplier_id = supplier if isinstance(supplier, six.integer_types) else supplier.pk lines = lines.filter(supplier_id=supplier_id) return lines.exists()
[docs] def is_complete(self): return self.status.role == OrderStatusRole.COMPLETE
[docs] def can_set_complete(self): return not (self.is_complete() or self.is_canceled() or bool(self.get_unshipped_products()))
[docs] def is_fully_shipped(self): return self.shipping_status == ShippingStatus.FULLY_SHIPPED
[docs] def is_partially_shipped(self): return self.shipping_status == ShippingStatus.PARTIALLY_SHIPPED
[docs] def is_canceled(self): return self.status.role == OrderStatusRole.CANCELED
[docs] def can_set_canceled(self): canceled = self.status.role == OrderStatusRole.CANCELED paid = self.is_paid() shipped = self.shipping_status != ShippingStatus.NOT_SHIPPED return not (canceled or paid or shipped)
[docs] def update_shipping_status(self): status_before_update = self.shipping_status shipments_count = self.shipments.all_except_deleted().out_only().count() has_unshipped_products = bool(self.get_unshipped_products()) if shipments_count: # get the number of sent shipments sent_shipments_count = self.get_sent_shipments().out_only().count() # nothing sent if sent_shipments_count == 0: self.shipping_status = ShippingStatus.NOT_SHIPPED # fully sent and no unshipped products elif shipments_count == sent_shipments_count and not has_unshipped_products: self.shipping_status = ShippingStatus.FULLY_SHIPPED else: self.shipping_status = ShippingStatus.PARTIALLY_SHIPPED else: self.shipping_status = ShippingStatus.NOT_SHIPPED if status_before_update != self.shipping_status: self.add_log_entry(_(f"New shipping status is set to: {self.shipping_status}.")) self.save(update_fields=("shipping_status",))
[docs] def update_payment_status(self): status_before_update = self.payment_status if self.get_total_unpaid_amount().value == 0: self.payment_status = PaymentStatus.FULLY_PAID elif self.get_total_paid_amount().value > 0: self.payment_status = PaymentStatus.PARTIALLY_PAID elif self.payment_status != PaymentStatus.DEFERRED: # Do not make deferred here not paid self.payment_status = PaymentStatus.NOT_PAID if status_before_update != self.payment_status: self.add_log_entry(_(f"New payment status is set to: {self.payment_status}.")) self.save(update_fields=("payment_status",))
[docs] def get_known_additional_data(self): """ Get a list of "known additional data" in this order's `payment_data`, `shipping_data` and `extra_data`. The list is returned in the order the fields are specified in the settings entries for said known keys. `dict(that_list)` can of course be used to "flatten" the list into a dict. :return: list of 2-tuples. """ output = [] for data_dict, name_mapping in ( (self.payment_data, settings.SHUUP_ORDER_KNOWN_PAYMENT_DATA_KEYS), (self.shipping_data, settings.SHUUP_ORDER_KNOWN_SHIPPING_DATA_KEYS), (self.extra_data, settings.SHUUP_ORDER_KNOWN_EXTRA_DATA_KEYS), ): if hasattr(data_dict, "get"): for key, display_name in name_mapping: if key in data_dict: output.append((force_text(display_name), data_dict[key])) return output
[docs] def get_product_summary(self, supplier=None): """Return a dict of product IDs -> {ordered, unshipped, refunded, shipped, line_text, suppliers}""" supplier_id = (supplier if isinstance(supplier, six.integer_types) else supplier.pk) if supplier else None products = defaultdict(lambda: defaultdict(lambda: Decimal(0))) def _append_suppliers_info(product_id, supplier): if not products[product_id]["suppliers"]: products[product_id]["suppliers"] = [supplier] elif supplier not in products[product_id]["suppliers"]: products[product_id]["suppliers"].append(supplier) # Quantity for all orders # Note! This contains all product lines so we do not need to worry # about suppliers after this. lines = self.lines.filter(type=OrderLineType.PRODUCT) if supplier_id: lines = lines.filter(supplier_id=supplier_id) lines_values = lines.values_list("product_id", "text", "quantity", "supplier__name") for product_id, line_text, quantity, supplier_name in lines_values: products[product_id]["line_text"] = line_text products[product_id]["ordered"] += quantity _append_suppliers_info(product_id, supplier_name) # Quantity to ship for product_id, quantity in self._get_to_ship_quantities(supplier_id): products[product_id]["unshipped"] += quantity # Quantity shipped for product_id, quantity in self._get_shipped_quantities(supplier_id): products[product_id]["shipped"] += quantity products[product_id]["unshipped"] -= quantity # Quantity refunded for product_id in self._get_refunded_product_ids(supplier_id): refunds = self.lines.refunds().filter(parent_line__product_id=product_id) refunded_quantity = refunds.aggregate(total=models.Sum("quantity"))["total"] or 0 products[product_id]["refunded"] = refunded_quantity products[product_id]["unshipped"] = max(products[product_id]["unshipped"] - refunded_quantity, 0) return products
def _get_to_ship_quantities(self, supplier_id): from ._products import ShippingMode lines_to_ship = self.lines.filter(type=OrderLineType.PRODUCT, product__shipping_mode=ShippingMode.SHIPPED) if supplier_id: lines_to_ship = lines_to_ship.filter(supplier_id=supplier_id) return lines_to_ship.values_list("product_id", "quantity") def _get_shipped_quantities(self, supplier_id): from ._shipments import ShipmentProduct, ShipmentStatus shipment_prods = ShipmentProduct.objects.filter(shipment__order=self).exclude( shipment__status=ShipmentStatus.DELETED ) if supplier_id: shipment_prods = shipment_prods.filter(shipment__supplier_id=supplier_id) return shipment_prods.values_list("product_id", "quantity") def _get_refunded_product_ids(self, supplier_id): refunded_prods = self.lines.refunds().filter(type=OrderLineType.REFUND, parent_line__type=OrderLineType.PRODUCT) if supplier_id: refunded_prods = refunded_prods.filter(parent_line__supplier_id=supplier_id) return refunded_prods.distinct().values_list("parent_line__product_id", flat=True)
[docs] def get_unshipped_products(self, supplier=None): return { product: summary_datum for product, summary_datum in self.get_product_summary(supplier=supplier).items() if summary_datum["unshipped"] }
[docs] def get_status_display(self): return force_text(self.status)
[docs] def get_payment_method_display(self): return force_text(self.payment_method_name)
[docs] def get_shipping_method_display(self): return force_text(self.shipping_method_name)
[docs] def get_tracking_codes(self): return [shipment.tracking_code for shipment in self.shipments.all_except_deleted() if shipment.tracking_code]
[docs] def get_sent_shipments(self): return self.shipments.all_except_deleted().sent()
[docs] def can_edit(self): return ( settings.SHUUP_ALLOW_EDITING_ORDER and not self.has_refunds() and not self.is_canceled() and not self.is_complete() and self.shipping_status == ShippingStatus.NOT_SHIPPED and self.payment_status == PaymentStatus.NOT_PAID )
[docs] def get_customer_name(self): name_attrs = ["customer", "billing_address", "orderer", "shipping_address"] for attr in name_attrs: if getattr(self, f"{attr}_id"): return getattr(self, attr).name
[docs] def get_available_shipping_methods(self): """ Get available shipping methods. :rtype: list[ShippingMethod] """ from shuup.core.models import ShippingMethod product_ids = self.lines.products().values_list("id", flat=True) return [ m for m in ShippingMethod.objects.available(shop=self.shop, products=product_ids) if m.is_available_for(self) ]
[docs] def get_available_payment_methods(self): """ Get available payment methods. :rtype: list[PaymentMethod] """ from shuup.core.models import PaymentMethod product_ids = self.lines.products().values_list("id", flat=True) return [ m for m in PaymentMethod.objects.available(shop=self.shop, products=product_ids) if m.is_available_for(self) ]
[docs] def change_status( self, next_status: OrderStatus, user: User = None, description: str = None, save=True, first_save=False, ): # validate next_status is valid or not # if changing to the same status, then return immediately if next_status == self.status and not first_save: return if first_save: old_status = None else: old_status = self.status if old_status and (next_status not in old_status.allowed_next_statuses.all()): raise InvalidOrderStatusError(_("Error! Can not change to this status")) # update new status of order self.status = next_status # create a new OrderStatusHistory entry OrderStatusHistory.objects.create( order=self, previous_order_status=old_status, next_order_status=next_status, description=description, creator=user, ) if save: self.save(update_fields=("status",)) # end OrderStatusHistory creation order_status_changed.send(type(self), order=self, old_status=old_status, new_status=self.status)
OrderLogEntry = define_log_model(Order)