from http import HTTPStatus
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
from django.apps import apps
from django.contrib.auth import get_user_model
from django.core.exceptions import FieldDoesNotExist
from django.db import models
from django.db.models import Q
from django.http import HttpResponse, JsonResponse
from django.utils.translation import gettext_lazy as _
from django.views.generic import TemplateView
from shuup.admin.supplier_provider import get_supplier
from shuup.admin.utils.object_selector import get_object_selector_permission_name
from shuup.admin.utils.permissions import has_permission
from shuup.apps.provides import get_provide_objects
from shuup.core.models import (
Carrier,
Category,
Contact,
Product,
ProductMode,
Shop,
ShopProduct,
ShopProductVisibility,
Supplier,
)
from shuup.utils.django_compat import force_text
def _field_exists(model, field):
try:
model._meta.get_field(field)
return True
except FieldDoesNotExist:
return False
[docs]
class MultiselectAjaxView(TemplateView):
"""
This view is deprecated and it will be removed on version 3.
"""
model = None
search_fields: List[str] = []
result_limit = 20
[docs]
def init_search_fields(self, cls):
"""
Configure the fields to use for searching.
If the `cls` object has a search_fields attribute, it will be used,
otherwise, the class will be inspected and the attribute
`name` or `translations__name` will mainly be used.
Other fields will be used for already known `cls` instances.
"""
if hasattr(cls, "search_fields"):
self.search_fields = cls.search_fields
return
self.search_fields = []
key = "%sname" % ("translations__" if hasattr(cls, "translations") else "")
self.search_fields.append(key)
if issubclass(cls, Carrier):
self.search_fields.append("base_translations__name")
self.search_fields.remove("name")
if issubclass(cls, Contact):
self.search_fields.append("email")
if issubclass(cls, Product):
self.search_fields.append("sku")
self.search_fields.append("barcode")
if issubclass(cls, ShopProduct):
self.search_fields.append("product__translations__name")
user_model = get_user_model()
if issubclass(cls, user_model):
if _field_exists(user_model, "username"):
self.search_fields.append("username")
if _field_exists(user_model, "email"):
self.search_fields.append("email")
if not _field_exists(user_model, "name"):
self.search_fields.remove("name")
[docs]
def get_data(self, request, *args, **kwargs): # noqa
model_name = request.GET.get("model")
if not model_name:
return []
cls = apps.get_model(model_name)
qs = cls.objects.all()
shop = request.shop
# if shop is informed, make sure user has access to it
if request.GET.get("shop"):
query_shop = Shop.objects.get_for_user(request.user).filter(pk=request.GET["shop"]).first()
if query_shop:
shop = query_shop
search_mode = request.GET.get("searchMode")
qs = self._filter_query(request, cls, qs, shop, search_mode)
self.init_search_fields(cls)
if not self.search_fields:
return [{"id": None, "name": _("Couldn't get selections for %s.") % model_name}]
if request.GET.get("search"):
query = Q()
keyword = request.GET.get("search", "").strip()
for field in self.search_fields:
query |= Q(**{f"{field}__icontains": keyword})
if issubclass(cls, Contact) or issubclass(cls, get_user_model()):
query &= Q(is_active=True)
qs = qs.filter(query)
if search_mode and issubclass(cls, Product):
if search_mode == "main":
qs = qs.filter(
mode__in=[
ProductMode.SIMPLE_VARIATION_PARENT,
ProductMode.VARIABLE_VARIATION_PARENT,
ProductMode.NORMAL,
]
)
elif search_mode == "parent_product":
qs = qs.filter(
mode__in=[
ProductMode.SIMPLE_VARIATION_PARENT,
ProductMode.VARIABLE_VARIATION_PARENT,
]
)
elif search_mode == "sellable_mode_only":
qs = qs.exclude(
Q(
mode__in=[
ProductMode.SIMPLE_VARIATION_PARENT,
ProductMode.VARIABLE_VARIATION_PARENT,
]
)
| Q(deleted=True)
| Q(shop_products__visibility=ShopProductVisibility.NOT_VISIBLE)
).filter(shop_products__purchasable=True)
sales_units = request.GET.get("salesUnits")
if sales_units and issubclass(cls, Product):
qs = qs.filter(sales_unit__translations__symbol__in=sales_units.strip().split(","))
qs = qs.distinct()
return sorted(
[{"id": obj.id, "name": force_text(obj)} for obj in qs[: self.result_limit]],
key=lambda x: x["name"],
)
def _filter_query(self, request, cls, qs, shop, search_mode=None):
# the supplier provider returned a valid supplier
# make sure to filter the search by the current supplier
supplier = get_supplier(request)
if search_mode == "visible" and issubclass(cls, Category):
qs = cls.objects.all_visible(self.request.customer, shop=self.request.shop)
elif search_mode == "enabled" and issubclass(cls, Supplier):
qs = cls.objects.enabled(shop=shop)
elif hasattr(cls.objects, "all_except_deleted"):
qs = cls.objects.all_except_deleted(shop=shop)
elif hasattr(cls.objects, "get_for_user"):
qs = cls.objects.get_for_user(self.request.user)
if issubclass(cls, Product):
qs = qs.filter(shop_products__shop=shop)
if supplier:
qs = qs.filter(shop_products__suppliers=supplier)
related_fields = [
models.OneToOneField,
models.ForeignKey,
models.ManyToManyField,
]
# Get all relation fields and check whether this models has
# relation to Shop mode, if so, filter by the current shop
allowed_shop_fields = ["shop", "shops"]
shop_related_fields = [
field
for field in cls._meta.get_fields()
if type(field) in related_fields and field.related_model == Shop and field.name in allowed_shop_fields
]
for shop_field in shop_related_fields:
qs = qs.filter(**{shop_field.name: shop})
if supplier:
allowed_supplier_fields = ["supplier", "suppliers"]
supplier_related_fields = [
field
for field in cls._meta.get_fields()
if (
type(field) in related_fields
and field.related_model == Supplier
and field.name in allowed_supplier_fields
)
]
for supplier_field in supplier_related_fields:
qs = qs.filter(**{supplier_field.name: supplier})
return qs
[docs]
def get(self, request, *args, **kwargs):
return JsonResponse({"results": self.get_data(request, *args, **kwargs)})
[docs]
class ObjectSelectorView(TemplateView):
"""
Base class for responding to searches from select2 components.
"""
[docs]
def get(self, request, *args, **kwargs):
parameters = request.GET.dict()
selector = parameters.pop("selector", "")
if not selector:
HttpResponse(_("Selector not found."), status=HTTPStatus.BAD_REQUEST)
search_term = parameters.pop("q", "").strip()
user = request.user
shop = request.GET.get("shop")
if shop:
query_shop = Shop.objects.get_for_user(request.user).filter(pk=request.GET["shop"]).first()
if query_shop:
shop = query_shop
else:
shop = Shop.objects.get_for_user(request.user).first()
supplier = get_supplier(request)
if not (selector and search_term):
return JsonResponse({}, status=HTTPStatus.BAD_REQUEST) # Error 400
for admin_object_selector_class in sorted(
get_provide_objects("admin_object_selector"),
key=lambda provides: provides.ordering,
):
if not issubclass(admin_object_selector_class, BaseAdminObjectSelector):
continue
if not admin_object_selector_class.handles_selector(selector):
continue
admin_object_selector = admin_object_selector_class(selector, shop, user, supplier)
if not admin_object_selector.has_permission():
return JsonResponse({}, status=HTTPStatus.NOT_ACCEPTABLE) # Error 406
data = admin_object_selector.get_objects(search_term, **parameters)
return JsonResponse({"results": data})
return JsonResponse({}, status=HTTPStatus.NOT_FOUND) # Error 404
[docs]
class BaseAdminObjectSelector:
search_limit = 20
model: Optional[Type[Any]] = None
[docs]
def __init__(self, selector, shop, user, supplier=None, *args, **kwargs):
self.selector = selector
self.shop = shop
self.user = user
self.supplier = supplier
[docs]
@classmethod
def get_selector_for_model(cls, model):
return f"{model._meta.app_label}.{model._meta.model_name}"
[docs]
@classmethod
def handles_selector(cls, selector) -> bool:
return selector == cls.get_selector_for_model(cls.model)
[docs]
@classmethod
def handle_subclass_selector(cls, selector, parent_model):
try:
app_name, model_name = selector.split(".")
Model = apps.get_model(app_label=app_name, model_name=model_name)
return isinstance(Model, type) and issubclass(Model, parent_model)
except LookupError:
return False
[docs]
def has_permission(self) -> bool:
return has_permission(self.user, get_object_selector_permission_name(self.model))
[docs]
def get_objects(self, search_term, *args, **kwargs) -> Union[Iterable[Tuple[int, str]], List[Dict[str, Any]]]:
raise NotImplementedError()