Skip to content

Metadata

Extracts field-level statistics from pandas DataFrames — data types, min/max values, distinct counts, and unique categorical values. Used by DiveConnector to populate the fields section of VisuSpec XML. Optimized for large datasets with configurable thresholds.

metadata

Metadata extraction and caching for Choregraph datasets. Provides MetadataExtractor for DataFrame analysis and Metadata for persistence.

FieldMetadata dataclass

FieldMetadata(id, name, data_type, min_value=None, max_value=None, is_unique=False, units='UNITLESS', distinct_count=-1, uniques='', info=None)

Metadata for a single DataFrame column.

ATTRIBUTE DESCRIPTION
id

Sequential field identifier (string).

TYPE: str

name

Column name from the DataFrame.

TYPE: str

data_type

One of INTEGER, FLOAT, DATETIME, STRING, BOOLEAN, OBJECT.

TYPE: str

min_value

Minimum value (numeric/datetime columns only).

TYPE: Optional[Any]

max_value

Maximum value (numeric/datetime columns only).

TYPE: Optional[Any]

is_unique

Whether all values in the column are unique.

TYPE: bool

units

Unit label (default "UNITLESS").

TYPE: str

distinct_count

Number of distinct values (-1 if unknown).

TYPE: int

uniques

Comma-separated string of unique values (categorical fields).

TYPE: str

DatasetStats dataclass

DatasetStats(id, name, row_count, fields, last_updated='', info=None)

Complete stats for a single dataset.

MetadataResult

Bases: UserDict

Wrapper around the dict of DatasetStats to allow formatting methods. Behaves exactly like a Dict[str, DatasetStats], but adds .format().

format

format(format_type='markdown', user_message='', detailed=True)

Format the metadata collection into a string representation.

PARAMETER DESCRIPTION
format_type

"markdown", "json"

TYPE: str DEFAULT: 'markdown'

user_message

Filter fields based on user query context.

TYPE: str DEFAULT: ''

detailed

Include all stats columns (min/max/uniques).

TYPE: bool DEFAULT: True

Source code in src/choregraph/metadata.py
def format(self, format_type: str = "markdown", user_message: str = "", detailed: bool = True) -> str:
    """
    Render the metadata collection as a string.

    Args:
        format_type: Output format, either "markdown" or "json".
        user_message: Passed through to ``_to_markdown``; not used for JSON.
        detailed: Passed through to ``_to_markdown``; not used for JSON.

    Returns:
        The markdown produced by ``_to_markdown``, or an indented JSON
        document mapping each key of ``self.data`` to its dataclass fields.

    Raises:
        ValueError: If ``format_type`` is neither "markdown" nor "json".
    """
    if format_type == "markdown":
        return self._to_markdown(user_message, detailed)

    if format_type != "json":
        raise ValueError(f"Unknown format: {format_type}")

    # Dataclasses are flattened to plain dicts; anything json cannot encode
    # natively is stringified through ``default=str``.
    serialisable = {}
    for key, stats in self.data.items():
        serialisable[key] = asdict(stats)
    return json.dumps(serialisable, default=str, indent=2)

to_api_format

to_api_format()

Convert to list-of-dicts format for the viz API (metadata.json).

RETURNS DESCRIPTION
list

List of dataset metadata dicts with keys: data_id, name, rows, fields.

Source code in src/choregraph/metadata.py
def to_api_format(self) -> list:
    """Convert to list-of-dicts format for the viz API (metadata.json).

    All numeric values are emitted as strings.  A missing ``min_value``
    becomes ``"0"``; a missing ``max_value`` falls back to the distinct
    count.  A distinct count of ``None`` is reported as ``"-1"`` (unknown),
    whereas a genuine count of ``0`` is preserved as ``"0"``.

    Returns:
        List of dataset metadata dicts with keys: data_id, name, rows, fields.
        ``data_id`` is the dataset id, or the dataset name when the id is
        empty or ``"0"``.  A field only carries ``uniques`` when non-empty.
    """
    def _count_text(field) -> str:
        # Only a missing count means "unknown"; 0 is a legitimate value
        # (e.g. an empty or all-null column) and must not be rewritten.
        count = field.distinct_count
        return str(-1 if count is None else count)

    result = []
    for name, stats in self.data.items():
        fields_list = []
        for f in stats.fields:
            count_text = _count_text(f)
            field_dict = {
                "field_id": f.id,
                "name": f.name,
                "data_type": f.data_type,
                "field_min": str(f.min_value) if f.min_value is not None else "0",
                "field_max": str(f.max_value) if f.max_value is not None else count_text,
                "distinct_count": count_text,
            }
            if f.uniques:
                field_dict["uniques"] = f.uniques
            fields_list.append(field_dict)

        has_real_id = bool(stats.id) and str(stats.id) != "0"
        result.append({
            "data_id": str(stats.id) if has_real_id else name,
            "name": name,
            "rows": str(stats.row_count),
            "fields": fields_list,
        })
    return result

from_datasets classmethod

from_datasets(datasets)

Build a MetadataResult from a list of dataset dicts.

Uses the same DatasetStats.from_dict() / FieldMetadata.from_dict() deserialization as Metadata.read_from_cache(), so both the workspace-based web flow and the stateless API produce identical objects.

PARAMETER DESCRIPTION
datasets

List of dicts with keys matching DatasetStats fields (id, name, row_count, fields).

TYPE: list

Source code in src/choregraph/metadata.py
@classmethod
def from_datasets(cls, datasets: list) -> "MetadataResult":
    """Create a MetadataResult out of raw dataset dictionaries.

    Every dict is deserialized with ``DatasetStats.from_dict()`` and the
    resulting object is stored under its own ``name`` attribute.  When two
    dicts yield the same name, the later one replaces the earlier one.

    Args:
        datasets: List of dicts with keys matching ``DatasetStats`` fields
            (``id``, ``name``, ``row_count``, ``fields``).

    Returns:
        A new instance of ``cls`` keyed by dataset name.
    """
    parsed = (DatasetStats.from_dict(raw) for raw in datasets)
    return cls({stats.name: stats for stats in parsed})

MetadataExtractor

Analyzes a pandas DataFrame to extract metadata. Optimized for performance on large datasets.

extract classmethod

extract(df)

Extract field-level metadata from a DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame to analyze.

TYPE: DataFrame

RETURNS DESCRIPTION
List[FieldMetadata]

List of FieldMetadata objects, one per column.

Source code in src/choregraph/metadata.py
@classmethod
def extract(cls, df: pd.DataFrame) -> List[FieldMetadata]:
    """Extract field-level metadata from a DataFrame.

    Columns whose name starts with ``"Unnamed:"`` are skipped and do not
    consume a field id.  For frames with more than
    ``cls.LARGE_DATASET_THRESHOLD`` rows, the distinct count is computed
    only for OBJECT/STRING columns and the ``Series.is_unique`` fallback
    is skipped.

    Args:
        df: Input DataFrame to analyze.  It is handed to ``infer_dtypes``
            first and the return value is ignored (presumably the helper
            adjusts dtypes in place -- confirm in ``dtype_inference``).

    Returns:
        List of :class:`FieldMetadata` objects, one per retained column.
    """
    from .dtype_inference import infer_dtypes
    infer_dtypes(df)

    metadata = []
    field_id = 1
    row_count = len(df)
    is_large = row_count > cls.LARGE_DATASET_THRESHOLD

    if is_large:
        print(f"[DEBUG] Large dataset detected (>{cls.LARGE_DATASET_THRESHOLD} rows), optimizations enabled")

    for column in df.columns:
        if str(column).startswith("Unnamed:"):  # skip unnamed columns
            continue

        series = df[column]
        dtype = cls._map_dtype(series.dtype)
        is_textual = dtype in ("OBJECT", "STRING")

        # Textual columns always need the distinct count (it drives the
        # uniques summary below); other columns only get one on small
        # frames.  Computing it here, once, avoids a second pass over the
        # column for textual data.
        distinct_count = -1
        if not is_large or is_textual:
            distinct_count = _safe_nunique(series)

        field_meta = FieldMetadata(
            id=str(field_id),
            name=str(column),
            data_type=dtype,
            distinct_count=distinct_count
        )

        if dtype in ("INTEGER", "FLOAT"):
            v_min = series.min()
            v_max = series.max()
            if pd.notna(v_min):
                field_meta.min_value = v_min
            if pd.notna(v_max):
                field_meta.max_value = v_max
        elif dtype == "DATETIME":
            m = series.min()
            M = series.max()
            if pd.notna(m):
                field_meta.min_value = _timestamp_to_ns(m)
            if pd.notna(M):
                field_meta.max_value = _timestamp_to_ns(M)
        elif is_textual:
            uniques = _safe_unique(series)
            if distinct_count < cls.HIGH_UNIQUE_COUNT_THRESHOLD:
                uniques_str = str(uniques)
            else:
                # Abbreviated listing: leading values, " ... ", then the
                # last three.  The slicing drops the closing bracket of the
                # first repr and the opening bracket of the second
                # (assumes the repr is bracket-delimited -- TODO confirm
                # against _safe_unique's return type).
                uniques_str = str(uniques[:cls.HIGH_UNIQUE_COUNT_THRESHOLD-3])[:-1] + " ... " + str(uniques[-3:])[1:]
            field_meta.uniques = uniques_str

        if distinct_count == row_count:
            field_meta.is_unique = True
        elif not is_large:
            try:
                if series.is_unique:
                    field_meta.is_unique = True
            except TypeError:
                # Raised for values pandas cannot hash; uniqueness stays False.
                pass

        metadata.append(field_meta)
        field_id += 1

    return metadata

Metadata

Metadata(workspace_path)

Centralized manager for dataset metadata. Reads directly from catalogue_stats.json without in-memory caching.

Source code in src/choregraph/metadata.py
def __init__(self, workspace_path: Path):
    """Record the location of the workspace's ``catalogue_stats.json``.

    Args:
        workspace_path: Workspace root; the stats file is expected at
            ``pipeline/cache/catalogue_stats.json`` beneath it.
    """
    cache_dir = workspace_path / "pipeline" / "cache"
    self.cache_path = cache_dir / "catalogue_stats.json"

update_stats

update_stats(name, df, dataset_id=None, dataset_type='input')

Calculate and store stats for a dataset.

PARAMETER DESCRIPTION
name

Dataset name (Kedro catalog key)

TYPE: str

df

The data to analyze (DataFrame, dict, or list)

dataset_id

Optional spec ID (input ID or output port ID)

TYPE: str DEFAULT: None

dataset_type

"input" or "output"

TYPE: str DEFAULT: 'input'

Source code in src/choregraph/metadata.py
def update_stats(self, name: str, df, dataset_id: str = None, dataset_type: str = "input"):
    """
    Compute stats for a loaded dataset and persist them via ``store_stats``.

    The payload type decides how it is described: DataFrames go through
    ``MetadataExtractor.extract``, dicts/lists through
    ``_describe_json_structure``, PIL images through ``_describe_image``.
    Any other payload (or an image when PIL is not installed) is ignored.

    Args:
        name: Dataset name (Kedro catalog key)
        df: The data to analyze (DataFrame, dict, or list)
        dataset_id: Optional spec ID (input ID or output port ID)
        dataset_type: "input" or "output"
    """
    common = {"dataset_id": dataset_id, "dataset_type": dataset_type}

    if isinstance(df, pd.DataFrame):
        n_rows = len(df)
        extracted = MetadataExtractor.extract(df)
        self.store_stats(name, extracted, n_rows, **common)
        return

    if isinstance(df, (dict, list)):
        described, structure = self._describe_json_structure(df)
        # The "_carto" entry is removed from the info dict before storing it;
        # when present, its "length" takes precedence over the container size.
        carto = structure.pop("_carto", None) if structure else None
        if carto:
            n_rows = carto.get("length", 0)
        elif isinstance(df, list):
            n_rows = len(df)
        else:
            n_rows = len(df.keys())
        self.store_stats(name, described, n_rows, dataset_info=structure, **common)
        return

    # Remaining supported payload: a PIL Image (loaded by pillow.ImageDataset)
    try:
        from PIL import Image as PILImage
        if isinstance(df, PILImage.Image):
            described = self._describe_image(df)
            self.store_stats(name, described, 1, **common)
    except ImportError:
        pass

store_stats

store_stats(name, fields, row_count, dataset_id=None, dataset_type='input', dataset_info=None)

Store pre-extracted stats for a dataset directly to JSON file.

PARAMETER DESCRIPTION
name

Dataset name (Kedro catalog key)

TYPE: str

fields

Pre-extracted field metadata list

TYPE: List[FieldMetadata]

row_count

Number of rows in the dataset

TYPE: int

dataset_id

Optional XML ID of the dataset

TYPE: str DEFAULT: None

dataset_type

"input" or "output"

TYPE: str DEFAULT: 'input'

dataset_info

Optional structural description (e.g. JSON cartography) stored under the info key and rendered by MetadataResult._to_markdown via info["extract_with"].

TYPE: Optional[Dict[str, Any]] DEFAULT: None

Source code in src/choregraph/metadata.py
def store_stats(self, name: str, fields: List[FieldMetadata], row_count: int, dataset_id: str = None, dataset_type: str = "input", dataset_info: Optional[Dict[str, Any]] = None):
    """
    Store pre-extracted stats for a dataset directly to JSON file.

    The whole catalogue is read, the entry for ``name`` is replaced, and
    the file is rewritten.  An unreadable or malformed catalogue is
    reported and replaced by a fresh one rather than aborting the store.

    Args:
        name: Dataset name (Kedro catalog key)
        fields: Pre-extracted field metadata list
        row_count: Number of rows in the dataset
        dataset_id: Optional XML ID of the dataset
        dataset_type: "input" or "output"
        dataset_info: Optional structural description (e.g. JSON cartography)
            stored under the ``info`` key and rendered by
            :meth:`MetadataResult._to_markdown` via ``info["extract_with"]``.

    Raises:
        Exception: Any error raised while writing the file is logged and
            re-raised.
    """
    # Load existing data from JSON
    existing_data = {"datasets": {}, "last_pipeline_run": ""}
    if self.cache_path.exists():
        try:
            with open(safe_path(self.cache_path), "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception as e:
            print(f"[WARNING] Warning: could not read existing cache: {e}")
        else:
            if isinstance(loaded, dict):
                existing_data = loaded
            else:
                # Valid JSON but not an object (e.g. "[]"): nothing usable in it.
                print("[WARNING] Warning: existing cache is not a JSON object, starting from an empty catalogue")

    # Ensure expected keys exist (handles legacy files initialized with "{}")
    # and that "datasets" is really a mapping (e.g. not null).
    if not isinstance(existing_data.get("datasets"), dict):
        existing_data["datasets"] = {}
    existing_data.setdefault("last_pipeline_run", "")

    # Convert fields to serializable format.  The keys mirror the field
    # entries built by compute_file_stats, so units and per-field info
    # survive a store/read round trip.
    fields_data = []
    for f in fields:
        field_dict = {
            "id": f.id,
            "name": f.name,
            "data_type": f.data_type,
            "min_value": f.min_value,
            "max_value": f.max_value,
            "is_unique": f.is_unique,
            "units": f.units,
            "distinct_count": f.distinct_count,
            "uniques": f.uniques,
        }
        if f.info:
            field_dict["info"] = f.info
        fields_data.append(field_dict)

    # Remove any existing entry with the same dataset_id (under a different name)
    # to avoid duplicate IDs confusing downstream consumers
    if dataset_id is not None:
        to_remove = [
            existing_name for existing_name, existing_entry in existing_data["datasets"].items()
            if existing_name != name
            and isinstance(existing_entry, dict)
            and existing_entry.get("id") == dataset_id
        ]
        for existing_name in to_remove:
            del existing_data["datasets"][existing_name]

    # Add/update the dataset entry; one timestamp is shared by the entry and
    # the catalogue-level marker so the two always agree.
    timestamp = datetime.now().isoformat()
    entry: Dict[str, Any] = {
        "id": dataset_id,
        "type": dataset_type,
        "row_count": row_count,
        "fields": fields_data,
        "last_updated": timestamp,
    }
    if dataset_info:
        entry["info"] = dataset_info
    existing_data["datasets"][name] = entry
    existing_data["last_pipeline_run"] = timestamp

    # Write back to JSON
    self.cache_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        with open(safe_path(self.cache_path), "w", encoding="utf-8") as f:
            json.dump(existing_data, f, indent=2, default=str)
    except Exception as e:
        print(f"[ERROR] ERROR writing stats for '{name}': {e}")
        raise

write_raw_cache

write_raw_cache(json_string)

Write a raw JSON string directly to catalogue_stats.json.

Used by the API flow: the Toolkit sends the pre-built catalogue_stats and the server writes it as-is.

Source code in src/choregraph/metadata.py
def write_raw_cache(self, json_string: str) -> None:
    """Overwrite catalogue_stats.json with the given text, unmodified.

    Used by the API flow: the Toolkit sends the pre-built catalogue_stats
    and the server writes it as-is.  The string is neither parsed nor
    validated here, and the parent directory is created when missing.

    Args:
        json_string: Complete file content to write.
    """
    target = self.cache_path
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(safe_path(target), "w", encoding="utf-8") as handle:
        handle.write(json_string)

read_from_cache

read_from_cache(dataset_ids=None)

Load stats directly from catalogue_stats.json.

PARAMETER DESCRIPTION
dataset_ids

If provided, only retrieves metadata for these specific dataset IDs. Accepts a single string or a list of strings. If None, retrieves all datasets.

TYPE: Optional[List[str]] DEFAULT: None

RETURNS DESCRIPTION
MetadataResult

MetadataResult (smart dict of dataset name -> DatasetStats)

Source code in src/choregraph/metadata.py
def read_from_cache(self, dataset_ids: Optional[List[str]] = None) -> MetadataResult:
    """
    Read dataset stats from catalogue_stats.json on every call.

    A missing file, or any error while reading or deserializing it, is
    reported on stdout and yields an empty result instead of raising.

    Args:
        dataset_ids: If provided, only retrieves metadata for these specific dataset IDs.
                     Accepts a single string or a list of strings.
                     If None, retrieves all datasets.

    Returns:
        MetadataResult (smart dict of dataset name -> DatasetStats)
    """
    wanted = [dataset_ids] if isinstance(dataset_ids, str) else dataset_ids

    if not self.cache_path.exists():
        print("[WARNING] Cache file does not exist, returning empty result")
        return MetadataResult({})

    try:
        with open(safe_path(self.cache_path), "r", encoding="utf-8") as handle:
            payload = json.load(handle)

        selected = {}
        for ds_name, raw in payload.get("datasets", {}).items():
            # Entries stored without an id are matched under the id "0".
            entry_id = str(raw.get("id") or "0")
            if wanted is None or entry_id in wanted:
                selected[ds_name] = DatasetStats.from_dict(raw, name=ds_name)

        return MetadataResult(selected)

    except Exception as e:
        print(f"[ERROR] ERROR reading catalogue_stats.json: {e}")
        return MetadataResult({})

clear

clear()

Clear the JSON file on disk.

Source code in src/choregraph/metadata.py
def clear(self):
    """Delete the stats JSON file from disk, if there is one.

    Never raises: a missing file and a failed deletion are both only
    reported on stdout.
    """
    if not self.cache_path.exists():
        print("[DEBUG] Cache file does not exist, nothing to clear")
        return

    try:
        self.cache_path.unlink()
    except Exception as e:
        print(f"[DEBUG] Error deleting cache file: {e}")

get

get(name)

Get stats for a specific dataset directly from JSON.

Source code in src/choregraph/metadata.py
def get(self, name: str) -> Optional[DatasetStats]:
    """Look up one dataset's stats by name, re-reading the JSON file.

    Returns:
        The stats stored under ``name``, or ``None`` when absent.
    """
    return self.read_from_cache().get(name)

__contains__

__contains__(name)

Check if a dataset exists in the JSON file.

Source code in src/choregraph/metadata.py
def __contains__(self, name: str) -> bool:
    """Tell whether ``self.get(name)`` finds stats for the dataset."""
    found = self.get(name)
    return found is not None

__len__

__len__()

Return the number of datasets in the JSON file.

Source code in src/choregraph/metadata.py
def __len__(self) -> int:
    """Count the datasets returned by an unfiltered ``read_from_cache()``."""
    return len(self.read_from_cache())

remove_datasets

remove_datasets(names)

Remove datasets from catalogue_stats.json by name.

PARAMETER DESCRIPTION
names

Dataset names (filename stems) to remove.

TYPE: List[str]

RETURNS DESCRIPTION
int

Number of datasets actually removed.

Source code in src/choregraph/metadata.py
def remove_datasets(self, names: List[str]) -> int:
    """Delete the named datasets from catalogue_stats.json.

    The file is only rewritten when at least one entry was dropped.  Any
    error while reading or writing makes the call return 0.

    Args:
        names: Dataset names (filename stems) to remove.

    Returns:
        Number of datasets actually removed.
    """
    if not (self.cache_path.exists() and names):
        return 0
    try:
        with open(safe_path(self.cache_path), "r", encoding="utf-8") as src:
            catalogue = json.load(src)
        datasets = catalogue.get("datasets", {})
        # pop() doubles as the existence check: only hits are counted
        removed = sum(1 for key in names if datasets.pop(key, None) is not None)
        if removed:
            with open(safe_path(self.cache_path), "w", encoding="utf-8") as dst:
                json.dump(catalogue, dst, indent=2, default=str, ensure_ascii=False)
        return removed
    except Exception:
        return 0

add_partition_field

add_partition_field(dataset_name, n_partitions, partition_label='partition')

Add virtual __partition__ field to a partitioned dataset's metadata.

The field doesn't exist in the actual data files — it represents the index of each partition (file) in the dataset.

PARAMETER DESCRIPTION
dataset_name

Name of the dataset in the catalogue.

TYPE: str

n_partitions

Number of partitions.

TYPE: int

partition_label

Semantic label (e.g. "time", "sheet", "slice").

TYPE: str DEFAULT: 'partition'

Source code in src/choregraph/metadata.py
def add_partition_field(self, dataset_name: str, n_partitions: int,
                       partition_label: str = "partition"):
    """Add virtual ``__partition__`` field to a partitioned dataset's metadata.

    The field doesn't exist in the actual data files — it represents
    the index of each partition (file) in the dataset.  It is stored as a
    FLOAT ranging from ``0.0`` to ``n_partitions - 1`` and receives the
    next free numeric field id.

    Nothing happens when the cache file or the dataset is missing, or when
    the dataset already has a ``__partition__`` field.  The operation is
    best-effort: a failure is reported on stdout and never raised.

    Args:
        dataset_name: Name of the dataset in the catalogue.
        n_partitions: Number of partitions.
        partition_label: Semantic label (e.g. "time", "sheet", "slice").
    """
    if not self.cache_path.exists():
        return
    try:
        # safe_path is applied like in every other accessor of cache_path
        with open(safe_path(self.cache_path), "r", encoding="utf-8") as src:
            catalogue = json.load(src)
        ds = catalogue.get("datasets", {}).get(dataset_name)
        if not ds:
            return
        fields = ds.setdefault("fields", [])
        if any(field.get("name") == "__partition__" for field in fields):
            return

        # Ids that are not numeric cannot clash with the numeric id generated
        # here, so they are skipped instead of aborting the whole operation.
        numeric_ids = []
        for field in fields:
            try:
                numeric_ids.append(int(field.get("id", 0)))
            except (TypeError, ValueError):
                continue
        next_id = str(max(numeric_ids, default=0) + 1)

        fields.append({
            "id": next_id, "name": "__partition__", "data_type": "FLOAT",
            "min_value": 0.0, "max_value": float(n_partitions - 1), "distinct_count": n_partitions,
            "units": partition_label,
        })
        with open(safe_path(self.cache_path), "w", encoding="utf-8") as dst:
            json.dump(catalogue, dst, indent=2, default=str, ensure_ascii=False)
    except Exception as e:
        # Still best-effort, but leave a trace instead of failing silently.
        print(f"[WARNING] Could not add partition field to '{dataset_name}': {e}")

merge_datasets

merge_datasets(entries)

Merge pre-computed dataset entries into catalogue_stats.json.

Each entry should follow the catalogue_stats schema::

{
    "row_count": int,
    "fields": [{"id", "name", "data_type", ...}],
    "type": "input",
    ...
}
PARAMETER DESCRIPTION
entries

Dict of dataset_name -> stats dict.

TYPE: Dict[str, dict]

RETURNS DESCRIPTION
int

Number of datasets merged.

Source code in src/choregraph/metadata.py
def merge_datasets(self, entries: Dict[str, dict]) -> int:
    """Insert pre-computed dataset entries into catalogue_stats.json.

    Each entry should follow the catalogue_stats schema::

        {
            "row_count": int,
            "fields": [{"id", "name", "data_type", ...}],
            "type": "input",
            ...
        }

    Entries lacking a ``"fields"`` key are ignored; accepted ones replace
    any entry already stored under the same name.  An unreadable existing
    file is treated as an empty catalogue, and the file is only written
    when at least one entry was accepted.

    Args:
        entries: Dict of dataset_name -> stats dict.

    Returns:
        Number of datasets merged.
    """
    if not entries:
        return 0

    self.cache_path.parent.mkdir(parents=True, exist_ok=True)

    catalogue = {"datasets": {}, "last_pipeline_run": ""}
    if self.cache_path.exists():
        try:
            with open(safe_path(self.cache_path), "r", encoding="utf-8") as src:
                catalogue = json.load(src)
        except Exception:
            pass
    catalogue.setdefault("datasets", {})

    # Minimal structural validation: an entry must at least declare its fields
    accepted = {key: value for key, value in entries.items() if "fields" in value}
    catalogue["datasets"].update(accepted)

    merged = len(accepted)
    if merged:
        with open(safe_path(self.cache_path), "w", encoding="utf-8") as dst:
            json.dump(catalogue, dst, indent=2, default=str, ensure_ascii=False)
    return merged

compute_file_stats

compute_file_stats(file_paths)

Compute metadata for one or more data files.

Accepts a single path or a list of paths. When multiple paths are given (e.g. all CSVs in a temporal group), tabular files are aggregated so that min/max/distinct reflect the full range across the group. Non-tabular formats only use the first path.

Supports CSV, TSV, Parquet, JSON, images (PNG/JPG/TIFF/BMP/WEBP/GIF), MHD volumes, DICOM (.dcm) files, and EDF (.edf) recordings. Returns a stats dict in the same format as catalogue_stats.json dataset entries, or None if the file type is unsupported or the file cannot be read.

This is a standalone function — no Kedro, no workspace, no DB needed.

PARAMETER DESCRIPTION
file_paths

Absolute path to a file, or a list of paths to aggregate over.

TYPE: str | List[str]

RETURNS DESCRIPTION
Optional[dict]

Dict with row_count, fields list, type, last_updated

Optional[dict]

keys, or None if unsupported.

Source code in src/choregraph/metadata.py
def compute_file_stats(file_paths: "str | List[str]") -> Optional[dict]:
    """Compute metadata for one or more data files.

    Accepts a single path (``str`` or ``os.PathLike``) or a list of paths.
    When multiple paths are given (e.g. all CSVs in a temporal group),
    tabular files are aggregated so that min/max/distinct reflect the full
    range across the group.  Non-tabular formats only use the first path.
    The format is chosen from the extension of the first path.

    Supports CSV, TSV, Parquet, JSON, images (PNG/JPG/TIFF/BMP/WEBP/GIF),
    MHD volumes, DICOM (``.dcm``) and EDF (``.edf``) files. Returns a stats
    dict in the same format as ``catalogue_stats.json`` dataset entries, or
    ``None`` if the file type is unsupported or the file cannot be read.

    This is a standalone function — no Kedro, no workspace, no DB needed.

    Args:
        file_paths: Absolute path to a file, or a list of paths to
            aggregate over.

    Returns:
        Dict with ``row_count``, ``fields`` list, ``type``, ``last_updated``
        keys (plus ``info`` when a dataset-level description exists), or
        ``None`` if unsupported or unreadable.
    """
    import os

    # A lone path-like object is a single path, not an iterable of paths.
    if isinstance(file_paths, (str, os.PathLike)):
        paths = [os.fspath(file_paths)]
    else:
        paths = list(file_paths)
    if not paths:
        return None

    primary = paths[0]
    ext = os.path.splitext(primary)[1].lower()

    fields: Optional[List[FieldMetadata]] = None
    row_count = 0
    dataset_info: Optional[Dict[str, Any]] = None

    if ext in (".csv", ".tsv", ".parquet"):
        # Imported only where needed so other formats do not pay for it.
        import pandas as _pd

        try:
            dfs = []
            for p in paths:
                df = _read_tabular(p)
                if df is not None:
                    dfs.append(df)
            if not dfs:
                return None
            df = _pd.concat(dfs, ignore_index=True) if len(dfs) > 1 else dfs[0]
            fields = MetadataExtractor.extract(df)
            row_count = len(df)
        except Exception:
            return None

    elif ext == ".json":
        try:
            with open(primary, "r", encoding="utf-8") as jf:
                json_data = json.load(jf)
        except (json.JSONDecodeError, OSError) as e:
            print(f"[WARNING] Failed to parse JSON {primary}: {e}")
            return None
        fields, dataset_info = Metadata._describe_json_structure(json_data)
        # "_carto" is removed from the info before it is returned; when
        # present, its "length" takes precedence over the container size.
        carto = dataset_info.pop("_carto", None) if dataset_info else None
        row_count = carto.get("length", 0) if carto else (
            len(json_data) if isinstance(json_data, list)
            else len(json_data.keys()) if isinstance(json_data, dict) else 0
        )

    elif ext in (".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp", ".gif"):
        try:
            from PIL import Image as PILImage
            # The context manager closes the underlying file handle once
            # the image has been described.
            with PILImage.open(primary) as img:
                fields = Metadata._describe_image(img)
            row_count = 1
        except Exception:
            return None

    elif ext == ".mhd":
        try:
            fields = Metadata._describe_mhd(primary)
            row_count = 1
        except Exception:
            return None

    elif ext == ".dcm":
        try:
            fields = Metadata._describe_dicom(primary)
            row_count = 1
        except Exception:
            return None

    elif ext == ".edf":
        try:
            fields, dataset_info = _describe_edf(primary)
            # "_total_samples" is removed from the info and used as row count.
            row_count = dataset_info.pop("_total_samples", 0)
        except Exception:
            return None

    else:
        return None

    if fields is None:
        return None

    result = {
        "row_count": row_count,
        "fields": [
            {
                "id": f.id,
                "name": f.name,
                "data_type": f.data_type,
                "min_value": f.min_value,
                "max_value": f.max_value,
                "is_unique": f.is_unique,
                "units": f.units,
                "distinct_count": f.distinct_count,
                "uniques": f.uniques,
                **({"info": f.info} if f.info else {}),
            }
            for f in fields
        ],
        "type": "input",
        "last_updated": datetime.now().isoformat(),
    }
    if dataset_info:
        result["info"] = dataset_info
    return result