Source code for fastiot.util.csv_reader

import csv
from typing import List, Dict, Callable, Optional

from fastiot.exceptions import CSVError

# Register the 'strict' dialect under which all csv files are parsed: comma separated, double-quote as the
# quote character, no tolerance for whitespace after the delimiter, and an error raised on malformed input.
csv.register_dialect(
    'strict',
    delimiter=',',
    quotechar='"',
    quoting=csv.QUOTE_ALL,
    strict=True,
    skipinitialspace=False,
)


class CSVReader:
    """
    A class for csv file reading.

    It uses python-lib's csv package (with the ``'strict'`` dialect) and does more strict checks on how the csv
    file must be formatted. On entering the context the complete file is validated; a CSVError is raised if the
    file is incorrectly formatted. Iterating over the reader yields one ``dict`` per data row, mapping header
    field names to cell values.

    Example usage:

    .. code-block:: python

        with CSVReader('my_file.csv', required_fields=['my_field1', 'my_field2'],
                       optional_fields=['my_optional_field2']) as reader:
            for data_row in reader:
                print(data_row['my_field1'])
                print(data_row['my_field2'])
                print(data_row.get('my_optional_field2', 'unset'))
    """

    def __init__(self,
                 filename: str,
                 required_fields: Optional[List[str]] = None,
                 optional_fields: Optional[List[str]] = None,
                 checks: Optional[Dict[str, Callable[[str], bool]]] = None,
                 do_allow_arbitrary_fields: bool = False):
        """
        Constructor for csv reader. Does not touch the file; it is opened when the context is entered.

        :param filename: The filename of the csv file. Must be relative to workdir.
        :param required_fields: Specify required field names which must be in the csv header line. If not, it
                                will raise a CSVError during parsing.
        :param optional_fields: Specify optional field names. If the csv file contains field names, which are
                                not specified, it will raise a CSVError during parsing.
        :param checks: Specify checks for fields as a mapping of field names to callables. The callables should
                       return true if a given value is valid, false otherwise.
        :param do_allow_arbitrary_fields: If true, it will consider all possible field names as optional. Use
                                          this option with caution because it will prevent possible errors from
                                          being detected.
        """
        if required_fields is None:
            required_fields = []
        if optional_fields is None:
            optional_fields = []
        if checks is None:
            checks = {}

        self.filename = filename
        self._required_fields = required_fields
        self._optional_fields = optional_fields
        self._checks = checks
        self._do_allow_arbitrary_fields = do_allow_arbitrary_fields
        self.file = None

        # Populated by __enter__ (header) and __iter__ (reader state).
        self._header_fields: List[str] = []
        self._csv_reader = None
        self._csv_reader_iter = None

    def __enter__(self):
        """
        Open the file and validate it completely.

        :return: The reader itself.
        :raises CSVError: If the header line or any data row is invalid.
        """
        # Upon entering we want to read through the whole csv file and check if it is correctly formatted.
        # We do so, to be able to detect multiple errors at once and output them properly.
        self.file = self._open_file()
        try:
            self._validate(csv.reader(self.file, dialect='strict'))
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the handle must be released here.
            self.file.close()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the currently opened file. Exceptions from the ``with`` body are not suppressed."""
        self.file.close()

    def __iter__(self):
        """
        Restart reading at the first data row.

        The file is closed and reopened, so every call starts a fresh pass over the data rows.

        :return: The reader itself, which is its own iterator.
        """
        self.file.close()
        self.file = self._open_file()
        self._csv_reader = csv.reader(self.file, dialect='strict')
        self._csv_reader_iter = iter(self._csv_reader)
        # Skip the header record. It is consumed as a whole record (not by line number) because a quoted
        # header cell may span several physical lines.
        next(self._csv_reader_iter, None)
        return self

    def __next__(self) -> Dict[str, str]:
        """
        Return the next data row as a mapping of header field name to cell value.

        :raises StopIteration: When all data rows have been read.
        """
        row = next(self._csv_reader_iter)
        # we assume that no error checks are needed here because we checked csv file before
        return dict(zip(self._header_fields, row))

    def _open_file(self):
        """Open the csv file for reading; a leading UTF-8 byte order mark is stripped by the codec."""
        # newline='' lets the csv module handle line endings itself, as its documentation requires for
        # newlines embedded in quoted fields.
        return open(self.filename, encoding='utf-8-sig', newline='')

    def _validate(self, reader) -> None:
        """
        Check the header record and every data row delivered by ``reader``.

        Header problems raise immediately (via ``_parse_header_line``); problems in data rows are collected
        and reported together in a single CSVError after the whole file has been read.

        :param reader: A ``csv.reader`` positioned at the start of the file.
        :raises CSVError: If the header or at least one data row is invalid.
        """
        invalid_lines_log_msgs: List[str] = []
        is_header_parsed = False
        for row in reader:
            if not is_header_parsed:
                # The first *record* is the header; reader.line_num would be wrong for multi-line headers.
                self._parse_header_line(row)
                is_header_parsed = True
                continue

            if len(row) != len(self._header_fields):
                invalid_lines_log_msgs.append(
                    f"Line {reader.line_num}, Actual number of columns: {len(row)}, "
                    f"expected {len(self._header_fields)} columns"
                )
                continue

            for header_field, cell in self._get_invalid_fields(row).items():
                invalid_lines_log_msgs.append(
                    f"Line {reader.line_num}, Checks failed for header field '{header_field}'. "
                    f"Incorrect value '{cell}'."
                )

        if invalid_lines_log_msgs:
            invalid_lines_log_msg = "\n".join(invalid_lines_log_msgs)
            raise CSVError(f"Following error(s) occurred during parsing of file '{self.filename}': \n"
                           f"{invalid_lines_log_msg}")

    def _parse_header_line(self, row: List[str]):
        """
        Validate the header record and store it as the list of field names.

        :param row: The cells of the header record.
        :raises CSVError: If a required field is missing, or if a field is neither required nor optional
                          while arbitrary fields are not allowed.
        """
        # Work on a copy: recognised names are removed one by one, whatever remains is unrecognised.
        headers_temp = list(row)
        for required_field in self._required_fields:
            if required_field not in headers_temp:
                raise CSVError(f"Error parsing file '{self.filename}' Line 1: Required field '{required_field}' "
                               f"not found in csv header-line")
            headers_temp.remove(required_field)

        for optional_field in self._optional_fields:
            if optional_field in headers_temp:
                headers_temp.remove(optional_field)

        if len(headers_temp) > 0 and self._do_allow_arbitrary_fields is False:
            unrecognized_fields = ', '.join([f"'{field}'" for field in headers_temp])
            raise CSVError(f"Error parsing file '{self.filename}' Line 1: Unrecognized header fields "
                           f"{unrecognized_fields}.")

        self._header_fields = row

    def _get_invalid_fields(self, row: List[str]) -> Dict[str, str]:
        """
        Run the configured checks against one data row.

        :param row: The cells of a data row, in header order.
        :return: Mapping of header field name to offending cell value for every failed check; empty if the
                 row is valid.
        """
        invalid_fields = {}
        for header_field, cell in zip(self._header_fields, row):
            # Only an explicit ``False`` result marks a value as invalid; fields without a check always pass.
            if header_field in self._checks and self._checks[header_field](cell) is False:
                invalid_fields[header_field] = cell
        return invalid_fields