Source code for csvsmith.utils.io
from __future__ import annotations
import csv
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Hashable, Iterable, Iterator, Mapping, Sequence
from openpyxl import load_workbook
from openpyxl.workbook.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet
Row = dict[str, Any]
from collections import Counter
[docs]
def count_duplicates_sorted(
    items: Iterable[Hashable],
    threshold: int = 2,
    reverse: bool = True,
) -> list[tuple[Hashable, int]]:
    """Tally ``items`` and report the values seen ``threshold`` or more times.

    Args:
        items: Hashable values to count.
        threshold: Minimum number of occurrences for a value to be reported.
        reverse: When true (the default) the highest counts come first;
            otherwise the lowest counts come first.

    Returns:
        A list of ``(value, count)`` tuples ordered by count. Values with
        equal counts keep the order in which they were first encountered,
        because the sort is stable.
    """
    tallies = Counter(items)
    return sorted(
        ((value, seen) for value, seen in tallies.items() if seen >= threshold),
        key=lambda entry: entry[1],
        reverse=reverse,
    )
@contextmanager
def _open_worksheet(
    excel_path: str | Path,
    *,
    sheet_name: str | None = None,
) -> Iterator[Worksheet]:
    """Open an Excel workbook, hand out one worksheet, then close the workbook.

    The workbook is loaded with ``read_only=True`` and ``data_only=True``.

    Args:
        excel_path: Location of the workbook file.
        sheet_name: Name of the sheet to yield. When omitted (or otherwise
            falsy) the workbook's active sheet is yielded instead.

    Yields:
        The selected worksheet. The workbook is closed once the ``with``
        block exits, whether normally or through an exception.
    """
    source = Path(excel_path)
    book: Workbook = load_workbook(source, read_only=True, data_only=True)
    try:
        if sheet_name:
            sheet = book[sheet_name]
        else:
            sheet = book.active
        yield sheet
    finally:
        # Always release the file handle held by the workbook.
        book.close()
[docs]
def iter_worksheet_rows(worksheet: Worksheet) -> Iterable[list[str]]:
    """Lazily convert each worksheet row into a list of strings for CSV output.

    Args:
        worksheet: Object whose ``iter_rows()`` yields rows of cells, each
            cell exposing a ``value`` attribute.

    Yields:
        One list per row. A cell whose value is ``None`` becomes the empty
        string; every other value is passed through ``str()``.
    """
    for cells in worksheet.iter_rows():
        rendered: list[str] = []
        for cell in cells:
            content = cell.value
            rendered.append(str(content) if content is not None else "")
        yield rendered
[docs]
def write_worksheet_to_csv(worksheet: Worksheet, csv_path: str | Path) -> None:
    """Dump every row of ``worksheet`` into a UTF-8 CSV file.

    Args:
        worksheet: Worksheet whose rows are exported via
            ``iter_worksheet_rows``.
        csv_path: Destination file. Missing parent directories are created,
            and an existing file is overwritten.
    """
    destination = Path(csv_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with destination.open("w", newline="", encoding="utf-8") as handle:
        csv.writer(handle).writerows(iter_worksheet_rows(worksheet))
[docs]
def read_csv_rows(csv_path: Path | str, encoding: str = "utf-8-sig") -> list[Row]:
    """Load a whole CSV file as a list of dictionaries keyed by header name.

    Args:
        csv_path: File to read.
        encoding: Text encoding used to open the file. The default,
            ``"utf-8-sig"``, strips a leading byte-order mark if present.

    Returns:
        One dictionary per data row, in file order; the first line of the
        file supplies the keys.
    """
    records: list[Row] = []
    # newline="" leaves line-ending handling to the csv module.
    with Path(csv_path).open("r", encoding=encoding, newline="") as handle:
        records.extend(csv.DictReader(handle))
    return records
[docs]
def write_csv_rows(
    csv_path: Path | str,
    rows: Sequence[Mapping[str, object]],
    *,
    fieldnames: Sequence[str],
    encoding: str = "utf-8-sig",
) -> None:
    """Save dictionaries as CSV: a header line followed by one line per row.

    Args:
        csv_path: Destination file; an existing file is overwritten.
        rows: Row mappings to write, in order.
        fieldnames: Column names, which fix both the header and the column
            order of every row.
        encoding: Text encoding used to open the file. The default,
            ``"utf-8-sig"``, writes a leading byte-order mark.
    """
    # newline="" lets the csv module emit its own line terminators.
    with Path(csv_path).open("w", encoding=encoding, newline="") as handle:
        dict_writer = csv.DictWriter(handle, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(rows)