import csv
from flask_babel import lazy_gettext as _
from xmm.models.fields import BooleanField, FileField, IntegerField, StringField
from xmm.pipeline import PipelineError
from .base import Loader
class CSVLoader(Loader):
    """Load CSV data.

    The CSV dialect (and, when not specified, the presence of a header row)
    is detected from a sample of the input with :class:`csv.Sniffer`.
    Individual dialect attributes can be overridden through keyword
    arguments given to the constructor.
    """

    file_extension = 'csv'
    mime_type = 'application/csv'

    # Descriptors of the configurable parameters (key, label, datatype and
    # optional default/choices).
    # NOTE(review): presumably consumed by the pipeline configuration UI --
    # confirm against the Loader base class.
    params = [
        {
            'key': 'filename',
            'label': _('Dateiname'),
            'datatype': FileField.type_name,
        },
        {
            'key': 'header',
            'label': _('Header'),
            'datatype': BooleanField.type_name,
        },
        {
            'key': 'quoting',
            'label': _('Quoting'),
            'datatype': IntegerField.type_name,
            'default': csv.QUOTE_NONNUMERIC,
            'choices': [
                (csv.QUOTE_MINIMAL, _('Anführungszeichen minimal')),
                (csv.QUOTE_ALL, _('Anführungszeichen alle')),
                (csv.QUOTE_NONNUMERIC, _('Anführungszeichen nicht-numerisch')),
                (csv.QUOTE_NONE, _('Anführungszeichen nie')),
            ]
        },
        {
            'key': 'delimiter',
            'label': _('Trennzeichen'),
            'default': ';',
            'datatype': StringField.type_name,
        },
        {
            'key': 'quotechar',
            'label': _('Quotation-Zeichen'),
            'default': '"',
            'datatype': StringField.type_name,
        },
    ]

    def __init__(self, header=None, **kwargs):
        """
        Create a new CSV loader instance.

        :param bool header: Does the data source have a header row?
            ``None`` (the default) lets :meth:`sniff_csv` detect it.
        :keyword bool header_as_keys: Use the first row as dict keys for each
            object. Implies that a header row is present.
        :keyword input_data: Optional seekable text stream to read from; it
            can also be passed later to :meth:`get_reader`, :meth:`get_count`
            or :meth:`load`.
        :keyword int quoting: A quoting method, one of QUOTE_MINIMAL (0),
            QUOTE_ALL (1), QUOTE_NONNUMERIC (2) or QUOTE_NONE (3).
        :keyword str delimiter: Overwrite the detected dialect's `delimiter`.
        :keyword str quotechar: Overwrite the detected dialect's `quotechar`.

        All remaining keyword arguments are stored in ``dialect_args``; the
        ones that name an attribute of the detected dialect are applied to it
        by :meth:`sniff_csv`, everything else is ignored there. No defaults
        are applied here -- the defaults listed in ``params`` are only
        declared in that list.
        """
        # NOTE(review): Loader.__init__ is not called here -- confirm that
        # this is intentional.
        self._reader = None
        self._columns = None
        self.header_as_keys = kwargs.pop('header_as_keys', False)
        # Stays None when neither flag decides it, which enables sniffing.
        self.has_header = self.header_as_keys or header
        self.input_data = kwargs.pop('input_data', None)
        self.dialect = None
        self.dialect_args = kwargs

    def get_reader(self, input_data=None):
        """
        Rewind the input, detect its dialect and return a ``csv.reader``.

        :param input_data: Seekable text stream; when given it replaces the
            stream stored on the instance.
        :return: A ``csv.reader`` positioned at the start of the input.
        :raises AttributeError: If no input stream is available at all.
        """
        # Identity check: a file-like object may define a falsy __bool__ or
        # __len__ and must still be accepted.
        if input_data is not None:
            self.input_data = input_data
        self.input_data.seek(0)
        self.sniff_csv()
        return csv.reader(self.input_data, self.dialect)

    def sniff_csv(self):
        """
        Detect the CSV dialect (and header presence) from a 4096 character sample.

        Sets ``self.dialect`` and, if it was undecided, ``self.has_header``.
        Falls back to the ``csv.excel`` settings when sniffing fails.
        Afterwards the overrides from ``self.dialect_args`` are applied.

        :raises PipelineError: If the sample cannot be decoded.
        """
        try:
            sample = self.input_data.read(4096)
        except UnicodeDecodeError as e:
            raise PipelineError(
                userfriendly_message=_('Datei-Codierung stimmt nicht.'),
                original_exception=e,
            ) from e
        self.input_data.seek(0)

        sniffer = csv.Sniffer()
        try:
            if self.has_header is None:
                self.has_header = sniffer.has_header(sample)
            self.dialect = sniffer.sniff(sample)
        except csv.Error:
            # Use a private copy of csv.excel: the overrides below are set
            # on the dialect class itself and must never leak into the
            # process-wide stdlib class. The public attributes are copied
            # into the subclass namespace so that analyze() still finds them.
            self.dialect = type('excel', (csv.excel,), {
                name: value
                for name, value in vars(csv.excel).items()
                if not name.startswith('_')
            })

        for param, value in (self.dialect_args or {}).items():
            # Skip unset values only; 0 (csv.QUOTE_MINIMAL) and False are
            # legitimate dialect settings and must be applied.
            if value is None or value == '':
                continue
            if hasattr(self.dialect, param):
                setattr(self.dialect, param, value)

    def analyze(self, file_path):
        """
        Extend the base class analysis with the detected dialect settings.

        :param file_path: Passed through to ``Loader.analyze``.
        :return: The analysis of the base class with an additional
            ``'dialect'`` entry holding the public dialect attributes (empty
            if no dialect has been detected yet).
        """
        analysis = super().analyze(file_path)
        dialect_attrs = vars(self.dialect) if self.dialect is not None else {}
        analysis['dialect'] = {
            k: v for k, v in dialect_attrs.items()
            if not k.startswith('_')
        }
        return analysis

    def get_count(self, input_data=None):
        """
        Count the data rows of the input, excluding the header row if any.

        :param input_data: Optional stream, see :meth:`get_reader`.
        :return: Number of data rows; ``0`` for an empty input.
        """
        reader = self.get_reader(input_data)
        # assume first row is a header row; tolerate an empty input
        if self.has_header:
            next(reader, None)
        return sum(1 for _row in reader)

    def get_columns(self):
        """
        Return the cells of the header row.

        :return: The first row of the stored input, or ``[]`` when the data
            has no header row or the input is empty.
        """
        # When header presence is still undecided, let the sniffer decide
        # first (as get_count() and load() do) instead of treating
        # "unknown" as "no header".
        undecided = self.has_header is None and self.input_data is not None
        if not (self.has_header or undecided):
            return []
        reader = self.get_reader()
        if not self.has_header:
            return []
        # assume first row is a header row
        return next(reader, [])

    def load(self, input_data):
        """
        Load CSV data.

        :param input_data: Seekable text stream to read from.
        :return: Generator yielding one iterator of ``(key, value)`` pairs
            per data row. Keys are the header cells if ``header_as_keys`` is
            set, otherwise the column indices.
        """
        reader = self.get_reader(input_data)
        # A default avoids StopIteration on an empty input, which would
        # surface as RuntimeError inside a generator (PEP 479).
        header = next(reader, []) if self.has_header else []
        for row in reader:
            if self.header_as_keys:
                yield zip(header, row)
            else:
                yield enumerate(row)