Source code for shuup.testing.factories.order_factory

# Order-related factory functions
import datetime
import random
import uuid

from django.db.transaction import atomic
from django.utils.timezone import now

from shuup.core.defaults.order_statuses import create_default_order_statuses
from shuup.core.models import Basket, Contact, Order, OrderLine, OrderLineTax, OrderLineType, OrderStatus, Product
from shuup.core.order_creator import OrderCreator, OrderSource
from shuup.core.shortcuts import update_order_line_from_product
from shuup.core.taxing.utils import stacked_value_added_taxes

from .shared import _get_pricing_context, create_random_address, get_address, get_default_shop


[docs] def get_initial_order_status(): create_default_order_statuses() return OrderStatus.objects.get_default_initial()
[docs] def get_completed_order_status(): create_default_order_statuses() return OrderStatus.objects.get_default_complete()
[docs] def create_random_order( customer=None, products=(), completion_probability=0, shop=None, random_products=True, create_payment_for_order_total=False, order_date=None, ) -> Order: if not customer: customer = Contact.objects.all().order_by("?").first() if not customer: raise ValueError("Error! No valid contacts.") if shop is None: shop = get_default_shop() pricing_context = _get_pricing_context(shop, customer) source = OrderSource(shop) source.customer = customer source.customer_comment = "Mock Order" default_billing_address = getattr(customer, "default_billing_address", None) default_shipping_address = getattr(customer, "default_shipping_address", None) if default_billing_address: source.billing_address = customer.default_billing_address if default_shipping_address: source.shipping_address = customer.default_shipping_address if not source.billing_address or not source.shipping_address: source.billing_address = create_random_address() source.shipping_address = create_random_address() source.order_date = order_date or (now() - datetime.timedelta(days=random.uniform(0, 400))) source.status = get_initial_order_status() if not products: try: products = list(Product.objects.listed(source.shop, customer).order_by("?")[:40]) except Exception: products = list(Product.objects.all().order_by("?")[:40]) if random_products: num_lines = random.randint(3, 10) else: num_lines = len(products) for i in range(num_lines): if random_products: product = random.choice(products) else: product = products[i] line_quantity = random.randint(1, 5) price_info = product.get_price_info(pricing_context, quantity=line_quantity) shop_product = product.get_shop_instance(source.shop) supplier = shop_product.get_supplier(source.customer, line_quantity, source.shipping_address) line = source.add_line( type=OrderLineType.PRODUCT, product=product, supplier=supplier, quantity=line_quantity, base_unit_price=price_info.base_unit_price, discount_amount=price_info.discount_amount, sku=product.sku, text=product.safe_translation_getter("name", any_language=True), ) assert line.price == price_info.price with atomic(): oc = OrderCreator() order = oc.create_order(source) if random.random() < completion_probability: lines = getattr(order, "lines", None) if not lines: raise ValueError("Order has no lines to complete.") suppliers = {line.supplier for line in lines.filter(supplier__isnull=False, quantity__gt=0)} for supplier in suppliers: order.create_shipment_of_all_products(supplier=supplier) if create_payment_for_order_total: order.create_payment(order.taxful_total_price) order.save() next_status = get_completed_order_status() user = getattr(customer, "user", None) if next_status is not None and user and hasattr(customer, "user"): order.change_status(next_status=next_status, user=user) return order
[docs] def create_empty_order(prices_include_tax=False, shop=None): from .service_factory import get_default_payment_method, get_default_shipping_method from .shared import get_shop order = Order( shop=(shop or get_shop(prices_include_tax=prices_include_tax)), payment_method=get_default_payment_method(), shipping_method=get_default_shipping_method(), billing_address=get_address(name="Mony Doge").to_immutable(), shipping_address=get_address(name="Shippy Doge").to_immutable(), order_date=now(), status=get_initial_order_status(), ) return order
[docs] def add_product_to_order( order, supplier, product, quantity, taxless_base_unit_price, tax_rate=0, pricing_context=None, ): from .tax_factory import get_test_tax if not pricing_context: pricing_context = _get_pricing_context(order.shop, order.customer) product_order_line = OrderLine(order=order) update_order_line_from_product( pricing_context, order_line=product_order_line, product=product, quantity=quantity, supplier=supplier, ) base_unit_price = order.shop.create_price(taxless_base_unit_price) if order.prices_include_tax: base_unit_price *= 1 + tax_rate try: product_order_line.base_unit_price = order.shop.create_price(base_unit_price) except Exception: pass product_order_line.save() taxes = [get_test_tax(tax_rate)] price = quantity * base_unit_price taxed_price = stacked_value_added_taxes(price, taxes) order_line_tax = OrderLineTax.from_tax( taxes[0], taxed_price.taxless.amount, order_line=product_order_line, ) order_line_tax.save() if hasattr(product_order_line, "taxes"): try: product_order_line.taxes.add(order_line_tax) except Exception: pass
[docs] def create_order_with_product( product, supplier, quantity, taxless_base_unit_price, tax_rate=0, n_lines=1, shop=None, ): order = create_empty_order(shop=shop) order.full_clean() order.save() pricing_context = _get_pricing_context(order.shop, order.customer) for _x in range(n_lines): add_product_to_order( order, supplier, product, quantity, taxless_base_unit_price, tax_rate, pricing_context, ) assert order.get_product_ids_and_quantities()[product.pk] == (quantity * n_lines), "Things got added" order.cache_prices() order.save() return order
[docs] def get_basket(shop=None): shop = shop or get_default_shop() return Basket.objects.create( key=uuid.uuid1().hex, shop=shop, prices_include_tax=shop.prices_include_tax, currency=shop.currency, )