Source code for shuup.importer.admin_module.import_views

import hashlib
import logging
import os
from datetime import datetime

from django.apps import apps
from django.contrib import messages
from django.db.models import Q
from django.http.response import Http404, HttpResponse, HttpResponseBadRequest
from django.shortcuts import redirect
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView, FormView, TemplateView, View

from shuup.admin.shop_provider import get_shop
from shuup.admin.supplier_provider import get_supplier
from shuup.admin.toolbar import NewActionButton
from shuup.admin.utils.permissions import has_permission
from shuup.admin.utils.picotable import ChoicesFilter, Column, DateRangeFilter, Picotable
from shuup.admin.utils.views import PicotableListView
from shuup.apps.provides import get_provide_objects
from shuup.core.models import BackgroundTaskExecution, BackgroundTaskExecutionStatus
from shuup.core.tasks import LOGGER, run_task
from shuup.importer.admin_module.forms import ImportForm, ImportSettingsForm
from shuup.importer.exceptions import ImporterError
from shuup.importer.utils import get_import_file_path, get_importer, get_importer_choices
from shuup.importer.utils.importer import FileImporter, ImportMode
from shuup.utils.django_compat import reverse

logger = logging.getLogger(__name__)


# Importer identifier -> importer name, for every object provided under the
# "importers" category. Evaluated once, when this module is imported.
IMPORTER_NAMES_MAP = {
    importer_cls.identifier: importer_cls.name
    for importer_cls in get_provide_objects("importers")
}


class ImporterPicotable(Picotable):
    """Picotable variant that supplies its own plural verbose name."""

    def get_verbose_name_plural(self):
        """Return the lazily translated plural name, "Data imports"."""
        plural_name = _("Data imports")
        return plural_name
class ImportProcessView(TemplateView):
    """
    Preview an uploaded import file (GET) and queue the actual import (POST).

    The request is addressed through query parameters: ``importer`` (importer
    identifier), ``n`` (name of the stored upload) and the optional ``lang``.
    """

    template_name = "shuup/importer/admin/import_process.jinja"

    def dispatch(self, request, *args, **kwargs):
        """Answer HTTP 400 when the ``importer`` or ``n`` query parameter is missing or empty."""
        if not request.GET.get("importer"):
            return HttpResponseBadRequest(_("Invalid importer."))
        if not request.GET.get("n"):
            return HttpResponseBadRequest(_("File is missing."))
        return super().dispatch(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """
        Queue the import on the ``data_import`` queue and redirect to the import list.

        Every POST field named ``remap[<name>]`` contributes ``<name>`` and
        its list of posted values to the ``mapping`` handed to the task.
        """
        prefix = "remap["
        mapping = {}
        for field in request.POST:
            # Strip exactly the leading "remap[" and the trailing "]"; fields
            # without the closing bracket are not mapping fields.
            if field.startswith(prefix) and field.endswith("]"):
                mapping[field[len(prefix):-1]] = request.POST.getlist(field)

        supplier = get_supplier(request)
        shop = get_shop(request)

        run_task(
            "shuup.importer.tasks.import_file",
            stored=True,
            queue="data_import",
            importer=request.GET["importer"],
            import_mode=request.POST.get("import_mode") or ImportMode.CREATE_UPDATE.value,
            file_name=request.GET["n"],
            language=request.GET.get("lang"),
            shop_id=shop.pk,
            supplier_id=supplier.pk if supplier else None,
            user_id=request.user.pk,
            mapping=mapping,
        )
        messages.success(request, _("The import was queued!"))
        return redirect(reverse("shuup_admin:importer.import"))

    def get_context_data(self, **kwargs):
        """
        Prepare the uploaded file with ``FileImporter`` and expose preview data.

        Adds ``data``, ``importer``, ``form``, ``model_fields`` and
        ``visible_rows`` to the template context. Errors raised while
        preparing the file propagate to the caller.
        """
        context = super().get_context_data(**kwargs)

        shop = get_shop(self.request)
        supplier = get_supplier(self.request)
        file_importer = FileImporter(
            importer=self.request.GET["importer"],
            import_mode=ImportMode.CREATE_UPDATE,
            file_name=self.request.GET["n"],
            language=self.request.GET.get("lang"),
            # Pass primary keys, as ``post`` does for the queued task; the
            # parameters are named ``*_id``.
            shop_id=shop.pk if shop else None,
            supplier_id=supplier.pk if supplier else None,
        )
        file_importer.prepare()

        settings_form = ImportSettingsForm(data=self.request.POST or None)
        if settings_form.is_bound:
            # Run validation so a bound form carries its errors into the template.
            settings_form.is_valid()

        context["data"] = file_importer.data
        context["importer"] = file_importer.importer
        context["form"] = settings_form
        context["model_fields"] = file_importer.importer.get_fields_for_mapping()
        # Rows at index 1-4; index 0 is skipped (presumably the header row -- confirm).
        context["visible_rows"] = file_importer.data.rows[1:5]
        return context

    def get(self, request, *args, **kwargs):
        """
        Render the preview page.

        On ``ImporterError`` the failure is logged, an error message is
        flashed and the user is redirected to the import list.
        """
        try:
            return self.render_to_response(self.get_context_data(**kwargs))
        except ImporterError:
            # Log through this module's own logger rather than the task module's.
            logger.exception("Failed to process the file")
            messages.error(request, _("Failed to process the file."))
            return redirect(reverse("shuup_admin:importer.import"))
class ImportView(FormView):
    """Upload form for a data import file; a successful POST redirects to the posted ``next`` URL."""

    template_name = "shuup/importer/admin/import.jinja"
    form_class = ImportForm

    def post(self, request, *args, **kwargs):
        """
        Store the uploaded file and redirect to the posted ``next`` URL.

        The file is written under a generated name (SHA-256 of the current
        timestamp plus the original extension). The redirect carries ``n``
        (the stored name), ``importer`` and ``lang`` as URL-encoded query
        parameters; parameters that were not posted are left out.

        Returns HTTP 400 when the ``file`` upload or the ``next`` field is
        missing.
        """
        from urllib.parse import urlencode

        upload = request.FILES.get("file")
        if upload is None:
            return HttpResponseBadRequest(_("File is missing."))

        # NOTE(review): ``next`` is client-supplied and is not validated here,
        # so it can point to another site (open redirect). Consider checking
        # it with Django's ``url_has_allowed_host_and_scheme``.
        next_url = request.POST.get("next")
        if not next_url:
            return HttpResponseBadRequest(_("Invalid parameters."))

        extension = os.path.splitext(upload.name)[1]
        digest = hashlib.sha256(f"{datetime.now()}".encode()).hexdigest()
        import_name = f"{digest}{extension}"

        full_path = get_import_file_path(import_name)
        # exist_ok avoids the check-then-create race between concurrent uploads.
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "wb") as destination:
            for chunk in upload.chunks():
                destination.write(chunk)

        params = {
            "n": import_name,
            "importer": request.POST.get("importer"),
            "lang": request.POST.get("language"),
        }
        # Drop missing values instead of sending the literal string "None".
        query = urlencode({key: value for key, value in params.items() if value})
        separator = "&" if "?" in next_url else "?"
        return redirect(f"{next_url}{separator}{query}")

    def get_form_kwargs(self):
        """
        Extend the form kwargs with the request and the initial importer.

        The ``importer`` query parameter wins; otherwise any ``importer``
        already present in the initial data is kept.
        """
        kwargs = super().get_form_kwargs()
        initial = kwargs.get("initial", {})
        initial["importer"] = self.request.GET.get("importer", initial.get("importer"))
        kwargs.update({"request": self.request, "initial": initial})
        return kwargs

    def get_context_data(self, **kwargs):
        """
        Add the selected importer class and its help context to the template context.

        The importer comes from the ``importer`` query parameter, falling
        back to the first choice available to the user. An identifier that
        does not resolve to an importer leaves the context unchanged.
        """
        context = super().get_context_data(**kwargs)

        importer = self.request.GET.get("importer")
        if not importer:
            # No importer passed: use the first choice available, if any.
            importers = list(get_importer_choices(self.request.user))
            if importers:
                importer = importers[0][0]

        if importer:
            importer_cls = get_importer(importer)
            # The lookup result is checked for falsiness elsewhere in this
            # module, so guard before dereferencing it.
            if importer_cls:
                context.update(importer_cls.get_help_context_data(self.request))
                context["importer"] = importer_cls

        return context
class ExampleFileDownloadView(View):
    """Serve one of an importer's example files as an attachment."""

    def get(self, request, *args, **kwargs):
        """
        Return the example file named by the ``importer`` and ``file_name`` query parameters.

        Responds with HTTP 400 when either parameter is missing and raises
        ``Http404`` when the importer, the example file or its content
        cannot be resolved.
        """
        query = request.GET
        importer = query.get("importer")
        file_name = query.get("file_name")
        if not (importer and file_name):
            return HttpResponseBadRequest(_("Invalid parameters."))

        importer_cls = get_importer(importer)
        if not (importer_cls and importer_cls.has_example_file()):
            raise Http404(_("Invalid importer."))

        example_file = importer_cls.get_example_file(file_name)
        if not example_file:
            raise Http404(_("Invalid file name."))

        response = HttpResponse(content_type=example_file.content_type)
        disposition = f"attachment; filename={example_file.file_name}"
        response["Content-Disposition"] = disposition

        content = importer_cls.get_example_file_content(example_file, request)
        if not content:
            raise Http404(_("File was not found."))

        content.seek(0)
        payload = content.getvalue()
        response.write(payload)
        return response
def get_imports_queryset(request):
    """
    Return the background task executions of the ``data_import`` queue.

    Users holding the ``importer.show-all-imports`` permission get every
    execution. Everyone else only gets executions whose task matches the
    current supplier and either has no shop or belongs to the current shop.
    """
    executions = BackgroundTaskExecution.objects.select_related("task", "task__user")
    executions = executions.filter(task__queue="data_import")

    if has_permission(request.user, "importer.show-all-imports"):
        return executions

    shop = get_shop(request)
    supplier = get_supplier(request)
    shop_matches = Q(task__shop=shop) | Q(task__shop__isnull=True)
    supplier_matches = Q(task__supplier=supplier)
    return executions.filter(shop_matches, supplier_matches)
class ImportListView(PicotableListView):
    """List the data import task executions visible to the current user."""

    picotable_class = ImporterPicotable
    model = BackgroundTaskExecution
    default_columns = [
        Column(
            "started_on",
            _("Import date"),
            sortable=True,
            filter_config=DateRangeFilter(),
        ),
        Column("importer", _("Importer"), sortable=False, display="get_importer"),
        Column("import_mode", _("Import mode"), sortable=False, display="get_import_mode"),
        Column("user", _("User"), sort_field="task__user", display="get_user"),
        Column(
            "status",
            _("Status"),
            sort_field="status",
            filter_config=ChoicesFilter(BackgroundTaskExecutionStatus.choices()),
        ),
    ]
    toolbar_buttons_provider_key = "import_list_toolbar_provider"
    mass_actions_provider_key = "import_list_mass_actions_provider"

    def __init__(self, *args, **kwargs):
        """Initialise the view and use ``default_columns`` as its columns."""
        super().__init__(*args, **kwargs)
        self.columns = self.default_columns

    def get_importer(self, instance):
        """
        Return the display name of the importer recorded in the task arguments.

        Falls back to the raw identifier when it is not in
        ``IMPORTER_NAMES_MAP`` and to ``"-"`` when the task records none.
        """
        # A task without the expected arguments must not break the whole list.
        importer = (instance.task.arguments or {}).get("importer")
        if importer is None:
            return "-"
        return IMPORTER_NAMES_MAP.get(importer, importer)

    def get_user(self, instance):
        """Return the task's user as a string, or ``"-"`` when there is none."""
        return str(instance.task.user or "-")

    def get_import_mode(self, instance):
        """
        Return the label of the import mode recorded in the task arguments.

        Falls back to the raw value when it is not a valid ``ImportMode``
        and to ``"-"`` when the task records none.
        """
        raw_mode = (instance.task.arguments or {}).get("import_mode")
        try:
            return ImportMode(raw_mode).label
        except ValueError:
            return raw_mode if raw_mode is not None else "-"

    def get_context_data(self, **kwargs):
        """Add the page title to the template context."""
        context = super().get_context_data(**kwargs)
        context["title"] = _("Data imports")
        return context

    def get_toolbar(self):
        """Append an "Import file" button linking to the new-import URL."""
        toolbar = super().get_toolbar()
        toolbar.append(
            NewActionButton(
                url=reverse("shuup_admin:importer.import.new"),
                text=_("Import file"),
                icon="fa fa-upload",
            )
        )
        return toolbar

    def get_queryset(self):
        """Return the visible imports with ``result`` and ``error_log`` deferred."""
        return get_imports_queryset(self.request).defer("result", "error_log")

    def get_object_url(self, instance):
        """Return the detail URL of the given execution."""
        return reverse("shuup_admin:importer.import.detail", kwargs={"pk": instance.pk})

    def get_object_abstract(self, instance, item):
        """Return the abstract entries for one row, built from its column values."""
        return [
            {"text": item.get("importer"), "class": "header"},
            # Same label as the "started_on" column title (was the typo "Importdate").
            {"title": _("Import date"), "text": item.get("started_on")},
            {"title": _("Mode"), "text": item.get("import_mode")},
            {"title": _("User"), "text": item.get("user")},
            {"title": _("Status"), "text": item.get("status")},
        ]
class ImportDetailView(DetailView):
    """Show the outcome of a single data import task execution."""

    model = BackgroundTaskExecution
    template_name = "shuup/importer/admin/import_process_complete.jinja"

    def get_queryset(self):
        """Limit the lookup to the imports visible to the current user."""
        return get_imports_queryset(self.request)

    @staticmethod
    def _get_imported_objects(references):
        """
        Resolve ``{"model": ..., "pk": ...}`` references into model instances ordered by pk.

        Returns an empty list when there are no references. The model is
        taken from the first reference only, so all references are assumed
        to point to that same model.
        """
        if not references:
            return []
        model = apps.get_model(references[0]["model"])
        pks = [reference["pk"] for reference in references]
        return model.objects.filter(pk__in=pks).order_by("pk")

    def get_context_data(self, **kwargs):
        """
        Add the import result to the template context.

        Provides ``new_objects``, ``updated_objects``, ``log_messages`` and
        ``other_log_messages``. Each one is an empty list when the execution
        has no result or the result lacks that entry.
        """
        context = super().get_context_data(**kwargs)
        result = self.object.result or {}

        # ``or []`` keeps a missing or null entry from replacing the list with None.
        context["log_messages"] = result.get("log_messages") or []
        context["other_log_messages"] = result.get("other_log_messages") or []
        context["new_objects"] = self._get_imported_objects(result.get("new_objects"))
        context["updated_objects"] = self._get_imported_objects(result.get("updated_objects"))
        return context