import hashlib
import logging
import os
from datetime import datetime
from django.apps import apps
from django.contrib import messages
from django.db.models import Q
from django.http.response import Http404, HttpResponse, HttpResponseBadRequest
from django.shortcuts import redirect
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView, FormView, TemplateView, View
from shuup.admin.shop_provider import get_shop
from shuup.admin.supplier_provider import get_supplier
from shuup.admin.toolbar import NewActionButton
from shuup.admin.utils.permissions import has_permission
from shuup.admin.utils.picotable import ChoicesFilter, Column, DateRangeFilter, Picotable
from shuup.admin.utils.views import PicotableListView
from shuup.apps.provides import get_provide_objects
from shuup.core.models import BackgroundTaskExecution, BackgroundTaskExecutionStatus
from shuup.core.tasks import LOGGER, run_task
from shuup.importer.admin_module.forms import ImportForm, ImportSettingsForm
from shuup.importer.exceptions import ImporterError
from shuup.importer.utils import get_import_file_path, get_importer, get_importer_choices
from shuup.importer.utils.importer import FileImporter, ImportMode
from shuup.utils.django_compat import reverse
logger = logging.getLogger(__name__)
IMPORTER_NAMES_MAP = {importer.identifier: importer.name for importer in get_provide_objects("importers")}
[docs]
class ImporterPicotable(Picotable):
[docs]
def get_verbose_name_plural(self):
return _("Data imports")
[docs]
class ImportProcessView(TemplateView):
template_name = "shuup/importer/admin/import_process.jinja"
[docs]
def dispatch(self, request, *args, **kwargs):
if not request.GET.get("importer"):
return HttpResponseBadRequest(_("Invalid importer."))
if not request.GET.get("n"):
return HttpResponseBadRequest(_("File is missing."))
return super().dispatch(request, *args, **kwargs)
[docs]
def post(self, request, *args, **kwargs):
mapping = {}
for field in request.POST.keys():
if field.startswith("remap["):
# remove the remap[] part
field_name = field.replace("remap[", "")[:-1]
mapping[field_name] = self.request.POST.getlist(field)
supplier = get_supplier(request)
shop = get_shop(request)
run_task(
"shuup.importer.tasks.import_file",
stored=True,
queue="data_import",
importer=request.GET["importer"],
import_mode=request.POST.get("import_mode") or ImportMode.CREATE_UPDATE.value,
file_name=request.GET["n"],
language=request.GET.get("lang"),
shop_id=shop.pk,
supplier_id=supplier.pk if supplier else None,
user_id=request.user.pk,
mapping=mapping,
)
messages.success(request, _("The import was queued!"))
return redirect(reverse("shuup_admin:importer.import"))
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
file_importer = FileImporter(
importer=self.request.GET["importer"],
import_mode=ImportMode.CREATE_UPDATE,
file_name=self.request.GET["n"],
language=self.request.GET.get("lang"),
shop_id=get_shop(self.request),
supplier_id=get_supplier(self.request),
)
file_importer.prepare()
settings_form = ImportSettingsForm(data=self.request.POST if self.request.POST else None)
if settings_form.is_bound:
settings_form.is_valid()
context["data"] = file_importer.data
context["importer"] = file_importer.importer
context["form"] = settings_form
context["model_fields"] = file_importer.importer.get_fields_for_mapping()
context["visible_rows"] = file_importer.data.rows[1:5]
return context
[docs]
def get(self, request, *args, **kwargs):
try:
return self.render_to_response(self.get_context_data(**kwargs))
except ImporterError:
LOGGER.exception("Failed to process the file")
messages.error(request, _("Failed to process the file."))
return redirect(reverse("shuup_admin:importer.import"))
[docs]
class ImportView(FormView):
template_name = "shuup/importer/admin/import.jinja"
form_class = ImportForm
[docs]
def post(self, request, *args, **kwargs):
file = self.request.FILES["file"]
basename, ext = os.path.splitext(file.name)
import_name = "{}{}".format(
hashlib.sha256((f"{datetime.now()}").encode()).hexdigest(),
ext,
)
full_path = get_import_file_path(import_name)
if not os.path.isdir(os.path.dirname(full_path)):
os.makedirs(os.path.dirname(full_path))
with open(full_path, "wb+") as destination:
for chunk in file.chunks():
destination.write(chunk)
next_url = request.POST.get("next")
importer = request.POST.get("importer")
lang = request.POST.get("language")
return redirect(f"{next_url}?n={import_name}&importer={importer}&lang={lang}")
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# check whether the importer has a example file template
# if so, we also add a url to download the example file
importer = self.request.GET.get("importer")
# no importer passed, get the first choice available
if not importer:
importers = list(get_importer_choices(self.request.user))
if importers:
importer = importers[0][0]
if importer:
importer_cls = get_importer(importer)
context.update(importer_cls.get_help_context_data(self.request))
context["importer"] = importer_cls
return context
[docs]
class ExampleFileDownloadView(View):
[docs]
def get(self, request, *args, **kwargs):
importer = request.GET.get("importer")
file_name = request.GET.get("file_name")
if not importer or not file_name:
return HttpResponseBadRequest(_("Invalid parameters."))
importer_cls = get_importer(importer)
if not importer_cls or not importer_cls.has_example_file():
raise Http404(_("Invalid importer."))
example_file = importer_cls.get_example_file(file_name)
if not example_file:
raise Http404(_("Invalid file name."))
response = HttpResponse(content_type=example_file.content_type)
response["Content-Disposition"] = f"attachment; filename={example_file.file_name}"
data = importer_cls.get_example_file_content(example_file, request)
if not data:
raise Http404(_("File was not found."))
data.seek(0)
response.write(data.getvalue())
return response
[docs]
def get_imports_queryset(request):
# get only executions from tasks inside `data_import` queue
queryset = BackgroundTaskExecution.objects.select_related("task", "task__user").filter(task__queue="data_import")
if not has_permission(request.user, "importer.show-all-imports"):
shop = get_shop(request)
supplier = get_supplier(request)
queryset = queryset.filter(
Q(Q(task__shop=shop) | Q(task__shop__isnull=True)),
Q(task__supplier=supplier),
)
return queryset
[docs]
class ImportListView(PicotableListView):
picotable_class = ImporterPicotable
model = BackgroundTaskExecution
default_columns = [
Column(
"started_on",
_("Import date"),
sortable=True,
filter_config=DateRangeFilter(),
),
Column("importer", _("Importer"), sortable=False, display="get_importer"),
Column("import_mode", _("Import mode"), sortable=False, display="get_import_mode"),
Column("user", _("User"), sort_field="task__user", display="get_user"),
Column(
"status",
_("Status"),
sort_field="status",
filter_config=ChoicesFilter(BackgroundTaskExecutionStatus.choices()),
),
]
toolbar_buttons_provider_key = "import_list_toolbar_provider"
mass_actions_provider_key = "import_list_mass_actions_provider"
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.columns = self.default_columns
[docs]
def get_importer(self, instance):
importer = instance.task.arguments["importer"]
return IMPORTER_NAMES_MAP.get(importer, importer)
[docs]
def get_user(self, instance):
return str(instance.task.user or "-")
[docs]
def get_import_mode(self, instance):
return ImportMode(instance.task.arguments["import_mode"]).label
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["title"] = _("Data imports")
return context
[docs]
def get_queryset(self):
return get_imports_queryset(self.request).defer("result", "error_log")
[docs]
def get_object_url(self, instance):
return reverse("shuup_admin:importer.import.detail", kwargs={"pk": instance.pk})
[docs]
def get_object_abstract(self, instance, item):
return [
{"text": item.get("importer"), "class": "header"},
{"title": _("Importdate"), "text": item.get("started_on")},
{"title": _("Mode"), "text": item.get("import_mode")},
{"title": _("User"), "text": item.get("user")},
{"title": _("Status"), "text": item.get("status")},
]
[docs]
class ImportDetailView(DetailView):
model = BackgroundTaskExecution
template_name = "shuup/importer/admin/import_process_complete.jinja"
[docs]
def get_queryset(self):
return get_imports_queryset(self.request)
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
result = self.object.result
context["new_objects"] = []
context["updated_objects"] = []
context["log_messages"] = []
context["other_log_messages"] = []
if result:
context["log_messages"] = result.get("log_messages")
context["other_log_messages"] = result.get("other_log_messages")
new_objects = result.get("new_objects")
updated_objects = result.get("updated_objects")
if new_objects:
model = apps.get_model(new_objects[0]["model"])
pks = [obj["pk"] for obj in new_objects]
context["new_objects"] = model.objects.filter(pk__in=pks).order_by("pk")
if updated_objects:
model = apps.get_model(updated_objects[0]["model"])
pks = [obj["pk"] for obj in updated_objects]
context["updated_objects"] = model.objects.filter(pk__in=pks).order_by("pk")
return context