import datetime
from collections import defaultdict
from decimal import Decimal
from typing import TYPE_CHECKING
import six
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
from django.db.models import Q
from django.db.transaction import atomic
from django.utils.crypto import get_random_string
from django.utils.translation import gettext_lazy as _
from enumfields import Enum, EnumIntegerField
from jsonfield import JSONField
from parler.managers import TranslatableQuerySet
from parler.models import TranslatableModel, TranslatedFields
from shuup.core import taxing
from shuup.core.excs import (
InvalidOrderStatusError,
NoPaymentToCreateException,
NoProductsToShipException,
NoRefundToCreateException,
NoShippingAddressException,
)
from shuup.core.fields import CurrencyField, InternalIdentifierField, LanguageField, MoneyValueField, UnsavedForeignKey
from shuup.core.pricing import TaxfulPrice, TaxlessPrice
from shuup.core.settings_provider import ShuupSettings
from shuup.core.signals import (
order_changed,
order_status_changed,
payment_created,
refund_created,
shipment_created,
shipment_created_and_processed,
)
from shuup.utils.analog import LogEntryKind, define_log_model
from shuup.utils.dates import local_now, to_aware
from shuup.utils.django_compat import force_text
from shuup.utils.money import Money
from shuup.utils.properties import MoneyPropped, TaxfulPriceProperty, TaxlessPriceProperty
from ._order_lines import OrderLineType
from ._order_utils import get_order_identifier, get_reference_number
from ._products import Product
from ._suppliers import Supplier
if TYPE_CHECKING: # pragma: no cover
from ._service_base import Service, ServiceBehaviorComponent # noqa: F401
User = get_user_model()
class PaymentStatus(Enum):
NOT_PAID = 0
PARTIALLY_PAID = 1
FULLY_PAID = 2
CANCELED = 3
DEFERRED = 4
class Labels:
NOT_PAID = _("not paid")
PARTIALLY_PAID = _("partially paid")
FULLY_PAID = _("fully paid")
CANCELED = _("canceled")
DEFERRED = _("deferred")
class ShippingStatus(Enum):
NOT_SHIPPED = 0
PARTIALLY_SHIPPED = 1
FULLY_SHIPPED = 2
class Labels:
NOT_SHIPPED = _("not shipped")
PARTIALLY_SHIPPED = _("partially shipped")
FULLY_SHIPPED = _("fully shipped")
class OrderStatusRole(Enum):
NONE = 0
INITIAL = 1
COMPLETE = 2
CANCELED = 3
PROCESSING = 4
# TODO: Failed state?
class Labels:
NONE = _("none")
INITIAL = _("Initial")
COMPLETE = _("Complete")
CANCELED = _("Canceled")
PROCESSING = _("Processing")
class DefaultOrderStatus(Enum):
NONE = "none"
INITIAL = "initial"
COMPLETE = "complete"
CANCELED = "canceled"
PROCESSING = "processing"
# TODO: Failed state?
class Labels:
NONE = _("none")
INITIAL = _("Received")
COMPLETE = _("Complete")
CANCELED = _("Canceled")
PROCESSING = _("In Progress")
class OrderStatusQuerySet(TranslatableQuerySet):
def _default_for_role(self, role):
"""
Get the default order status for the given role.
:param role: The role to look for.
:type role: OrderStatusRole
:return: The OrderStatus.
:rtype: OrderStatus
"""
try:
return self.get(default=True, role=role)
except ObjectDoesNotExist:
raise ObjectDoesNotExist(
"Error! No default `{}` OrderStatus exists.".format(getattr(role, "label", role))
) from None
def get_default_initial(self):
return self._default_for_role(OrderStatusRole.INITIAL)
def get_default_processing(self):
return self._default_for_role(OrderStatusRole.PROCESSING)
def get_default_canceled(self):
return self._default_for_role(OrderStatusRole.CANCELED)
def get_default_complete(self):
return self._default_for_role(OrderStatusRole.COMPLETE)
class OrderStatus(TranslatableModel):
identifier = InternalIdentifierField(
db_index=True,
blank=False,
editable=True,
unique=True,
help_text=_("Internal identifier for status. This is used to identify and distinguish the statuses in Shuup."),
)
ordering = models.IntegerField(
db_index=True,
default=0,
verbose_name=_("ordering"),
help_text=_("The processing order of statuses. Default is always processed first."),
)
role = EnumIntegerField(
OrderStatusRole,
default=OrderStatusRole.NONE,
db_index=True,
verbose_name=_("role"),
help_text=_("The role of this status. One role can have multiple order statuses."),
) # type: ignore
default = models.BooleanField(
default=False,
db_index=True,
verbose_name=_("default"),
help_text=_("Defines if the status should be considered as default. Default is always processed first."),
)
is_active = models.BooleanField(
default=True,
db_index=True,
verbose_name=_("is active"),
help_text=_("Defines if the status is usable."),
)
objects = OrderStatusQuerySet.as_manager()
translations = TranslatedFields(
name=models.CharField(
verbose_name=_("name"),
max_length=64,
help_text=_("Name of the order status."),
),
public_name=models.CharField(
verbose_name=_("public name"),
max_length=64,
help_text=_("The name shown to the customers in shop front."),
),
)
allowed_next_statuses = models.ManyToManyField(
"self", verbose_name=_("allowed next statuses"), blank=True, symmetrical=False
)
visible_for_customer = models.BooleanField(
default=True,
verbose_name=_("is visible for the user"),
help_text=_("Indicates whether this status is visible for the customers"),
)
class Meta:
unique_together = ("identifier", "role")
verbose_name = _("order status")
verbose_name_plural = _("order statuses")
def __str__(self):
return force_text(self.safe_translation_getter("name", default=self.identifier))
[docs]
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
if self.default and self.role != OrderStatusRole.NONE:
# If this status is the default, make the others for this role non-default.
OrderStatus.objects.filter(role=self.role).exclude(pk=self.pk).update(default=False)
class OrderStatusHistory(models.Model):
order = models.ForeignKey(
"Order",
related_name="order_history",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("order id"),
)
previous_order_status = models.ForeignKey(
"OrderStatus",
related_name="previous_order_status",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("previous order status"),
)
next_order_status = models.ForeignKey(
"OrderStatus",
related_name="next_order_status",
on_delete=models.PROTECT,
verbose_name=_("next order status"),
)
created_on = models.DateTimeField(auto_now_add=True, editable=False, verbose_name=_("created on"))
description = models.TextField(blank=True, null=True, verbose_name=_("description"))
creator = UnsavedForeignKey(
settings.AUTH_USER_MODEL,
related_name="order_status_history_created",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("creating user"),
)
class OrderStatusManager:
[docs]
def __init__(self):
self.default_statuses = [
{
"name": _("Received"),
"public_name": _("Received"),
"role": OrderStatusRole.INITIAL,
"identifier": DefaultOrderStatus.INITIAL.value,
"default": True,
"is_active": True,
},
{
"name": _("In Progress"),
"public_name": _("In Progress"),
"role": OrderStatusRole.PROCESSING,
"identifier": DefaultOrderStatus.PROCESSING.value,
"default": True,
"is_active": True,
},
{
"name": _("Complete"),
"public_name": _("Complete"),
"role": OrderStatusRole.COMPLETE,
"identifier": DefaultOrderStatus.COMPLETE.value,
"default": True,
"is_active": True,
},
{
"name": _("Canceled"),
"public_name": _("Canceled"),
"role": OrderStatusRole.CANCELED,
"identifier": DefaultOrderStatus.CANCELED.value,
"default": True,
"is_active": True,
},
]
[docs]
def is_default(self, status_object):
return any(s["identifier"] == status_object.identifier for s in self.default_statuses)
[docs]
def ensure_default_statuses(self):
"""
Ensure Default Statuses.
It is important to ensure that default
statuses are always available. This method
will ensure this.
"""
# These values are based on the old Shuup data
update_map = {
"none": DefaultOrderStatus.NONE.value,
"recv": DefaultOrderStatus.INITIAL.value,
"prog": DefaultOrderStatus.PROCESSING.value,
"comp": DefaultOrderStatus.COMPLETE.value,
"canc": DefaultOrderStatus.CANCELED.value,
}
for status in OrderStatus.objects.all():
if status.identifier not in update_map:
continue
status.identifier = update_map[status.identifier]
status.save()
for i, defaults in enumerate(self.default_statuses):
defaults["ordering"] = i
status = OrderStatus.objects.filter(identifier=defaults["identifier"]).first()
if status:
defaults.pop("name")
defaults.pop("public_name")
for k, v in six.iteritems(defaults):
setattr(status, k, v)
status.save()
else:
OrderStatus.objects.create(**defaults)
self.ensure_allowed_next_statuses()
[docs]
def ensure_allowed_next_statuses(self):
"""
Populates allowed_next_statuses with default values.
"""
# Ensure default status transitions are always set up
# Set up INITIAL status transitions
try:
initial_status = OrderStatus.objects.filter(identifier=DefaultOrderStatus.INITIAL.value).first()
if initial_status:
allowed_statuses = OrderStatus.objects.filter(
identifier__in=[
DefaultOrderStatus.PROCESSING.value,
DefaultOrderStatus.COMPLETE.value,
DefaultOrderStatus.CANCELED.value,
]
)
# Clear existing and add new transitions
initial_status.allowed_next_statuses.clear()
initial_status.allowed_next_statuses.add(*allowed_statuses)
# Set up PROCESSING status transitions
processing_status = OrderStatus.objects.filter(identifier=DefaultOrderStatus.PROCESSING.value).first()
if processing_status:
allowed_statuses = OrderStatus.objects.filter(
identifier__in=[
DefaultOrderStatus.COMPLETE.value,
DefaultOrderStatus.CANCELED.value,
]
)
# Clear existing and add new transitions
processing_status.allowed_next_statuses.clear()
processing_status.allowed_next_statuses.add(*allowed_statuses)
except Exception:
# Fallback in case of any database issues
pass
class OrderQuerySet(models.QuerySet):
def paid(self):
return self.filter(payment_status=PaymentStatus.FULLY_PAID)
def incomplete(self):
return self.filter(
status__role__in=(
OrderStatusRole.NONE,
OrderStatusRole.INITIAL,
OrderStatusRole.PROCESSING,
)
)
def complete(self):
return self.filter(status__role=OrderStatusRole.COMPLETE) # TODO: read status
def valid(self):
return self.exclude(Q(deleted=True) | Q(status__role=OrderStatusRole.CANCELED)) # TODO: read status
def since(self, days, tz=None):
since_date = (local_now(tz) - datetime.timedelta(days=days)).date()
since = to_aware(since_date, tz=tz)
return self.in_date_range(since, None)
def in_date_range(self, start, end):
"""
Limit to orders is given date range.
:type start: datetime.datetime|None
:param start: Start time, inclusive.
:type end: datetime.datetime|None
:param end: End time, inclusive.
"""
result = self
if start:
result = result.filter(order_date__gte=to_aware(start))
if end:
result = result.filter(order_date__lte=to_aware(end))
return result
class Order(MoneyPropped, models.Model):
# Identification
shop = UnsavedForeignKey("Shop", on_delete=models.PROTECT, verbose_name=_("shop"))
created_on = models.DateTimeField(auto_now_add=True, editable=False, db_index=True, verbose_name=_("created on"))
modified_on = models.DateTimeField(auto_now=True, editable=False, db_index=True, verbose_name=_("modified on"))
identifier = InternalIdentifierField(unique=True, db_index=True, verbose_name=_("order identifier"))
# TODO: label is actually a choice field, need to check migrations/choice deconstruction
label = models.CharField(max_length=32, db_index=True, verbose_name=_("label"))
# The key shouldn't be possible to deduce (i.e. it should be random), but it is
# not a secret. (It could, however, be used as key material for an actual secret.)
key = models.CharField(max_length=32, unique=True, blank=False, verbose_name=_("key"))
reference_number = models.CharField(
max_length=64,
db_index=True,
unique=True,
blank=True,
null=True,
verbose_name=_("reference number"),
)
# Contact information
customer = UnsavedForeignKey(
"Contact",
related_name="customer_orders",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("customer"),
)
orderer = UnsavedForeignKey(
"PersonContact",
related_name="orderer_orders",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("orderer"),
)
billing_address = models.ForeignKey(
"ImmutableAddress",
related_name="billing_orders",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("billing address"),
)
shipping_address = models.ForeignKey(
"ImmutableAddress",
related_name="shipping_orders",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("shipping address"),
)
tax_number = models.CharField(max_length=64, blank=True, verbose_name=_("tax number"))
phone = models.CharField(max_length=64, blank=True, verbose_name=_("phone"))
email = models.EmailField(max_length=128, blank=True, verbose_name=_("email address"))
# Customer related information that might change after order, but is important
# for accounting and/or reports later.
account_manager = models.ForeignKey(
"PersonContact",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("account manager"),
)
customer_groups = models.ManyToManyField(
"ContactGroup",
related_name="customer_group_orders",
verbose_name=_("customer groups"),
blank=True,
)
tax_group = models.ForeignKey(
"CustomerTaxGroup",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("tax group"),
)
# Status
creator = UnsavedForeignKey(
settings.AUTH_USER_MODEL,
related_name="orders_created",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("creating user"),
)
modified_by = UnsavedForeignKey(
settings.AUTH_USER_MODEL,
related_name="orders_modified",
blank=True,
null=True,
on_delete=models.PROTECT,
verbose_name=_("modifier user"),
)
deleted = models.BooleanField(db_index=True, default=False, verbose_name=_("deleted"))
status = UnsavedForeignKey("OrderStatus", verbose_name=_("status"), on_delete=models.PROTECT)
payment_status = EnumIntegerField(
PaymentStatus, db_index=True, default=PaymentStatus.NOT_PAID, verbose_name=_("payment status")
) # type: ignore
shipping_status = EnumIntegerField(
ShippingStatus,
db_index=True,
default=ShippingStatus.NOT_SHIPPED,
verbose_name=_("shipping status"),
) # type: ignore
# Methods
payment_method = UnsavedForeignKey(
"PaymentMethod",
related_name="payment_orders",
blank=True,
null=True,
default=None,
on_delete=models.PROTECT,
verbose_name=_("payment method"),
)
payment_method_name = models.CharField(
max_length=100, blank=True, default="", verbose_name=_("payment method name")
)
payment_data = JSONField(blank=True, null=True, verbose_name=_("payment data"))
shipping_method = UnsavedForeignKey(
"ShippingMethod",
related_name="shipping_orders",
blank=True,
null=True,
default=None,
on_delete=models.PROTECT,
verbose_name=_("shipping method"),
)
shipping_method_name = models.CharField(
max_length=100, blank=True, default="", verbose_name=_("shipping method name")
)
shipping_data = JSONField(blank=True, null=True, verbose_name=_("shipping data"))
extra_data = JSONField(blank=True, null=True, verbose_name=_("extra data"))
# Money stuff
taxful_total_price = TaxfulPriceProperty("taxful_total_price_value", "currency")
taxless_total_price = TaxlessPriceProperty("taxless_total_price_value", "currency")
taxful_total_price_value = MoneyValueField(editable=False, verbose_name=_("grand total"), default=0)
taxless_total_price_value = MoneyValueField(editable=False, verbose_name=_("taxless total"), default=0)
currency = CurrencyField(verbose_name=_("currency"))
prices_include_tax = models.BooleanField(verbose_name=_("prices include tax"))
display_currency = CurrencyField(blank=True, verbose_name=_("display currency"))
display_currency_rate = models.DecimalField(
max_digits=36,
decimal_places=9,
default=1,
verbose_name=_("display currency rate"),
)
# Other
ip_address = models.GenericIPAddressField(null=True, blank=True, verbose_name=_("IP address"))
# `order_date` is not `auto_now_add` for backdating purposes
order_date = models.DateTimeField(editable=False, db_index=True, verbose_name=_("order date"))
payment_date = models.DateTimeField(null=True, editable=False, verbose_name=_("payment date"))
language = LanguageField(blank=True, verbose_name=_("language"))
customer_comment = models.TextField(blank=True, verbose_name=_("customer comment"))
admin_comment = models.TextField(blank=True, verbose_name=_("admin comment/notes"))
require_verification = models.BooleanField(default=False, verbose_name=_("requires verification"))
all_verified = models.BooleanField(default=False, verbose_name=_("all lines verified"))
marketing_permission = models.BooleanField(default=False, verbose_name=_("marketing permission"))
_codes = JSONField(blank=True, null=True, verbose_name=_("codes"))
common_select_related = ("billing_address",)
objects = OrderQuerySet.as_manager()
class Meta:
ordering = ("-id",)
verbose_name = _("order")
verbose_name_plural = _("orders")
def __str__(self): # pragma: no cover
if self.billing_address_id:
name = self.billing_address.name
else:
name = "-"
if ShuupSettings.get_setting("SHUUP_ENABLE_MULTIPLE_SHOPS"):
return f"Order {self.identifier} ({self.shop.name}, {name})"
else:
return f"Order {self.identifier} ({name})"
@property
def codes(self):
return list(self._codes or [])
@codes.setter
def codes(self, value):
codes = []
for code in value:
if not isinstance(code, six.text_type):
raise TypeError("Error! `codes` must be a list of strings.")
codes.append(code)
self._codes = codes
[docs]
def cache_prices(self):
taxful_total = TaxfulPrice(0, self.currency)
taxless_total = TaxlessPrice(0, self.currency)
for line in self.lines.all().prefetch_related("taxes"):
taxful_total += line.taxful_price
taxless_total += line.taxless_price
self.taxful_total_price = taxful_total
self.taxless_total_price = taxless_total
def _cache_contact_values(self):
sources = [
self.shipping_address,
self.billing_address,
self.customer,
self.orderer,
]
fields = ("tax_number", "email", "phone")
for field in fields:
if getattr(self, field, None):
continue
for source in sources:
val = getattr(source, field, None)
if val:
setattr(self, field, val)
break
if not self.id and self.customer:
# These fields are used for reporting and should not
# change after create even if empty at the moment of ordering.
self.account_manager = getattr(self.customer, "account_manager", None)
self.tax_group = self.customer.tax_group
def _cache_contact_values_post_create(self):
if self.customer:
# These fields are used for reporting and should not
# change after create even if empty at the moment of ordering.
self.customer_groups.set(self.customer.groups.all())
def _cache_values(self):
self._cache_contact_values()
if not self.label:
self.label = settings.SHUUP_DEFAULT_ORDER_LABEL
if not self.currency:
self.currency = self.shop.currency
if not self.prices_include_tax:
self.prices_include_tax = self.shop.prices_include_tax
if not self.display_currency:
self.display_currency = self.currency
self.display_currency_rate = 1
if self.shipping_method_id and not self.shipping_method_name:
self.shipping_method_name = self.shipping_method.safe_translation_getter(
"name", default=self.shipping_method.identifier, any_language=True
)
if self.payment_method_id and not self.payment_method_name:
self.payment_method_name = self.payment_method.safe_translation_getter(
"name", default=self.payment_method.identifier, any_language=True
)
if not self.key:
self.key = get_random_string(32)
if not self.modified_by:
self.modified_by = self.creator
def _save_identifiers(self):
self.identifier = f"{get_order_identifier(self)}"
self.reference_number = get_reference_number(self)
super().save(
update_fields=(
"identifier",
"reference_number",
)
)
[docs]
def full_clean(self, exclude=None, validate_unique=True):
self._cache_values()
return super().full_clean(exclude, validate_unique)
[docs]
def save(self, *args, **kwargs):
if not self.creator_id:
if not settings.SHUUP_ALLOW_ANONYMOUS_ORDERS:
raise ValidationError(
"Error! Anonymous (userless) orders are not allowed "
"when `SHUUP_ALLOW_ANONYMOUS_ORDERS` is not enabled."
)
self._cache_values()
first_save = not self.pk
if self.status is None:
self.status = OrderStatus.objects.get_default_initial()
super().save(*args, **kwargs)
if first_save: # Have to do a double save the first time around to be able to save identifiers
self._save_identifiers()
self._cache_contact_values_post_create()
self.change_status(next_status=self.status, user=self.creator, first_save=True)
order_changed.send(type(self), order=self)
[docs]
def delete(self, using=None):
if not self.deleted:
self.deleted = True
self.add_log_entry("Success! Deleted (soft).", kind=LogEntryKind.DELETION)
# Bypassing local `save()` on purpose.
super().save(update_fields=("deleted",), using=using)
[docs]
def set_canceled(self):
if self.status.role != OrderStatusRole.CANCELED:
self.status = OrderStatus.objects.get_default_canceled()
self.save()
def _set_paid(self):
if self.payment_status != PaymentStatus.FULLY_PAID: # pragma: no branch
self.add_log_entry(_("Order was marked as paid."))
self.payment_status = PaymentStatus.FULLY_PAID
self.payment_date = local_now()
self.save()
def _set_partially_paid(self):
if self.payment_status != PaymentStatus.PARTIALLY_PAID:
self.add_log_entry(_("Order was marked as partially paid."))
self.payment_status = PaymentStatus.PARTIALLY_PAID
self.save()
[docs]
def is_paid(self):
return self.payment_status == PaymentStatus.FULLY_PAID
[docs]
def is_partially_paid(self):
return self.payment_status == PaymentStatus.PARTIALLY_PAID
[docs]
def is_deferred(self):
return self.payment_status == PaymentStatus.DEFERRED
[docs]
def is_not_paid(self):
return self.payment_status == PaymentStatus.NOT_PAID
[docs]
def get_total_paid_amount(self):
amounts = self.payments.values_list("amount_value", flat=True)
return Money(sum(amounts, Decimal(0)), self.currency)
[docs]
def get_total_unpaid_amount(self):
difference = self.taxful_total_price.amount - self.get_total_paid_amount()
return max(difference, Money(0, self.currency))
[docs]
def can_create_payment(self):
zero = Money(0, self.currency)
return not (self.is_paid() or self.is_canceled()) and self.get_total_unpaid_amount() > zero
[docs]
def create_payment(self, amount, payment_identifier=None, description=""):
"""
Create a payment with a given amount for this order.
If the order already has payments and sum of their amounts is
equal or greater than `self.taxful_total_price` and the order is not
a zero price order, an exception is raised.
If the end sum of all payments is equal or greater than
`self.taxful_total_price`, then the order is marked as paid.
:param amount:
Amount of the payment to be created.
:type amount: Money
:param payment_identifier:
Identifier of the created payment. If not set, default value
of `gateway_id:order_id:number` will be used (where `number` is
a number of payments in the order).
:type payment_identifier: str|None
:param description:
Description of the payment. Will be set to `method` property
of the created payment.
:type description: str
:returns: The created Payment object
:rtype: shuup.core.models.Payment
"""
assert isinstance(amount, Money)
assert amount.currency == self.currency
payments = self.payments.order_by("created_on")
total_paid_amount = self.get_total_paid_amount()
if total_paid_amount >= self.taxful_total_price.amount and self.taxful_total_price:
raise NoPaymentToCreateException(
f"Error! Order {self.pk} has already been fully paid ({total_paid_amount}"
+ f" >= {self.taxful_total_price})."
)
if not payment_identifier:
number = payments.count() + 1
payment_identifier = f"{self.id}:{number}" # type: ignore
payment = self.payments.create( # type: ignore
payment_identifier=payment_identifier,
amount_value=amount.value,
description=description,
)
if self.get_total_paid_amount() >= self.taxful_total_price.amount: # type: ignore
self._set_paid() # also calls save
else:
self._set_partially_paid()
payment_created.send(sender=type(self), order=self, payment=payment)
return payment
[docs]
def can_create_shipment(self):
return self.get_unshipped_products() and not self.is_canceled() and self.shipping_address
# TODO: Rethink either the usage of shipment parameter or renaming the method for 3.0
[docs]
@atomic
def create_shipment(self, product_quantities, supplier=None, shipment=None):
"""
Create a shipment for this order from `product_quantities`.
`product_quantities` is expected to be a dict, which maps Product instances to quantities.
Only quantities over 0 are taken into account, and if the mapping is empty or has no quantity value
over 0, `NoProductsToShipException` will be raised.
Orders without a shipping address defined, will raise `NoShippingAddressException`.
:param product_quantities: a dict mapping Product instances to quantities to ship.
:type product_quantities: dict[shuup.shop.models.Product, decimal.Decimal]
:param supplier: Optional Supplier for this product. No validation is made.
:param shipment: Optional unsaved Shipment for ShipmentProduct's. If not given
Shipment is created based on supplier parameter.
:raises: NoProductsToShipException, NoShippingAddressException
:return: Saved, complete Shipment object.
:rtype: shuup.core.models.Shipment
"""
if not product_quantities or not any(quantity > 0 for quantity in product_quantities.values()):
raise NoProductsToShipException(
"Error! No products to ship (`quantities` is empty or has no quantity over 0)."
)
if self.shipping_address is None:
raise NoShippingAddressException("Error! Shipping address is not defined for this order.")
assert supplier or shipment
if shipment:
assert shipment.order == self
else:
from ._shipments import Shipment
shipment = Shipment(order=self, supplier=supplier)
shipment.save()
if not supplier:
supplier = shipment.supplier
supplier.ship_products(shipment, product_quantities)
self.add_log_entry(_("Success! Shipment #%d was created.") % shipment.id)
self.update_shipping_status()
shipment_created.send(sender=type(self), order=self, shipment=shipment)
shipment_created_and_processed.send(sender=type(self), order=self, shipment=shipment)
return shipment
[docs]
def can_create_refund(self, supplier=None):
unrefunded_amount = self.get_total_unrefunded_amount(supplier)
unrefunded_quantity = self.get_total_unrefunded_quantity(supplier)
return (
(unrefunded_amount.value > 0 or unrefunded_quantity > 0)
and not self.is_canceled()
and (self.payment_status not in (PaymentStatus.NOT_PAID, PaymentStatus.CANCELED))
)
[docs]
@atomic
def create_refund(self, refund_data, created_by=None, supplier=None):
"""
Create a refund if passed a list of refund line data.
Refund line data is simply a list of dictionaries where
each dictionary contains data for a particular refund line.
Additionally, if the parent line is of `enum` type
`OrderLineType.PRODUCT` and the `restock_products` boolean
flag is set to `True`, the products will be restocked with the
exact amount set in the order supplier's `quantity` field.
:param refund_data: List of dicts containing refund data.
:type refund_data: [dict]
:param created_by: Refund creator's user instance, used for
adjusting supplier stock.
:type created_by: django.contrib.auth.User|None
"""
tax_module = taxing.get_tax_module()
refund_lines = tax_module.create_refund_lines(self, supplier, created_by, refund_data)
self.cache_prices()
self.save()
self.update_shipping_status()
self.update_payment_status()
refund_created.send(sender=type(self), order=self, refund_lines=refund_lines)
[docs]
def create_full_refund(self, restock_products=False, created_by=None):
"""
Create a full refund for entire order content, with the option of
restocking stocked products.
:param restock_products: Boolean indicating whether to also restock the products.
:param created_by: Refund creator's user instance, used for
adjusting supplier stock.
:type restock_products: bool|False
"""
if self.has_refunds():
raise NoRefundToCreateException
self.cache_prices()
line_data = [
{
"line": line,
"quantity": line.quantity,
"amount": line.taxful_price.amount,
"restock_products": restock_products,
}
for line in self.lines.filter(quantity__gt=0)
if line.type != OrderLineType.REFUND
]
self.create_refund(line_data, created_by)
[docs]
def get_total_refunded_amount(self, supplier=None):
refunds = self.lines.refunds()
if supplier:
refunds = refunds.filter(Q(parent_line__supplier=supplier) | Q(supplier=supplier))
total = sum([line.taxful_price.amount.value for line in refunds])
return Money(-total, self.currency)
[docs]
def get_total_unrefunded_amount(self, supplier=None):
if supplier:
total_refund_amount = sum(
[
line.max_refundable_amount.value
for line in self.lines.filter(supplier=supplier).exclude(type=OrderLineType.REFUND)
]
)
arbitrary_refunds = abs(
sum(
[
refund_line.taxful_price.value
for refund_line in self.lines.filter(
supplier=supplier,
parent_line__isnull=True,
type=OrderLineType.REFUND,
)
]
)
)
return (
Money(max(total_refund_amount - arbitrary_refunds, 0), self.currency)
if total_refund_amount
else Money(0, self.currency)
)
return max(self.taxful_total_price.amount, Money(0, self.currency))
[docs]
def get_total_unrefunded_quantity(self, supplier=None):
queryset = self.lines.all()
if supplier:
queryset = queryset.filter(supplier=supplier)
return sum([line.max_refundable_quantity for line in queryset])
[docs]
def get_total_tax_amount(self):
return sum((line.tax_amount for line in self.lines.all()), Money(0, self.currency))
[docs]
def has_refunds(self):
return self.lines.refunds().exists()
[docs]
def create_shipment_of_all_products(self, supplier=None):
"""
Create a shipment of all the products in this Order, no matter whether or
not any have been previously marked as shipped or not.
See the documentation for `create_shipment`.
:param supplier: The Supplier to use. If `None`, the first supplier in
the order is used. (If several are in the order, this fails.)
:return: Saved, complete Shipment object.
:rtype: shuup.shop.models.Shipment
"""
from ._products import ShippingMode
suppliers_to_product_quantities = defaultdict(lambda: defaultdict(lambda: 0))
lines = self.lines.filter(type=OrderLineType.PRODUCT, product__shipping_mode=ShippingMode.SHIPPED).values_list(
"supplier_id", "product_id", "quantity"
)
for supplier_id, product_id, quantity in lines:
if product_id:
suppliers_to_product_quantities[supplier_id][product_id] += quantity
if not suppliers_to_product_quantities:
raise NoProductsToShipException("Error! Could not find any products to ship.")
if supplier is None:
if len(suppliers_to_product_quantities) > 1: # pragma: no cover
raise ValueError(
"Error! `create_shipment_of_all_products` can be used only when there is a single supplier."
)
supplier_id, quantities = suppliers_to_product_quantities.popitem()
supplier = Supplier.objects.get(pk=supplier_id)
else:
quantities = suppliers_to_product_quantities[supplier.id]
products = {product.pk: product for product in Product.objects.filter(pk__in=quantities.keys())}
quantities = {products[product_id]: quantity for (product_id, quantity) in quantities.items()}
return self.create_shipment(quantities, supplier=supplier)
[docs]
def check_all_verified(self):
if not self.all_verified:
new_all_verified = not self.lines.filter(verified=False).exists()
if new_all_verified:
self.all_verified = True
if self.require_verification:
self.add_log_entry(_("All rows requiring verification have been verified."))
self.require_verification = False
self.save()
return self.all_verified
[docs]
def get_purchased_attachments(self):
from ._product_media import ProductMedia
if self.payment_status != PaymentStatus.FULLY_PAID:
return ProductMedia.objects.none()
prods = self.lines.exclude(product=None).values_list("product_id", flat=True)
return ProductMedia.objects.filter(product__in=prods, enabled=True, purchased=True)
[docs]
def get_tax_summary(self):
"""
:rtype: taxing.TaxSummary
"""
all_line_taxes = []
untaxed = TaxlessPrice(0, self.currency)
for line in self.lines.all():
line_taxes = list(line.taxes.all())
all_line_taxes.extend(line_taxes)
if not line_taxes:
untaxed += line.taxless_price
return taxing.TaxSummary.from_line_taxes(all_line_taxes, untaxed)
[docs]
def get_product_ids_and_quantities(self, supplier=None):
lines = self.lines.filter(type=OrderLineType.PRODUCT)
if supplier:
supplier_id = supplier if isinstance(supplier, six.integer_types) else supplier.pk
lines = lines.filter(supplier_id=supplier_id)
quantities = defaultdict(lambda: 0)
for product_id, quantity in lines.values_list("product_id", "quantity"):
quantities[product_id] += quantity
return dict(quantities)
[docs]
def has_products(self):
return self.lines.products().exists()
[docs]
def has_products_requiring_shipment(self, supplier=None):
from ._products import ShippingMode
lines = self.lines.products().filter(product__shipping_mode=ShippingMode.SHIPPED)
if supplier:
supplier_id = supplier if isinstance(supplier, six.integer_types) else supplier.pk
lines = lines.filter(supplier_id=supplier_id)
return lines.exists()
[docs]
def is_complete(self):
return self.status.role == OrderStatusRole.COMPLETE
[docs]
def can_set_complete(self):
return not (self.is_complete() or self.is_canceled() or bool(self.get_unshipped_products()))
[docs]
def is_fully_shipped(self):
return self.shipping_status == ShippingStatus.FULLY_SHIPPED
[docs]
def is_partially_shipped(self):
return self.shipping_status == ShippingStatus.PARTIALLY_SHIPPED
[docs]
def is_canceled(self):
return self.status.role == OrderStatusRole.CANCELED
[docs]
def can_set_canceled(self):
canceled = self.status.role == OrderStatusRole.CANCELED
paid = self.is_paid()
shipped = self.shipping_status != ShippingStatus.NOT_SHIPPED
return not (canceled or paid or shipped)
[docs]
def update_shipping_status(self):
status_before_update = self.shipping_status
shipments_count = self.shipments.all_except_deleted().out_only().count()
has_unshipped_products = bool(self.get_unshipped_products())
if shipments_count:
# get the number of sent shipments
sent_shipments_count = self.get_sent_shipments().out_only().count()
# nothing sent
if sent_shipments_count == 0:
self.shipping_status = ShippingStatus.NOT_SHIPPED
# fully sent and no unshipped products
elif shipments_count == sent_shipments_count and not has_unshipped_products:
self.shipping_status = ShippingStatus.FULLY_SHIPPED
else:
self.shipping_status = ShippingStatus.PARTIALLY_SHIPPED
else:
self.shipping_status = ShippingStatus.NOT_SHIPPED
if status_before_update != self.shipping_status:
self.add_log_entry(_(f"New shipping status is set to: {self.shipping_status}."))
self.save(update_fields=("shipping_status",))
[docs]
def update_payment_status(self):
status_before_update = self.payment_status
if self.get_total_unpaid_amount().value == 0:
self.payment_status = PaymentStatus.FULLY_PAID
elif self.get_total_paid_amount().value > 0:
self.payment_status = PaymentStatus.PARTIALLY_PAID
elif self.payment_status != PaymentStatus.DEFERRED: # Do not make deferred here not paid
self.payment_status = PaymentStatus.NOT_PAID
if status_before_update != self.payment_status:
self.add_log_entry(_(f"New payment status is set to: {self.payment_status}."))
self.save(update_fields=("payment_status",))
[docs]
def get_known_additional_data(self):
"""
Get a list of "known additional data" in this order's `payment_data`, `shipping_data` and `extra_data`.
The list is returned in the order the fields are specified in the settings entries for said known keys.
`dict(that_list)` can of course be used to "flatten" the list into a dict.
:return: list of 2-tuples.
"""
output = []
for data_dict, name_mapping in (
(self.payment_data, settings.SHUUP_ORDER_KNOWN_PAYMENT_DATA_KEYS),
(self.shipping_data, settings.SHUUP_ORDER_KNOWN_SHIPPING_DATA_KEYS),
(self.extra_data, settings.SHUUP_ORDER_KNOWN_EXTRA_DATA_KEYS),
):
if hasattr(data_dict, "get"):
for key, display_name in name_mapping:
if key in data_dict:
output.append((force_text(display_name), data_dict[key]))
return output
[docs]
def get_product_summary(self, supplier=None):
"""Return a dict of product IDs -> {ordered, unshipped, refunded, shipped, line_text, suppliers}"""
supplier_id = (supplier if isinstance(supplier, six.integer_types) else supplier.pk) if supplier else None
products = defaultdict(lambda: defaultdict(lambda: Decimal(0)))
def _append_suppliers_info(product_id, supplier):
if not products[product_id]["suppliers"]:
products[product_id]["suppliers"] = [supplier]
elif supplier not in products[product_id]["suppliers"]:
products[product_id]["suppliers"].append(supplier)
# Quantity for all orders
# Note! This contains all product lines so we do not need to worry
# about suppliers after this.
lines = self.lines.filter(type=OrderLineType.PRODUCT)
if supplier_id:
lines = lines.filter(supplier_id=supplier_id)
lines_values = lines.values_list("product_id", "text", "quantity", "supplier__name")
for product_id, line_text, quantity, supplier_name in lines_values:
products[product_id]["line_text"] = line_text
products[product_id]["ordered"] += quantity
_append_suppliers_info(product_id, supplier_name)
# Quantity to ship
for product_id, quantity in self._get_to_ship_quantities(supplier_id):
products[product_id]["unshipped"] += quantity
# Quantity shipped
for product_id, quantity in self._get_shipped_quantities(supplier_id):
products[product_id]["shipped"] += quantity
products[product_id]["unshipped"] -= quantity
# Quantity refunded
for product_id in self._get_refunded_product_ids(supplier_id):
refunds = self.lines.refunds().filter(parent_line__product_id=product_id)
refunded_quantity = refunds.aggregate(total=models.Sum("quantity"))["total"] or 0
products[product_id]["refunded"] = refunded_quantity
products[product_id]["unshipped"] = max(products[product_id]["unshipped"] - refunded_quantity, 0)
return products
def _get_to_ship_quantities(self, supplier_id):
from ._products import ShippingMode
lines_to_ship = self.lines.filter(type=OrderLineType.PRODUCT, product__shipping_mode=ShippingMode.SHIPPED)
if supplier_id:
lines_to_ship = lines_to_ship.filter(supplier_id=supplier_id)
return lines_to_ship.values_list("product_id", "quantity")
def _get_shipped_quantities(self, supplier_id):
from ._shipments import ShipmentProduct, ShipmentStatus
shipment_prods = ShipmentProduct.objects.filter(shipment__order=self).exclude(
shipment__status=ShipmentStatus.DELETED
)
if supplier_id:
shipment_prods = shipment_prods.filter(shipment__supplier_id=supplier_id)
return shipment_prods.values_list("product_id", "quantity")
def _get_refunded_product_ids(self, supplier_id):
refunded_prods = self.lines.refunds().filter(type=OrderLineType.REFUND, parent_line__type=OrderLineType.PRODUCT)
if supplier_id:
refunded_prods = refunded_prods.filter(parent_line__supplier_id=supplier_id)
return refunded_prods.distinct().values_list("parent_line__product_id", flat=True)
[docs]
def get_unshipped_products(self, supplier=None):
return {
product: summary_datum
for product, summary_datum in self.get_product_summary(supplier=supplier).items()
if summary_datum["unshipped"]
}
[docs]
def get_status_display(self):
return force_text(self.status)
[docs]
def get_payment_method_display(self):
return force_text(self.payment_method_name)
[docs]
def get_shipping_method_display(self):
return force_text(self.shipping_method_name)
[docs]
def get_tracking_codes(self):
return [shipment.tracking_code for shipment in self.shipments.all_except_deleted() if shipment.tracking_code]
[docs]
def get_sent_shipments(self):
return self.shipments.all_except_deleted().sent()
[docs]
def can_edit(self):
return (
settings.SHUUP_ALLOW_EDITING_ORDER
and not self.has_refunds()
and not self.is_canceled()
and not self.is_complete()
and self.shipping_status == ShippingStatus.NOT_SHIPPED
and self.payment_status == PaymentStatus.NOT_PAID
)
[docs]
def get_customer_name(self):
name_attrs = ["customer", "billing_address", "orderer", "shipping_address"]
for attr in name_attrs:
if getattr(self, f"{attr}_id"):
return getattr(self, attr).name
[docs]
def get_available_shipping_methods(self):
"""
Get available shipping methods.
:rtype: list[ShippingMethod]
"""
from shuup.core.models import ShippingMethod
product_ids = self.lines.products().values_list("id", flat=True)
return [
m
for m in ShippingMethod.objects.available(shop=self.shop, products=product_ids)
if m.is_available_for(self)
]
[docs]
def get_available_payment_methods(self):
"""
Get available payment methods.
:rtype: list[PaymentMethod]
"""
from shuup.core.models import PaymentMethod
product_ids = self.lines.products().values_list("id", flat=True)
return [
m for m in PaymentMethod.objects.available(shop=self.shop, products=product_ids) if m.is_available_for(self)
]
[docs]
def change_status(
self,
next_status: OrderStatus,
user: User = None,
description: str = None,
save=True,
first_save=False,
):
# validate next_status is valid or not
# if changing to the same status, then return immediately
if next_status == self.status and not first_save:
return
if first_save:
old_status = None
else:
old_status = self.status
if old_status and (next_status not in old_status.allowed_next_statuses.all()):
raise InvalidOrderStatusError(_("Error! Can not change to this status"))
# update new status of order
self.status = next_status
# create a new OrderStatusHistory entry
OrderStatusHistory.objects.create(
order=self,
previous_order_status=old_status,
next_order_status=next_status,
description=description,
creator=user,
)
if save:
self.save(update_fields=("status",))
# end OrderStatusHistory creation
order_status_changed.send(type(self), order=self, old_status=old_status, new_status=self.status)
OrderLogEntry = define_log_model(Order)