Source code for csvsmith.tools.filter_rows

from __future__ import annotations

import csv
from pathlib import Path
from typing import Generator


[docs] class DropRowsBySubstring: """ Filter CSV rows by removing rows whose selected column contains a target substring. """ FILTERED_SUFFIX = ".filtered.csv" def __init__( self, csv_path: Path | str, column_name: str, unwanted_text: str, *, case_sensitive: bool = True, keep_header: bool = True, ) -> None: self.csv_path = Path(csv_path) self.column_name = str(column_name) self.unwanted_text = str(unwanted_text) self.case_sensitive = case_sensitive self.keep_header = keep_header def _find_column_index(self, header: list[str]) -> int: try: return header.index(self.column_name) except ValueError as exc: raise ValueError(f"Column not found in CSV header: {self.column_name!r}") from exc def _row_contains_unwanted_text(self, row: list[str], column_index: int) -> bool: if column_index >= len(row): return False cell_value = row[column_index] needle = self.unwanted_text if not self.case_sensitive: return needle.casefold() in cell_value.casefold() return needle in cell_value def _row_is_kept(self, row: list[str], column_index: int) -> bool: return not self._row_contains_unwanted_text(row, column_index) def _iter_rows_to_write(self) -> Generator[list[str], None, None]: with self.csv_path.open("r", newline="", encoding="utf-8") as file: reader = csv.reader(file) header = next(reader, None) if header is None: return column_index = self._find_column_index(header) if self.keep_header: yield header for row in reader: if not row: continue if self._row_is_kept(row, column_index): yield row
[docs] def iter_kept_rows(self) -> Generator[list[str], None, None]: yield from self._iter_rows_to_write()
[docs] def write_filtered_rows(self) -> None: output_path = self.csv_path.with_suffix(self.FILTERED_SUFFIX) if output_path == self.csv_path: raise ValueError("Output path would overwrite the input file") with output_path.open("w", newline="", encoding="utf-8") as output_file: writer = csv.writer(output_file) for row in self._iter_rows_to_write(): writer.writerow(row)
# Second module-level name bound to the very same class object as
# DropRowsBySubstring (not a subclass or a copy).
# NOTE(review): presumably kept so that callers using an earlier name keep
# working — confirm before removing.
CSVCleaner = DropRowsBySubstring
def main(csv_path: Path | str, column_name: str, unwanted_text: str) -> None:
    """Filter ``csv_path`` using the default options and write the result.

    Builds a :class:`DropRowsBySubstring` from the three arguments, leaving
    its keyword-only options at their defaults, and calls its
    ``write_filtered_rows`` method.
    """
    DropRowsBySubstring(csv_path, column_name, unwanted_text).write_filtered_rows()