Source code for xmm.pipeline.loaders.csv

import csv

from flask_babel import lazy_gettext as _

from xmm.models.fields import BooleanField, FileField, IntegerField, StringField
from xmm.pipeline import PipelineError

from .base import Loader


class CSVLoader(Loader):
    """Load CSV data.

    The CSV dialect (and, unless stated explicitly, the presence of a header
    row) is sniffed from the first 4096 characters of the input. Keyword
    arguments given to the constructor are then applied on top of the
    sniffed dialect as attribute overrides.
    """

    file_extension = 'csv'
    mime_type = 'application/csv'

    # Declarative description of the user-configurable parameters.
    # NOTE(review): the 'default' entries are only declared here; nothing in
    # this class applies them, presumably the caller/UI does -- confirm.
    params = [
        {
            'key': 'filename',
            'label': _('Dateiname'),
            'datatype': FileField.type_name,
        },
        {
            'key': 'header',
            'label': _('Header'),
            'datatype': BooleanField.type_name,
        },
        {
            'key': 'quoting',
            'label': _('Quoting'),
            'datatype': IntegerField.type_name,
            'default': csv.QUOTE_NONNUMERIC,
            'choices': [
                (csv.QUOTE_MINIMAL, _('Anführungszeichen minimal')),
                (csv.QUOTE_ALL, _('Anführungszeichen alle')),
                (csv.QUOTE_NONNUMERIC, _('Anführungszeichen nicht-numerisch')),
                (csv.QUOTE_NONE, _('Anführungszeichen nie')),
            ]
        },
        {
            'key': 'delimiter',
            'label': _('Trennzeichen'),
            'default': ';',
            'datatype': StringField.type_name,
        },
        {
            'key': 'quotechar',
            'label': _('Quotation-Zeichen'),
            'default': '"',
            'datatype': StringField.type_name,
        },
    ]

    def __init__(self, header=None, **kwargs):
        """
        Create a new CSV loader instance.

        :param bool header: Does the data source have a header row? ``None``
            (the default) means "detect it while sniffing the input".
        :keyword bool header_as_keys: Use the first row as keys for each
            loaded row. Implies that a header row is present.
        :keyword input_data: Seekable file-like object to read from; may
            also be supplied later to :meth:`get_reader`, :meth:`get_count`
            or :meth:`load`.
        :keyword int quoting: A quoting method, one of QUOTE_MINIMAL (0),
            QUOTE_ALL (1), QUOTE_NONNUMERIC (2) or QUOTE_NONE (3).
        :keyword str delimiter: Overwrite the dialect's `delimiter` attribute.
        :keyword str quotechar: Overwrite the dialect's `quotechar` attribute.

        Every remaining keyword argument is kept as a dialect override; it is
        applied in :meth:`sniff_csv` only if the dialect has an attribute of
        that name.
        """
        self._reader = None
        self._columns = None
        self.header_as_keys = kwargs.pop('header_as_keys', False)
        # header_as_keys only makes sense with a header row, so it wins over
        # an explicit ``header`` value.
        self.has_header = self.header_as_keys or header
        self.input_data = kwargs.pop('input_data', None)
        self.dialect = None
        # Whatever is left in kwargs is treated as dialect attribute overrides.
        self.dialect_args = kwargs

    def get_reader(self, input_data=None):
        """Return a ``csv.reader`` positioned at the start of the input.

        :param input_data: Optional seekable file-like object; when given it
            replaces the currently stored input.
        :return: A ``csv.reader`` using the (re-)sniffed dialect.
        """
        if input_data:
            self.input_data = input_data
        self.input_data.seek(0)
        self.sniff_csv()
        return csv.reader(self.input_data, self.dialect)

    def sniff_csv(self):
        """Detect dialect and header presence, then apply dialect overrides.

        Reads up to 4096 characters from ``self.input_data`` and rewinds it.
        If sniffing fails, an Excel-style dialect is used instead.

        :raises PipelineError: If the sample cannot be decoded.
        """
        try:
            sample = self.input_data.read(4096)
        except UnicodeDecodeError as e:
            raise PipelineError(
                userfriendly_message=_('Datei-Codierung stimmt nicht.'),
                original_exception=e,
            ) from e
        self.input_data.seek(0)

        sniffer = csv.Sniffer()
        try:
            if self.has_header is None:
                self.has_header = sniffer.has_header(sample)
            self.dialect = sniffer.sniff(sample)
        except csv.Error:
            # Fall back to Excel conventions, but on a private subclass:
            # the overrides below must never be written onto the shared
            # stdlib class ``csv.excel``. The public attributes are copied
            # into the subclass so that ``analyze()`` still reports them.
            self.dialect = type(
                'dialect',
                (csv.excel,),
                {
                    name: value
                    for name, value in vars(csv.excel).items()
                    if not name.startswith('_')
                },
            )

        for param, value in self.dialect_args.items():
            # Skip unset values only. Falsy-but-meaningful values such as
            # csv.QUOTE_MINIMAL (0) or doublequote=False must be applied.
            if value is None or value == '':
                continue
            if hasattr(self.dialect, param):
                setattr(self.dialect, param, value)

    def analyze(self, file_path):
        """Extend the base analysis with the public dialect attributes.

        :param file_path: Passed through to the base class ``analyze``.
        :return: The base analysis with an additional ``'dialect'`` entry.
        """
        analysis = super().analyze(file_path)
        # The dialect is only set once sniff_csv() has run; whether the base
        # analyze() guarantees that is not visible here, so guard against
        # None instead of failing with an AttributeError.
        dialect_attrs = vars(self.dialect) if self.dialect is not None else {}
        analysis['dialect'] = {
            k: v for k, v in dialect_attrs.items()
            if not k.startswith('_')
        }
        return analysis

    def get_count(self, input_data=None):
        """Return the number of data rows, excluding a header row if present.

        :param input_data: Optional seekable file-like object, see
            :meth:`get_reader`.
        """
        reader = self.get_reader(input_data)
        if self.has_header:
            # Assume the first row is the header; tolerate empty input.
            next(reader, None)
        return sum(1 for _row in reader)

    def get_columns(self):
        """Return the header row as a list of column names.

        Returns an empty list when no header row is known to exist (including
        the case where header detection has not run yet) or when the input
        is empty.
        """
        if not self.has_header:
            return []
        # Assume the first row is the header row.
        return next(self.get_reader(), [])

    def load(self, input_data):
        """Load CSV data.

        Yields one iterator of ``(key, value)`` pairs per data row. Keys are
        the header cells when ``header_as_keys`` is set, otherwise the
        zero-based column positions.

        :param input_data: Seekable file-like object, see :meth:`get_reader`.
        """
        reader = self.get_reader(input_data)
        # A bare next() here would turn empty input into a RuntimeError,
        # because StopIteration must not escape a generator (PEP 479).
        header = next(reader, []) if self.has_header else []
        for row in reader:
            if self.header_as_keys:
                yield zip(header, row)
            else:
                yield enumerate(row)