Skip to content

Loaders

CSV sniffing and catalog load argument preparation utilities. Detects separators and headers for catalog.yml generation, ensuring input files are parsed correctly by Kedro datasets.

loaders

Data loading utilities for CSV sniffing, characterization, and catalog configuration.

Provides CSV dialect detection (separator, header), full heuristic CSV characterization (skip-line detection, field separator, record separator), and prepares load_args dictionaries for Kedro catalog.yml generation.

When heuristic detection fails or produces an unusual delimiter, an optional LLM fallback (Google Gemini) is attempted before returning hard-coded defaults.

LLMCsvCharacterization

Bases: BaseModel

Structured output returned by the LLM for CSV characterization.

set_csv_llm_delegate

set_csv_llm_delegate(fn)

Register (or clear with None) the CSV LLM characterization delegate.

The delegate takes the file's first sample lines and returns the same dict shape as :func:_llm_characterize_csv (or None on failure).

Source code in src/choregraph/loaders.py
def set_csv_llm_delegate(
    fn: Optional[Callable[[List[str]], Optional[dict]]],
) -> None:
    """Register (or clear with ``None``) the CSV LLM characterization delegate.

    The delegate takes the file's first sample lines and returns the same dict
    shape as :func:`_llm_characterize_csv` (or ``None`` on failure).
    """
    global _csv_llm_delegate
    _csv_llm_delegate = fn

split_csv_line

split_csv_line(line, delimiter)

Parse a CSV line with the given delimiter, considering quotes.

Source code in src/choregraph/loaders.py
def split_csv_line(line: str, delimiter: str) -> List[str]:
    """Parse a CSV line with the given delimiter, considering quotes."""
    reader = csv.reader(io.StringIO(line), delimiter=delimiter, quotechar='"')
    return next(reader)

detect_skip_lines_smart

detect_skip_lines_smart(lines, delimiter_list, tolerance=1, min_ok_lines=2)

Detect start of data by parsing full CSV logic (handling multiline quotes).

Tries each delimiter in delimiter_list and returns the index of the first line that begins a consistent run of rows with similar column counts.

PARAMETER DESCRIPTION
lines

Lines read from the CSV file (first ~50 lines).

TYPE: List[str]

delimiter_list

Delimiters to try (e.g. [",", ";", "\t", "|"]).

TYPE: List[str]

tolerance

Allowed difference in column count between consecutive rows.

TYPE: int DEFAULT: 1

min_ok_lines

Minimum consecutive consistent rows to confirm data start.

TYPE: int DEFAULT: 2

RETURNS DESCRIPTION
int

Number of lines to skip (0 if data starts at the beginning).

Source code in src/choregraph/loaders.py
def detect_skip_lines_smart(
    lines: List[str],
    delimiter_list: List[str],
    tolerance: int = 1,
    min_ok_lines: int = 2,
) -> int:
    """Detect start of data by parsing full CSV logic (handling multiline quotes).

    Tries each delimiter in *delimiter_list* and returns the index of the first
    line that begins a consistent run of rows with similar column counts.

    Args:
        lines: Lines read from the CSV file (first ~50 lines).
        delimiter_list: Delimiters to try (e.g. ``[",", ";", "\\t", "|"]``).
        tolerance: Allowed difference in column count between consecutive rows.
        min_ok_lines: Minimum consecutive consistent rows to confirm data start.

    Returns:
        Number of lines to skip (0 if data starts at the beginning).
    """
    full_content = "\n".join(lines)

    for delimiter in delimiter_list:
        try:
            reader = csv.reader(full_content.splitlines(keepends=True), delimiter=delimiter)
            parsed_rows = list(reader)

            if not parsed_rows:
                continue

            for i, row in enumerate(parsed_rows):
                current_len = len(row)
                if current_len <= 1:
                    continue

                ok = 0
                for j in range(i + 1, min(i + 6, len(parsed_rows))):
                    next_row_len = len(parsed_rows[j])
                    if abs(next_row_len - current_len) <= tolerance:
                        ok += 1

                if ok >= min_ok_lines:
                    return i
        except Exception:
            continue

    return 0

remove_skiplines

remove_skiplines(filepath, skip_lines, buffer_size=8192)

Delete the first skip_lines lines from a file using buffered I/O.

PARAMETER DESCRIPTION
filepath

Path to the file to modify.

TYPE: str

skip_lines

Number of lines to remove from the top.

TYPE: int

buffer_size

Read buffer size in bytes.

TYPE: int DEFAULT: 8192

RETURNS DESCRIPTION
bool

True if the file was modified, False otherwise.

Source code in src/choregraph/loaders.py
def remove_skiplines(filepath: str, skip_lines: int, buffer_size: int = 8192) -> bool:
    """Delete the first *skip_lines* lines from a file using buffered I/O.

    Args:
        filepath: Path to the file to modify.
        skip_lines: Number of lines to remove from the top.
        buffer_size: Read buffer size in bytes.

    Returns:
        ``True`` if the file was modified, ``False`` otherwise.
    """
    if skip_lines <= 0:
        return False

    temp_file = str(filepath) + ".tmp"
    try:
        with open(safe_path(filepath), "rb") as f_in:
            lines_skipped = 0
            while lines_skipped < skip_lines:
                line = f_in.readline()
                if not line:
                    return False
                lines_skipped += 1

            start_pos = f_in.tell()
            with open(safe_path(temp_file), "wb") as f_out:
                f_in.seek(start_pos)
                while True:
                    chunk = f_in.read(buffer_size)
                    if not chunk:
                        break
                    f_out.write(chunk)

        os.replace(temp_file, filepath)
        return True
    except Exception as e:
        logger.error(f"Error removing skip lines: {e}")
        if os.path.exists(temp_file):
            os.remove(safe_path(temp_file))
        return False

characterize_csv

characterize_csv(filepath)

Full heuristic CSV characterization with optional LLM fallback.

Detects field separator, record separator, header presence, and non-data preamble lines. When the csv.Sniffer heuristic fails, detects a separator outside the usual set [; , \t |], or produces inconsistent column counts across rows, the function attempts to characterize the file via an LLM. If the LLM is unavailable or also fails, safe fallback values are returned.

The LLM step always runs server-side: a service registers a delegate via :func:set_csv_llm_delegate (file_service registers one that calls the ai_service /ai/characterize_csv endpoint, the same pattern as Excel tidying via /ai/preprocess_excel), keeping provider credentials confined to the ai_service. When no delegate is registered (e.g. metadata stat reading, or a standalone call), the LLM step is simply skipped and the heuristic / safe-default values are returned.

PARAMETER DESCRIPTION
filepath

Path to the CSV file.

TYPE: str

RETURNS DESCRIPTION
dict

Dict with keys header, fieldSeparator, recordSeparator,

dict

skipLines, modified.

Source code in src/choregraph/loaders.py
def characterize_csv(filepath: str) -> dict:
    """Full heuristic CSV characterization with optional LLM fallback.

    Detects field separator, record separator, header presence, and
    non-data preamble lines.  When the ``csv.Sniffer`` heuristic fails,
    detects a separator outside the usual set ``[; , \\t |]``, **or**
    produces inconsistent column counts across rows, the function
    attempts to characterize the file via an LLM.
    If the LLM is unavailable or also fails, safe fallback values are
    returned.

    The LLM step always runs *server-side*: a service registers a delegate via
    :func:`set_csv_llm_delegate` (file_service registers one that calls the
    ai_service ``/ai/characterize_csv`` endpoint, the same pattern as Excel
    tidying via ``/ai/preprocess_excel``), keeping provider credentials confined
    to the ai_service.  When no delegate is registered (e.g. ``metadata`` stat
    reading, or a standalone call), the LLM step is simply skipped and the
    heuristic / safe-default values are returned.

    Args:
        filepath: Path to the CSV file.

    Returns:
        Dict with keys ``header``, ``fieldSeparator``, ``recordSeparator``,
        ``skipLines``, ``modified``.
    """
    def run_llm_fallback(sample_lines: List[str]) -> Optional[dict]:
        # Single LLM path: the registered server-side delegate, or nothing.
        if _csv_llm_delegate is None:
            logger.debug("characterize_csv: no CSV LLM delegate registered — skipping LLM step")
            return None
        return _csv_llm_delegate(sample_lines)
    result = {
        "header": False,
        "fieldSeparator": None,
        "recordSeparator": None,
        "skipLines": 0,
        "modified": False,
    }
    fallback = {
        "header": True,
        "fieldSeparator": ",",
        "recordSeparator": "\n",
        "skipLines": 0,
        "modified": False,
    }

    is_built_from_excel = bool(
        re.search(r"sheet\d+_table\d+\.csv", str(filepath), re.IGNORECASE)
    )

    usual_del = [";", ",", "\t", "|"]
    sample_size = 10240
    sample_lines: List[str] = []
    newlines = None
    logger.debug("characterize_csv: reading sample from %s", filepath)
    try:
        with open(safe_path(filepath), "r", encoding="utf-8", errors="replace", newline=None) as f:
            for _ in range(50):
                line = f.readline()
                if not line:
                    break
                sample_lines.append(line)
            f.seek(0)
            f.read(min(sample_size, os.path.getsize(filepath)))
            newlines = f.newlines
    except Exception as e:
        logger.warning(f"characterize_csv: could not read {filepath}: {e}")
        return fallback

    logger.debug("characterize_csv: detecting skip lines")
    result["skipLines"] = detect_skip_lines_smart(sample_lines, usual_del)
    useful_lines = sample_lines[
        result["skipLines"]: min(10 + result["skipLines"], len(sample_lines))
    ]

    try:
        sample = "".join(useful_lines)
        if is_built_from_excel:
            logger.debug("characterize_csv: Excel-origin file — forcing comma delimiter")
            result["fieldSeparator"] = ","
            csv.Sniffer().sniff(sample, delimiters=[","])
            result["header"] = csv.Sniffer().has_header(sample)
        else:
            logger.debug("characterize_csv: sniffing with usual delimiters")
            dialect = csv.Sniffer().sniff(sample, delimiters=usual_del)
            result["fieldSeparator"] = dialect.delimiter
            result["header"] = csv.Sniffer().has_header(sample)

        if isinstance(newlines, str):
            result["recordSeparator"] = newlines.encode().decode("unicode_escape")
        else:
            result["recordSeparator"] = dialect.lineterminator.encode().decode(
                "unicode_escape"
            )
    except Exception:
        # Heuristic failed entirely — try LLM fallback before returning defaults.
        logger.debug("characterize_csv: Sniffer failed — attempting LLM fallback")
        llm_result = run_llm_fallback(sample_lines)
        if llm_result is not None:
            logger.info("characterize_csv: using LLM characterization result")
            result = llm_result
        else:
            return fallback
        # Result already came from LLM — skip the Sniffer post-checks below.
        sniffer_succeeded = False
    else:
        sniffer_succeeded = True

    # If the Sniffer succeeded but detected an unusual separator, double-check
    # with the LLM (it may understand uncommon formats better).
    if sniffer_succeeded and result["fieldSeparator"] and result["fieldSeparator"] not in usual_del:
        logger.debug(
            "characterize_csv: unusual separator %r detected — attempting LLM fallback",
            result["fieldSeparator"],
        )
        llm_result = run_llm_fallback(sample_lines)
        if llm_result is not None:
            logger.info("characterize_csv: using LLM characterization for unusual separator")
            result = llm_result
        # Otherwise keep the Sniffer result as-is.

    # If Sniffer succeeded with a usual separator, verify consistency: parse
    # the useful lines with the detected separator and check that the column
    # count is stable across rows.  Inconsistency often means the true
    # separator was mis-identified (e.g. CSV with embedded JSON full of commas
    # being detected as comma-separated when it is actually semicolon-separated).
    elif sniffer_succeeded and result["fieldSeparator"]:
        if not _check_column_consistency(useful_lines, result["fieldSeparator"]):
            logger.info(
                "characterize_csv: inconsistent column counts with separator %r "
                "— trying alternative separators",
                result["fieldSeparator"],
            )
            # --- Heuristic: try the other usual separators first (no LLM needed) ---
            alt_sep = _find_best_separator(
                useful_lines, usual_del, exclude=result["fieldSeparator"]
            )
            if alt_sep:
                logger.info(
                    "characterize_csv: alternative separator %r gives consistent columns",
                    alt_sep,
                )
                result["fieldSeparator"] = alt_sep
                result["header"] = csv.Sniffer().has_header(
                    "".join(useful_lines)
                )
            else:
                # No usual separator works — try the LLM as last resort.
                logger.info(
                    "characterize_csv: no alternative separator found — attempting LLM fallback"
                )
                llm_result = run_llm_fallback(sample_lines)
                if llm_result is not None:
                    logger.info(
                        "characterize_csv: using LLM characterization after consistency check failure"
                    )
                    result = llm_result
                # Otherwise keep the Sniffer result as-is (best-effort).

    if result["skipLines"] > 0:
        modified = remove_skiplines(filepath, result["skipLines"])
        if modified:
            result["modified"] = True
            result["skipLines"] = 0

    return result

sniff_csv_options

sniff_csv_options(filepath)

Detect CSV separator and header row from a file sample.

Reads the first 2048 bytes of the file and uses :mod:csv.Sniffer to detect the dialect.

PARAMETER DESCRIPTION
filepath

Path to the CSV file.

TYPE: str

RETURNS DESCRIPTION
Dict[str, Any]

Dict with detected sep and header values (may be empty if

Dict[str, Any]

detection fails).

Source code in src/choregraph/loaders.py
def sniff_csv_options(filepath: str) -> Dict[str, Any]:
    """Detect CSV separator and header row from a file sample.

    Reads the first 2048 bytes of the file and uses :mod:`csv.Sniffer` to
    detect the dialect.

    Args:
        filepath: Path to the CSV file.

    Returns:
        Dict with detected ``sep`` and ``header`` values (may be empty if
        detection fails).
    """
    options = {}
    path = Path(filepath)
    if not path.exists():
        return options

    try:
        with open(safe_path(path), 'r', encoding='utf-8', errors='replace') as f:
            sample = f.read(2048)
            sniffer = csv.Sniffer()
            try:
                dialect = sniffer.sniff(sample)
                options['sep'] = dialect.delimiter
                if sniffer.has_header(sample):
                    options['header'] = 0
                else:
                    options['header'] = None
            except csv.Error:
                pass
    except Exception as e:
        logger.warning(f"Error sniffing CSV {filepath}: {e}")

    return options

prepare_load_args

prepare_load_args(fmt, location, options=None)

Prepare load_args for Kedro catalog.yml generation.

Merges user-provided options with auto-detected CSV settings.

PARAMETER DESCRIPTION
fmt

Data format string (e.g. "CSV", "JSON").

TYPE: str

location

File path for auto-detection.

TYPE: str

options

User-provided load options (take precedence over sniffed values).

TYPE: Dict[str, Any] DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Any]

Dict of load arguments suitable for catalog.yml.

Source code in src/choregraph/loaders.py
def prepare_load_args(fmt: str, location: str, options: Dict[str, Any] = None) -> Dict[str, Any]:
    """Prepare ``load_args`` for Kedro catalog.yml generation.

    Merges user-provided options with auto-detected CSV settings.

    Args:
        fmt: Data format string (e.g. ``"CSV"``, ``"JSON"``).
        location: File path for auto-detection.
        options: User-provided load options (take precedence over sniffed values).

    Returns:
        Dict of load arguments suitable for ``catalog.yml``.
    """
    load_args = (options or {}).copy()
    # Strip metadata attributes that are stored in options but should NOT
    # be passed to pandas/kedro as load_args.
    for k in ("temporalFiles", "collectionTimeMode", "collectionTimeDelta"):
        load_args.pop(k, None)
    if fmt.upper() == "CSV":
        if "sep" not in load_args:
            sniffed = sniff_csv_options(location)
            if "sep" in sniffed: load_args["sep"] = sniffed["sep"]
            if "header" in sniffed: load_args["header"] = sniffed["header"]
    return load_args