Core Transforms

The core transform function library: 50+ DataFrame operations registered in TRANSFORM_REGISTRY. The Builder uses these functions when constructing Kedro pipeline nodes from the XML specification.

All functions follow a consistent pattern: accept a DataFrame (and parameters), return a DataFrame (or scalar). Functions that accept return_mask=True can return both a filtered result and a boolean mask.

library

Transform function library -- the extensible registry of data operations.

Defines 50+ DataFrame transform functions organized by category (filtering, aggregation, column/row operations, calculations, multi-input joins, advanced transformations, JSON extraction). All functions are registered in TRANSFORM_REGISTRY, which the builder uses to look up implementations when constructing Kedro pipeline nodes from an XML specification.
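The lookup described above can be pictured with a minimal sketch. This page does not show how TRANSFORM_REGISTRY is populated or how the builder calls it, so the `register` decorator, the `run_step` helper, and the toy `keep_below` transform below are hypothetical names used only for illustration, not choregraph's actual API.

```python
from typing import Any, Callable, Dict

import pandas as pd

# Hypothetical stand-in for the real registry: function name -> callable.
TRANSFORM_REGISTRY: Dict[str, Callable[..., Any]] = {}


def register(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record a transform under its own name (assumed mechanism)."""
    TRANSFORM_REGISTRY[func.__name__] = func
    return func


@register
def keep_below(df: pd.DataFrame, column: str, value: float) -> pd.DataFrame:
    """Toy transform following the DataFrame-in, DataFrame-out pattern."""
    return df[df[column] < value].copy()


def run_step(name: str, df: pd.DataFrame, **params: Any) -> Any:
    """Look a transform up by name, as a builder might for one pipeline node."""
    if name not in TRANSFORM_REGISTRY:
        raise KeyError(f"Unknown transform: {name!r}")
    return TRANSFORM_REGISTRY[name](df, **params)


scores = pd.DataFrame({"name": ["a", "b", "c"], "score": [1.0, 5.0, 9.0]})
low = run_step("keep_below", scores, column="score", value=6.0)
print(low["name"].tolist())  # ['a', 'b']
```

Keeping the lookup behind a name means a specification only needs to carry a function name plus its parameters; the callable itself stays in Python.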

JsonTooDeepError

Bases: ValueError

Raised by cartograph_json when input nests deeper than MAX_JSON_DEPTH.

calculate_min

calculate_min(df=None, column=None, input_list=None)

Calculate the minimum value from a DataFrame column or a list.

PARAMETER DESCRIPTION
df

Input DataFrame (mutually exclusive with input_list).

TYPE: DataFrame DEFAULT: None

column

Column name to compute the minimum of.

TYPE: str DEFAULT: None

input_list

Plain Python list to compute the minimum of.

TYPE: list DEFAULT: None

RETURNS DESCRIPTION
float

The minimum value as a scalar, or None when input_list is empty.

RAISES DESCRIPTION
ValueError

If neither df nor input_list is provided, or if column is not found in the DataFrame.

Source code in src/choregraph/library.py
def calculate_min(df: pd.DataFrame = None, column: str = None, input_list: list = None) -> float:
    """Calculate the minimum value from a DataFrame column or a list.

    Args:
        df: Input DataFrame (mutually exclusive with ``input_list``).
        column: Column name to compute the minimum of.
        input_list: Plain Python list to compute the minimum of.

    Returns:
        The minimum value as a scalar.

    Raises:
        ValueError: If neither ``df`` nor ``input_list`` is provided, or if
            ``column`` is not found in the DataFrame.
    """
    if df is not None:
        if column not in df.columns:
            raise ValueError(f"Column '{column}' not found.")
        min_val = df[column].min()
    elif input_list is not None:
        if not input_list:
            min_val = None
        else:
            min_val = min(input_list)
    else:
        raise ValueError("Either 'df' or 'input_list' must be provided.")

    return min_val
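A short usage sketch of the branches in the listing above. The function body is copied from that listing (type hints dropped) so the snippet runs on its own; the sample data is made up. It shows two behaviours that are easy to miss: an empty list yields None instead of raising, and when both df and input_list are passed, the DataFrame branch is evaluated first.

```python
import pandas as pd


def calculate_min(df=None, column=None, input_list=None):
    # Body copied from the source listing above.
    if df is not None:
        if column not in df.columns:
            raise ValueError(f"Column '{column}' not found.")
        min_val = df[column].min()
    elif input_list is not None:
        if not input_list:
            min_val = None
        else:
            min_val = min(input_list)
    else:
        raise ValueError("Either 'df' or 'input_list' must be provided.")
    return min_val


prices = pd.DataFrame({"price": [4.5, 2.0, 7.25]})

from_column = calculate_min(df=prices, column="price")  # 2.0
from_list = calculate_min(input_list=[3, 1, 2])         # 1
from_empty = calculate_min(input_list=[])               # None, no exception

# Both inputs given: the DataFrame branch is checked first, so it wins.
both = calculate_min(df=prices, column="price", input_list=[0])  # 2.0

try:
    calculate_min()
except ValueError as exc:
    error_message = str(exc)
```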

calculate_max

calculate_max(df=None, column=None, input_list=None)

Calculate the maximum value from a DataFrame column or a list.

PARAMETER DESCRIPTION
df

Input DataFrame (mutually exclusive with input_list).

TYPE: DataFrame DEFAULT: None

column

Column name to compute the maximum of.

TYPE: str DEFAULT: None

input_list

Plain Python list to compute the maximum of.

TYPE: list DEFAULT: None

RETURNS DESCRIPTION
float

The maximum value as a scalar, or None when input_list is empty.

RAISES DESCRIPTION
ValueError

If neither df nor input_list is provided, or if column is not found in the DataFrame.

Source code in src/choregraph/library.py
def calculate_max(df: pd.DataFrame = None, column: str = None, input_list: list = None) -> float:
    """Calculate the maximum value from a DataFrame column or a list.

    Args:
        df: Input DataFrame (mutually exclusive with ``input_list``).
        column: Column name to compute the maximum of.
        input_list: Plain Python list to compute the maximum of.

    Returns:
        The maximum value as a scalar.

    Raises:
        ValueError: If neither ``df`` nor ``input_list`` is provided, or if
            ``column`` is not found in the DataFrame.
    """
    if df is not None:
        if column not in df.columns:
            raise ValueError(f"Column '{column}' not found.")
        max_val = df[column].max()
    elif input_list is not None:
        if not input_list:
            max_val = None
        else:
            max_val = max(input_list)
    else:
        raise ValueError("Either 'df' or 'input_list' must be provided.")

    return max_val

filter_less_than

filter_less_than(df, column, value, return_mask=False)

Filter rows where column < value.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to compare.

TYPE: str

value

Threshold value.

TYPE: float

return_mask

If True, return a dict with both the filtered DataFrame and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

Filtered DataFrame, or {"result": DataFrame, "mask": DataFrame} when return_mask is True.

Source code in src/choregraph/library.py
def filter_less_than(df: pd.DataFrame, column: str, value: float, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Filter rows where ``column < value``.

    Args:
        df: Input DataFrame.
        column: Column name to compare.
        value: Threshold value.
        return_mask: If True, return a dict with both the filtered DataFrame
            and a boolean mask.

    Returns:
        Filtered DataFrame, or ``{"result": DataFrame, "mask": DataFrame}``
        when *return_mask* is True.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    mask = df[column] < _coerce_filter_value(df, column, value)
    if return_mask:
        return {"result": df[mask].copy(), "mask": mask.to_frame()}
    return df[mask].copy()
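The return_mask contract can be seen in a small sketch. The function below is a simplified copy of the listing above: the private `_coerce_filter_value` helper is not shown on this page, so the sketch assumes the threshold already has the right type and compares it directly. The sample data is made up.

```python
import pandas as pd


def filter_less_than(df, column, value, return_mask=False):
    # Simplified copy of the listing above; value coercion is omitted
    # because _coerce_filter_value is not shown on this page.
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")
    mask = df[column] < value
    if return_mask:
        return {"result": df[mask].copy(), "mask": mask.to_frame()}
    return df[mask].copy()


readings = pd.DataFrame(
    {"sensor": ["a", "b", "c", "d"], "temp": [18.5, 25.0, 21.0, 30.5]}
)

out = filter_less_than(readings, "temp", 22.0, return_mask=True)
kept = out["result"]      # rows for sensors a and c
mask_frame = out["mask"]  # one boolean column named "temp", one row per input row

# The mask keeps the input's index, so it can also select the rejected rows.
rejected = readings[~mask_frame["temp"]]
```

Because the mask comes from a named Series, the single column of the mask frame carries the name of the compared column.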

filter_greater_than

filter_greater_than(df, column, value, return_mask=False)

Filter rows where column > value.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to compare.

TYPE: str

value

Threshold value.

TYPE: float

return_mask

If True, return a dict with both the filtered DataFrame and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

Filtered DataFrame, or {"result": DataFrame, "mask": DataFrame} when return_mask is True.

Source code in src/choregraph/library.py
def filter_greater_than(df: pd.DataFrame, column: str, value: float, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Filter rows where ``column > value``.

    Args:
        df: Input DataFrame.
        column: Column name to compare.
        value: Threshold value.
        return_mask: If True, return a dict with both the filtered DataFrame
            and a boolean mask.

    Returns:
        Filtered DataFrame, or ``{"result": DataFrame, "mask": DataFrame}``
        when *return_mask* is True.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    mask = df[column] > _coerce_filter_value(df, column, value)
    if return_mask:
        return {"result": df[mask].copy(), "mask": mask.to_frame()}
    return df[mask].copy()

filter_in_range

filter_in_range(df, column, min_value, max_value, return_mask=False)

Filter rows where min_value <= column <= max_value.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to compare.

TYPE: str

min_value

Lower bound of the range (inclusive).

TYPE: float

max_value

Upper bound of the range (inclusive).

TYPE: float

return_mask

If True, return a dict with both the filtered DataFrame and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

Filtered DataFrame, or {"result": DataFrame, "mask": DataFrame} when return_mask is True.

Source code in src/choregraph/library.py
def filter_in_range(df: pd.DataFrame, column: str, min_value: float, max_value: float, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Filter rows where ``min_value <= column <= max_value``.

    Args:
        df: Input DataFrame.
        column: Column name to compare.
        min_value: Lower bound of the range (inclusive).
        max_value: Upper bound of the range (inclusive).
        return_mask: If True, return a dict with both the filtered DataFrame
            and a boolean mask.

    Returns:
        Filtered DataFrame, or ``{"result": DataFrame, "mask": DataFrame}``
        when *return_mask* is True.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    mask = (df[column] >= _coerce_filter_value(df, column, min_value)) & (df[column] <= _coerce_filter_value(df, column, max_value))
    if return_mask:
        return {"result": df[mask].copy(), "mask": mask.to_frame()}
    return df[mask].copy()

filter_equal

filter_equal(df, column, value, return_mask=False)

Filter rows where column == value.

Works with both numeric and string columns. Numeric conversion is attempted automatically when the column dtype is numeric.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to compare.

TYPE: str

value

Value to match (string; auto-converted for numeric columns).

TYPE: str

return_mask

If True, return a dict with both the filtered DataFrame and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

Filtered DataFrame, or {"result": DataFrame, "mask": DataFrame} when return_mask is True.

Source code in src/choregraph/library.py
def filter_equal(df: pd.DataFrame, column: str, value: str, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Filter rows where ``column == value``.

    Works with both numeric and string columns. Numeric conversion is
    attempted automatically when the column dtype is numeric.

    Args:
        df: Input DataFrame.
        column: Column name to compare.
        value: Value to match (string; auto-converted for numeric columns).
        return_mask: If True, return a dict with both the filtered DataFrame
            and a boolean mask.

    Returns:
        Filtered DataFrame, or ``{"result": DataFrame, "mask": DataFrame}``
        when *return_mask* is True.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    # Try to convert value to match column dtype
    col_dtype = df[column].dtype
    try:
        if pd.api.types.is_numeric_dtype(col_dtype):
            value = float(value)
    except (ValueError, TypeError):
        pass  # Keep as string

    mask = df[column] == value
    if return_mask:
        return {"result": df[mask].copy(), "mask": mask.to_frame()}
    return df[mask].copy()
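The automatic numeric conversion is easiest to see side by side. In this sketch the function body is copied from the listing above (return_mask handling left out for brevity) and the sample data is made up: the same string "2" is converted to a float for a numeric column and kept as a string for a text column.

```python
import pandas as pd


def filter_equal(df, column, value):
    # Body copied from the source listing above, without the return_mask branch.
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")
    col_dtype = df[column].dtype
    try:
        if pd.api.types.is_numeric_dtype(col_dtype):
            value = float(value)
    except (ValueError, TypeError):
        pass  # Keep as string
    mask = df[column] == value
    return df[mask].copy()


stock = pd.DataFrame({"qty": [1, 2, 2], "code": ["1", "2", "x"]})

# Numeric column: "2" becomes 2.0 before the comparison.
by_qty = filter_equal(stock, "qty", "2")

# Text column: "2" stays a string and is compared as-is.
by_code = filter_equal(stock, "code", "2")

print(by_qty.index.tolist())   # [1, 2]
print(by_code.index.tolist())  # [1]
```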

filter_not_equal

filter_not_equal(df, column, value, return_mask=False)

Filter rows where column != value.

Works with both numeric and string columns. Numeric conversion is attempted automatically when the column dtype is numeric.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to compare.

TYPE: str

value

Value to exclude (string; auto-converted for numeric columns).

TYPE: str

return_mask

If True, return a dict with both the filtered DataFrame and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

Filtered DataFrame, or {"result": DataFrame, "mask": DataFrame} when return_mask is True.

Source code in src/choregraph/library.py
def filter_not_equal(df: pd.DataFrame, column: str, value: str, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Filter rows where ``column != value``.

    Works with both numeric and string columns. Numeric conversion is
    attempted automatically when the column dtype is numeric.

    Args:
        df: Input DataFrame.
        column: Column name to compare.
        value: Value to exclude (string; auto-converted for numeric columns).
        return_mask: If True, return a dict with both the filtered DataFrame
            and a boolean mask.

    Returns:
        Filtered DataFrame, or ``{"result": DataFrame, "mask": DataFrame}``
        when *return_mask* is True.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    # Try to convert value to match column dtype
    col_dtype = df[column].dtype
    try:
        if pd.api.types.is_numeric_dtype(col_dtype):
            value = float(value)
    except (ValueError, TypeError):
        pass  # Keep as string

    mask = df[column] != value
    if return_mask:
        return {"result": df[mask].copy(), "mask": mask.to_frame()}
    return df[mask].copy()

get_top_n

get_top_n(df, column, n, return_mask=False)

Return the top n rows by column value (descending).

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to rank by.

TYPE: str

n

Number of rows to keep.

TYPE: int

return_mask

If True, return a dict with the result and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

DataFrame with the top n rows, or {"result": DataFrame, "mask": DataFrame}.

Source code in src/choregraph/library.py
def get_top_n(df: pd.DataFrame, column: str, n: int, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Return the top *n* rows by column value (descending).

    Args:
        df: Input DataFrame.
        column: Column name to rank by.
        n: Number of rows to keep.
        return_mask: If True, return a dict with the result and a boolean mask.

    Returns:
        DataFrame with the top *n* rows, or ``{"result": DataFrame, "mask": DataFrame}``.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    top_indices = df[column].nlargest(int(n)).index

    if return_mask:
        return {"result": df.loc[top_indices].copy(), "mask": pd.Series(df.index.isin(top_indices), index=df.index).to_frame()}

    return df.loc[top_indices].copy()
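A short sketch of what the two outputs look like, with the function body copied from the listing above and made-up sample data. The result rows come back in descending order of the ranked column, while the mask follows the original row order. Note that the mask here is built from an unnamed Series, so its single column is labelled 0 rather than carrying the ranked column's name.

```python
import pandas as pd


def get_top_n(df, column, n, return_mask=False):
    # Body copied from the source listing above.
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")
    top_indices = df[column].nlargest(int(n)).index
    if return_mask:
        return {
            "result": df.loc[top_indices].copy(),
            "mask": pd.Series(df.index.isin(top_indices), index=df.index).to_frame(),
        }
    return df.loc[top_indices].copy()


scores = pd.DataFrame({"player": ["a", "b", "c", "d"], "score": [5, 9, 1, 7]})

out = get_top_n(scores, "score", 2, return_mask=True)
top_players = out["result"]["player"].tolist()  # ['b', 'd'] (scores 9, then 7)
mask_values = out["mask"][0].tolist()           # [False, True, False, True]
```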

get_top_percentage

get_top_percentage(df, column, fraction, return_mask=False)

Return the top fraction of rows by column value (descending).

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to rank by.

TYPE: str

fraction

Fraction of rows to keep (0.0–1.0).

TYPE: float

return_mask

If True, return a dict with the result and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

DataFrame with the top rows, or {"result": DataFrame, "mask": DataFrame}.

Source code in src/choregraph/library.py
def get_top_percentage(df: pd.DataFrame, column: str, fraction: float, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Return the top fraction of rows by column value (descending).

    Args:
        df: Input DataFrame.
        column: Column name to rank by.
        fraction: Fraction of rows to keep (0.0–1.0).
        return_mask: If True, return a dict with the result and a boolean mask.

    Returns:
        DataFrame with the top rows, or ``{"result": DataFrame, "mask": DataFrame}``.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")
    fraction = float(fraction)
    n = fraction * len(df)
    n = max(1, n) if fraction > 0 else 0
    top_indices = df[column].nlargest(int(n)).index

    if return_mask:
        return {"result": df.loc[top_indices].copy(), "mask": pd.Series(df.index.isin(top_indices), index=df.index).to_frame()}

    return df.loc[top_indices].copy()
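The row-count arithmetic in the listing above is worth working through, since it both truncates and enforces a minimum of one row. The helper below, `rows_to_keep`, is a hypothetical name that isolates those three lines; it is not part of the library.

```python
def rows_to_keep(fraction, total_rows: int) -> int:
    """Mirror the row-count arithmetic of get_top_percentage (illustrative helper)."""
    fraction = float(fraction)            # strings such as "0.5" are accepted
    n = fraction * total_rows
    n = max(1, n) if fraction > 0 else 0  # any positive fraction keeps >= 1 row
    return int(n)                         # truncates toward zero


print(rows_to_keep(0.25, 10))   # 2  (2.5 truncated)
print(rows_to_keep(0.05, 10))   # 1  (0.5 raised to the minimum of 1)
print(rows_to_keep(0.0, 10))    # 0
print(rows_to_keep(1.0, 10))    # 10
print(rows_to_keep("0.5", 4))   # 2
```

The same arithmetic appears in get_bottom_percentage, so the two functions select the same number of rows for a given fraction.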

get_bottom_n

get_bottom_n(df, column, n, return_mask=False)

Return the bottom n rows by column value (ascending).

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to rank by.

TYPE: str

n

Number of rows to keep.

TYPE: int

return_mask

If True, return a dict with the result and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

DataFrame with the bottom n rows, or {"result": DataFrame, "mask": DataFrame}.

Source code in src/choregraph/library.py
def get_bottom_n(df: pd.DataFrame, column: str, n: int, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Return the bottom *n* rows by column value (ascending).

    Args:
        df: Input DataFrame.
        column: Column name to rank by.
        n: Number of rows to keep.
        return_mask: If True, return a dict with the result and a boolean mask.

    Returns:
        DataFrame with the bottom *n* rows, or ``{"result": DataFrame, "mask": DataFrame}``.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    bottom_indices = df[column].nsmallest(int(n)).index

    if return_mask:
        return {"result": df.loc[bottom_indices].copy(), "mask": pd.Series(df.index.isin(bottom_indices), index=df.index).to_frame()}

    return df.loc[bottom_indices].copy()

get_bottom_percentage

get_bottom_percentage(df, column, fraction, return_mask=False)

Return the bottom fraction of rows by column value (ascending).

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column name to rank by.

TYPE: str

fraction

Fraction of rows to keep (0.0–1.0).

TYPE: float

return_mask

If True, return a dict with the result and a boolean mask.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[DataFrame, Dict[str, Any]]

DataFrame with the bottom rows, or {"result": DataFrame, "mask": DataFrame}.

Source code in src/choregraph/library.py
def get_bottom_percentage(df: pd.DataFrame, column: str, fraction: float, return_mask: bool = False) -> Union[pd.DataFrame, Dict[str, Any]]:
    """Return the bottom fraction of rows by column value (ascending).

    Args:
        df: Input DataFrame.
        column: Column name to rank by.
        fraction: Fraction of rows to keep (0.0–1.0).
        return_mask: If True, return a dict with the result and a boolean mask.

    Returns:
        DataFrame with the bottom rows, or ``{"result": DataFrame, "mask": DataFrame}``.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame.")

    fraction = float(fraction)
    n = fraction * len(df)
    n = max(1, n) if fraction > 0 else 0
    bottom_indices = df[column].nsmallest(int(n)).index

    if return_mask:
        return {"result": df.loc[bottom_indices].copy(), "mask": pd.Series(df.index.isin(bottom_indices), index=df.index).to_frame()}

    return df.loc[bottom_indices].copy()

aggregate_mean

aggregate_mean(df, group_columns=None, suffix=None)

Calculates the mean of all numeric columns, optionally grouped.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

group_columns

Optional column(s) to group by

TYPE: Union[list, str] DEFAULT: None

suffix

Optional suffix to add to the aggregated column names

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Aggregated DataFrame with mean values per group (or a single-row DataFrame if ungrouped).

Source code in src/choregraph/library.py
def aggregate_mean(df: pd.DataFrame, group_columns: Union[list, str] = None, suffix: str = None) -> pd.DataFrame:
    """
    Calculates the mean of all numeric columns, optionally grouped.

    Args:
        df: Input DataFrame
        group_columns: Optional column(s) to group by
        suffix: Optional suffix to add to the aggregated column names

    Returns:
        Aggregated DataFrame with mean values per group (or a single-row
        DataFrame if ungrouped).
    """
    if group_columns:
        if isinstance(group_columns, str):
            group_columns = [group_columns]

        missing_cols = [col for col in group_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Group columns {missing_cols} not found in DataFrame.")

        result = df.groupby(group_columns).mean(numeric_only=True).reset_index()
    else:
        result = df.mean(numeric_only=True).to_frame().T

    if suffix:
        # Identify numeric columns (the ones that were aggregated)
        # In the grouped case, they are everything except group_columns
        # In the non-grouped case, they are everything
        agg_cols = [c for c in result.columns if group_columns is None or c not in group_columns]
        rename_map = {c: f"{c}{suffix}" for c in agg_cols}
        result = result.rename(columns=rename_map)

    return result
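A usage sketch for the grouped and ungrouped paths, with the function body copied from the listing above (comments trimmed) and made-up sample data. It illustrates that non-numeric columns are dropped from the result and that the suffix is applied only to aggregated columns, never to the grouping columns.

```python
import pandas as pd


def aggregate_mean(df, group_columns=None, suffix=None):
    # Body copied from the source listing above.
    if group_columns:
        if isinstance(group_columns, str):
            group_columns = [group_columns]
        missing_cols = [col for col in group_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Group columns {missing_cols} not found in DataFrame.")
        result = df.groupby(group_columns).mean(numeric_only=True).reset_index()
    else:
        result = df.mean(numeric_only=True).to_frame().T
    if suffix:
        agg_cols = [c for c in result.columns
                    if group_columns is None or c not in group_columns]
        result = result.rename(columns={c: f"{c}{suffix}" for c in agg_cols})
    return result


sales = pd.DataFrame({
    "region": ["n", "n", "s"],
    "sales": [10, 20, 30],
    "units": [1, 3, 5],
    "note": ["x", "y", "z"],  # non-numeric, so it is left out of the means
})

per_region = aggregate_mean(sales, group_columns="region", suffix="_mean")
overall = aggregate_mean(sales)

print(list(per_region.columns))  # ['region', 'sales_mean', 'units_mean']
print(overall.shape)             # (1, 2)
```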

aggregate_count

aggregate_count(df, group_columns=None)

Returns the number of rows, optionally grouped. Only returns the grouping columns and a 'count' column.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

group_columns

Optional column(s) to group by

TYPE: Union[list, str] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with grouping columns and a count column (or a single-row DataFrame with the total row count if ungrouped).

Source code in src/choregraph/library.py
def aggregate_count(df: pd.DataFrame, group_columns: Union[list, str] = None) -> pd.DataFrame:
    """
    Returns the number of rows, optionally grouped.
    Only returns the grouping columns and a 'count' column.

    Args:
        df: Input DataFrame
        group_columns: Optional column(s) to group by

    Returns:
        DataFrame with grouping columns and a ``count`` column (or a single-row
        DataFrame with the total row count if ungrouped).
    """
    if group_columns:
        if isinstance(group_columns, str):
            group_columns = [group_columns]

        missing_cols = [col for col in group_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Group columns {missing_cols} not found in DataFrame.")

        result = df.groupby(group_columns).size().reset_index(name='count')
        return result

    return pd.DataFrame({'count': [len(df)]})

aggregate_sum

aggregate_sum(df, group_columns=None, suffix=None)

Calculates the sum of all numeric columns, optionally grouped.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

group_columns

Optional column(s) to group by

TYPE: Union[list, str] DEFAULT: None

suffix

Optional suffix to add to the aggregated column names

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Aggregated DataFrame with summed values per group (or a single-row DataFrame if ungrouped).

Source code in src/choregraph/library.py
def aggregate_sum(df: pd.DataFrame, group_columns: Union[list, str] = None, suffix: str = None) -> pd.DataFrame:
    """
    Calculates the sum of all numeric columns, optionally grouped.

    Args:
        df: Input DataFrame
        group_columns: Optional column(s) to group by
        suffix: Optional suffix to add to the aggregated column names

    Returns:
        Aggregated DataFrame with summed values per group (or a single-row
        DataFrame if ungrouped).
    """
    if group_columns:
        if isinstance(group_columns, str):
            group_columns = [group_columns]

        missing_cols = [col for col in group_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Group columns {missing_cols} not found in DataFrame.")

        result = df.groupby(group_columns).sum(numeric_only=True).reset_index()
    else:
        result = df.sum(numeric_only=True).to_frame().T

    if suffix:
        agg_cols = [c for c in result.columns if group_columns is None or c not in group_columns]
        rename_map = {c: f"{c}{suffix}" for c in agg_cols}
        result = result.rename(columns=rename_map)

    return result

aggregate_median

aggregate_median(df, group_columns=None, suffix=None)

Calculates the median of all numeric columns, optionally grouped.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

group_columns

Optional column(s) to group by

TYPE: Union[list, str] DEFAULT: None

suffix

Optional suffix to add to the aggregated column names

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Aggregated DataFrame with median values per group (or a single-row DataFrame if ungrouped).

Source code in src/choregraph/library.py
def aggregate_median(df: pd.DataFrame, group_columns: Union[list, str] = None, suffix: str = None) -> pd.DataFrame:
    """
    Calculates the median of all numeric columns, optionally grouped.

    Args:
        df: Input DataFrame
        group_columns: Optional column(s) to group by
        suffix: Optional suffix to add to the aggregated column names

    Returns:
        Aggregated DataFrame with median values per group (or a single-row
        DataFrame if ungrouped).
    """
    if group_columns:
        if isinstance(group_columns, str):
            group_columns = [group_columns]

        missing_cols = [col for col in group_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Group columns {missing_cols} not found in DataFrame.")

        result = df.groupby(group_columns).median(numeric_only=True).reset_index()
    else:
        result = df.median(numeric_only=True).to_frame().T

    if suffix:
        agg_cols = [c for c in result.columns if group_columns is None or c not in group_columns]
        rename_map = {c: f"{c}{suffix}" for c in agg_cols}
        result = result.rename(columns=rename_map)

    return result

hierarchical_rollup

hierarchical_rollup(df, path_columns=None, value_column=None, root_label='Total')

Transform tabular data into hierarchical parent-child-value long format.

Takes N hierarchical columns (broadest to most specific) and produces a DataFrame with path-based ids, parent references, and aggregated values. Supports arbitrary hierarchy depth.

A synthetic root node (root_label) is always prepended so that the output has a single root — required by Plotly Treemap / Sunburst.

All numeric columns (except path_columns) are automatically summed at each hierarchy level and preserved in the output alongside a count column. This allows downstream channels (e.g. color) to reference any aggregated numeric variable.

The output serves both Partition (Treemap/Sunburst) and Flow (Sankey) marks:

- Partition reads: ids=target, labels=last_part(target), parents=source, values=value
- Flow reads: source=source, target=target, value=value (skip root rows)

PARAMETER DESCRIPTION
df

Input DataFrame with hierarchical columns.

TYPE: DataFrame

path_columns

Ordered list of column names defining hierarchy levels (broadest to most specific). e.g. ["continent", "country", "city"]. Also accepts a comma-separated string.

TYPE: Union[list, str] DEFAULT: None

value_column

Column to aggregate as the primary value (sum). If None, or if the column is not present in the DataFrame, rows are counted instead.

TYPE: str DEFAULT: None

root_label

Label for the synthetic root node (default "Total").

TYPE: str DEFAULT: 'Total'

RETURNS DESCRIPTION
DataFrame

DataFrame with columns: target, source, value, count, and one column per extra numeric field (summed). target is the node's own path-based identifier; source is the parent's identifier (empty for the synthetic root). The (source, target, value) triple is the shape that sankey/chord flow marks consume directly.

Source code in src/choregraph/library.py
def hierarchical_rollup(df: pd.DataFrame, path_columns: Union[list, str] = None,
                        value_column: str = None,
                        root_label: str = "Total") -> pd.DataFrame:
    """Transform tabular data into hierarchical parent-child-value long format.

    Takes N hierarchical columns (broadest to most specific) and produces a
    DataFrame with path-based ids, parent references, and aggregated values.
    Supports arbitrary hierarchy depth.

    A synthetic root node (``root_label``) is always prepended so that the
    output has a single root — required by Plotly Treemap / Sunburst.

    All numeric columns (except path_columns) are automatically summed at each
    hierarchy level and preserved in the output alongside a ``count`` column.
    This allows downstream channels (e.g. color) to reference any aggregated
    numeric variable.

    The output serves both Partition (Treemap/Sunburst) and Flow (Sankey) marks:
    - Partition reads: ids=target, labels=last_part(target), parents=source, values=value
    - Flow reads: source=source, target=target, value=value (skip root rows)

    Args:
        df: Input DataFrame with hierarchical columns.
        path_columns: Ordered list of column names defining hierarchy levels
            (broadest to most specific). e.g. ["continent", "country", "city"].
            Also accepts a comma-separated string.
        value_column: Column to aggregate as the primary ``value`` (sum).
            If None, counts rows.
        root_label: Label for the synthetic root node (default ``"Total"``).

    Returns:
        DataFrame with columns: target, source, value, count, and one column
        per extra numeric field (summed). ``target`` is the node's own path-
        based identifier; ``source`` is the parent's identifier (empty for
        the synthetic root). The (source, target, value) triple is the shape
        that sankey/chord flow marks consume directly.
    """
    if isinstance(path_columns, str):
        path_columns = [c.strip() for c in path_columns.split(',')]

    if not path_columns:
        raise ValueError("path_columns must be a non-empty list of column names.")

    missing = [c for c in path_columns if c not in df.columns]
    if missing:
        raise ValueError(f"Columns {missing} not found in DataFrame.")

    use_count = value_column is None or value_column not in df.columns

    # Identify all numeric columns that are not part of the hierarchy
    numeric_cols = [
        c for c in df.select_dtypes(include="number").columns
        if c not in path_columns
    ]

    # Build rows as dicts for easy column accumulation
    rows = []
    seen = set()

    for depth in range(len(path_columns)):
        cols = path_columns[:depth + 1]

        # Aggregate: count + sum of all numeric columns at this grouping level
        grouped = df.groupby(cols, observed=True)
        counts = grouped.size().reset_index(name='count')

        if numeric_cols:
            sums = grouped[numeric_cols].sum().reset_index()
            merged = counts.merge(sums, on=cols)
        else:
            merged = counts

        for _, row in merged.iterrows():
            path_parts = [str(row[c]) for c in cols]
            node_id = "/".join(path_parts)

            if node_id in seen:
                continue
            seen.add(node_id)

            parent_id = "/".join(path_parts[:-1]) if depth > 0 else root_label

            node = {
                "target": node_id,
                "source": parent_id,
                "count": int(row["count"]),
            }

            if use_count:
                node["value"] = int(row["count"])
            else:
                node["value"] = row[value_column]

            for nc in numeric_cols:
                col_name = f"{nc}_sum" if nc != value_column else nc
                node[col_name] = row[nc]

            rows.append(node)

    # Prepend synthetic root — single parent for all top-level nodes
    top_level = [r for r in rows if r["source"] == root_label]
    root_row = {
        "target": root_label,
        "source": "",
        "count": sum(r["count"] for r in top_level),
        "value": sum(r["value"] for r in top_level),
    }
    # Sum extra numeric columns for root
    extra_cols = [k for k in rows[0] if k not in ("target", "source", "count", "value")] if rows else []
    for ec in extra_cols:
        root_row[ec] = sum(r[ec] for r in top_level)

    return pd.DataFrame([root_row] + rows)
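The shape of the output is easier to follow on a tiny input. The function below, `rollup_sketch`, is a condensed illustration rather than the library code: it is a hypothetical name, it handles a single value column, and it omits the count column, the extra numeric sums, and the validation found in the listing above. The path-id and parent-id construction follows the same rules.

```python
import pandas as pd


def rollup_sketch(df, path_columns, value_column, root_label="Total"):
    """Condensed illustration of hierarchical_rollup for one value column."""
    rows = []
    for depth in range(len(path_columns)):
        cols = path_columns[: depth + 1]
        # One output row per distinct path prefix at this depth.
        sums = df.groupby(cols, observed=True)[value_column].sum().reset_index()
        for _, row in sums.iterrows():
            parts = [str(row[c]) for c in cols]
            rows.append({
                "target": "/".join(parts),
                "source": "/".join(parts[:-1]) if depth > 0 else root_label,
                "value": row[value_column],
            })
    # Synthetic root: parent of every top-level node, with an empty source.
    top_level = [r for r in rows if r["source"] == root_label]
    root = {
        "target": root_label,
        "source": "",
        "value": sum(r["value"] for r in top_level),
    }
    return pd.DataFrame([root] + rows)


cities = pd.DataFrame({
    "continent": ["Europe", "Europe", "Asia"],
    "country": ["France", "France", "Japan"],
    "city": ["Paris", "Lyon", "Tokyo"],
    "population": [2, 1, 14],
})

tree = rollup_sketch(cities, ["continent", "country"], "population")
print(tree["target"].tolist())
# ['Total', 'Asia', 'Europe', 'Asia/Japan', 'Europe/France']

# Flow marks skip the root row, whose source is empty.
edges = tree[tree["source"] != ""][["source", "target", "value"]]
```

A treemap or sunburst would read target as the node id and source as its parent, while a Sankey diagram would read the rows of `edges` directly.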

add_label

add_label(df, label, value)

Add a new column with a constant value.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

label

Name of the new column to add

TYPE: str

value

Value to fill in the new column (can be any scalar or object)

TYPE: Any

RETURNS DESCRIPTION
DataFrame

DataFrame with the new column added.

Source code in src/choregraph/library.py
def add_label(df: pd.DataFrame, label: str, value: Any) -> pd.DataFrame:
    """
    Add a new column with a constant value.

    Args:
        df: Input DataFrame
        label: Name of the new column to add
        value: Value to fill in the new column (can be any scalar or object)

    Returns:
        DataFrame with the new column added.
    """
    if label in df.columns:
        raise ValueError(f"Column '{label}' already exists in DataFrame.")
    df = df.copy()
    df[label] = value
    return df

select_columns

select_columns(df, columns)

Extract/select only the specified columns from the DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

columns

Column name(s) to keep. Can be a single string or a list.

TYPE: Union[list, str]

RETURNS DESCRIPTION
DataFrame

DataFrame with only the specified columns.

Source code in src/choregraph/library.py
def select_columns(df: pd.DataFrame, columns: Union[list, str]) -> pd.DataFrame:
    """
    Extract/select only the specified columns from the DataFrame.

    Args:
        df: Input DataFrame
        columns: Column name(s) to keep. Can be a single string or a list.

    Returns:
        DataFrame with only the specified columns.
    """
    if isinstance(columns, str):
        columns = [columns]

    missing = [c for c in columns if c not in df.columns]
    if missing:
        raise ValueError(f"Columns not found: {missing}. Available: {list(df.columns)}")

    return df[columns].copy()

drop_columns

drop_columns(df, columns)

Remove the specified columns from the DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

columns

Column name(s) to drop. Can be a single string or a list.

TYPE: Union[list, str]

RETURNS DESCRIPTION
DataFrame

DataFrame without the specified columns.

Source code in src/choregraph/library.py
def drop_columns(df: pd.DataFrame, columns: Union[list, str]) -> pd.DataFrame:
    """
    Remove the specified columns from the DataFrame.

    Args:
        df: Input DataFrame
        columns: Column name(s) to drop. Can be a single string or a list.

    Returns:
        DataFrame without the specified columns.
    """
    if isinstance(columns, str):
        columns = [columns]

    missing = [c for c in columns if c not in df.columns]
    if missing:
        raise ValueError(f"Columns not found: {missing}. Available: {list(df.columns)}")

    return df.drop(columns=columns).copy()

rename_column

rename_column(df, old_name, new_name)

Rename a column in the DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

old_name

Current column name

TYPE: str

new_name

New column name

TYPE: str

RETURNS DESCRIPTION
DataFrame

DataFrame with the column renamed.

Source code in src/choregraph/library.py
def rename_column(df: pd.DataFrame, old_name: str, new_name: str) -> pd.DataFrame:
    """
    Rename a column in the DataFrame.

    Args:
        df: Input DataFrame
        old_name: Current column name
        new_name: New column name

    Returns:
        DataFrame with the column renamed.
    """
    if old_name not in df.columns:
        raise ValueError(f"Column '{old_name}' not found. Available: {list(df.columns)}")

    return df.rename(columns={old_name: new_name}).copy()

count_rows

count_rows(df)

Return the total number of rows in the DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

RETURNS DESCRIPTION
int

Row count as an integer scalar.

Source code in src/choregraph/library.py
def count_rows(df: pd.DataFrame) -> int:
    """Return the total number of rows in the DataFrame.

    Args:
        df: Input DataFrame.

    Returns:
        Row count as an integer scalar.
    """
    return len(df)

slice_rows

slice_rows(df, start=None, stop=None)

Keep only a specific range of rows by positional index.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

start

Start index (inclusive). None means from the beginning.

TYPE: int DEFAULT: None

stop

Stop index (exclusive). None means to the end.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Sliced DataFrame.

Source code in src/choregraph/library.py
def slice_rows(df: pd.DataFrame, start: int = None, stop: int = None) -> pd.DataFrame:
    """Keep only a specific range of rows by positional index.

    Args:
        df: Input DataFrame.
        start: Start index (inclusive). None means from the beginning.
        stop: Stop index (exclusive). None means to the end.

    Returns:
        Sliced DataFrame.
    """
    return df.iloc[start:stop].copy()
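
The sketch below illustrates the positional semantics that `slice_rows` inherits from `DataFrame.iloc`: `start` is inclusive, `stop` is exclusive, and the index labels play no role. The sample frame is made up for illustration.

```python
import pandas as pd

# String index labels, to show that slicing is by position, not by label
df = pd.DataFrame({"x": [10, 20, 30, 40, 50]}, index=list("abcde"))

middle = df.iloc[1:3].copy()     # positions 1 and 2
tail = df.iloc[3:None].copy()    # stop=None runs to the end
head = df.iloc[None:2].copy()    # start=None begins at the first row

print(middle)
print(tail)
print(head)
```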

sort_values

sort_values(df, columns, ascending=True)

Sort the DataFrame by one or more columns.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

columns

Column name(s) to sort by.

TYPE: Union[list, str]

ascending

Sort order. True for ascending, False for descending.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
DataFrame

Sorted DataFrame.

Source code in src/choregraph/library.py
def sort_values(df: pd.DataFrame, columns: Union[list, str], ascending: bool = True) -> pd.DataFrame:
    """Sort the DataFrame by one or more columns.

    Args:
        df: Input DataFrame.
        columns: Column name(s) to sort by.
        ascending: Sort order. True for ascending, False for descending.

    Returns:
        Sorted DataFrame.
    """
    if isinstance(columns, str):
        columns = [columns]

    missing = [c for c in columns if c not in df.columns]
    if missing:
        raise ValueError(f"Sort columns not found: {missing}")

    return df.sort_values(by=columns, ascending=ascending).copy()

sample_rows

sample_rows(df, n=None, fraction=None, seed=None)

Take a random sample of rows from the DataFrame.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

n

Exact number of rows to sample (mutually exclusive with fraction).

TYPE: int DEFAULT: None

fraction

Fraction of rows to sample (0.0–1.0).

TYPE: float DEFAULT: None

seed

Random seed for reproducibility.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION
DataFrame

Sampled DataFrame.

Source code in src/choregraph/library.py
def sample_rows(df: pd.DataFrame, n: int = None, fraction: float = None, seed: int = None) -> pd.DataFrame:
    """Take a random sample of rows from the DataFrame.

    Args:
        df: Input DataFrame.
        n: Exact number of rows to sample (mutually exclusive with *fraction*).
        fraction: Fraction of rows to sample (0.0–1.0).
        seed: Random seed for reproducibility.

    Returns:
        Sampled DataFrame.
    """
    return df.sample(n=n, frac=fraction, random_state=seed).copy()

calc_distance

calc_distance(df, x_col, y_col, ref_x, ref_y, target_col='distance')

Calculate Euclidean distance from a reference point.

Adds a new column with the distance from (ref_x, ref_y) to each row's (x_col, y_col) values.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

x_col

Column containing X coordinates.

TYPE: str

y_col

Column containing Y coordinates.

TYPE: str

ref_x

Reference X coordinate.

TYPE: float

ref_y

Reference Y coordinate.

TYPE: float

target_col

Name of the new distance column.

TYPE: str DEFAULT: 'distance'

RETURNS DESCRIPTION
DataFrame

DataFrame with the distance column added.

Source code in src/choregraph/library.py
def calc_distance(df: pd.DataFrame, x_col: str, y_col: str, ref_x: float, ref_y: float, target_col: str = "distance") -> pd.DataFrame:
    """Calculate Euclidean distance from a reference point.

    Adds a new column with the distance from ``(ref_x, ref_y)`` to each row's
    ``(x_col, y_col)`` values.

    Args:
        df: Input DataFrame.
        x_col: Column containing X coordinates.
        y_col: Column containing Y coordinates.
        ref_x: Reference X coordinate.
        ref_y: Reference Y coordinate.
        target_col: Name of the new distance column.

    Returns:
        DataFrame with the distance column added.
    """
    if x_col not in df.columns or y_col not in df.columns:
        raise ValueError(f"Columns '{x_col}' and/or '{y_col}' not found in DataFrame.")

    df = df.copy()
    df[target_col] = np.sqrt((df[x_col] - ref_x)**2 + (df[y_col] - ref_y)**2)
    return df
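
As a quick worked example of the formula above, the following sketch applies the same expression to a small made-up frame with the reference point at the origin. The column names and values are assumptions chosen so the distances are easy to check by hand (3-4-5 and 6-8-10 triangles).

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"x": [3.0, 0.0, 6.0], "y": [4.0, 0.0, 8.0]})
ref_x, ref_y = 0.0, 0.0

# Same expression as in calc_distance, written against the sample columns
df["distance"] = np.sqrt((df["x"] - ref_x) ** 2 + (df["y"] - ref_y) ** 2)

print(df)
```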

calc_ratio

calc_ratio(df, numerator_col, denominator_col)

Calculates the ratio between two columns in the same DataFrame. Creates a new column named 'ratio' containing numerator_col / denominator_col.

PARAMETER DESCRIPTION
df

Input DataFrame

TYPE: DataFrame

numerator_col

Column name for the numerator

TYPE: str

denominator_col

Column name for the denominator

TYPE: str

RETURNS DESCRIPTION
DataFrame

DataFrame with a new 'ratio' column added.

Source code in src/choregraph/library.py
def calc_ratio(df: pd.DataFrame, numerator_col: str, denominator_col: str) -> pd.DataFrame:
    """
    Calculates the ratio between two columns in the same DataFrame.
    Creates a new column named 'ratio' containing numerator_col / denominator_col.

    Args:
        df: Input DataFrame
        numerator_col: Column name for the numerator
        denominator_col: Column name for the denominator

    Returns:
        DataFrame with a new 'ratio' column added.
    """
    if numerator_col not in df.columns:
        raise ValueError(f"Numerator column '{numerator_col}' not found in DataFrame.")
    if denominator_col not in df.columns:
        raise ValueError(f"Denominator column '{denominator_col}' not found in DataFrame.")

    df = df.copy()
    df["ratio"] = df[numerator_col] / df[denominator_col]
    return df
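
`calc_ratio` has no special handling for zero denominators, so the result follows pandas' floating-point division rules. The sketch below shows what that looks like on made-up data; the `replace` step is an optional post-processing idea, not something the library does.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"num": [1.0, 0.0, 6.0], "den": [0.0, 0.0, 3.0]})

# Plain column division, as in calc_ratio
ratio = df["num"] / df["den"]

# Optional clean-up (assumption, not part of the library): treat infinities as missing
safe = ratio.replace([np.inf, -np.inf], np.nan)

print(ratio)
print(safe)
```

A non-zero value divided by zero yields `inf`, and zero divided by zero yields `NaN`; neither raises an exception.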

join

join(dfs=None, on=None, how='inner', **kwargs)

Join multiple DataFrames on a common key.

Collects inputs from dfs (list or single DataFrame) and any DataFrames passed as keyword arguments (named ports from the pipeline). When non-key column names conflict, each conflicting column is suffixed with its source name (the kwargs key from the pipeline, or table_0, table_1, ... for DataFrames passed as a list in dfs) instead of pandas' generic _x / _y.

PARAMETER DESCRIPTION
dfs

Primary DataFrame(s) to join.

TYPE: Union[list, DataFrame] DEFAULT: None

on

Column name(s) to join on.

TYPE: str DEFAULT: None

how

Join type — 'inner', 'left', 'right', or 'outer'.

TYPE: str DEFAULT: 'inner'

**kwargs

Additional DataFrames passed by name.

DEFAULT: {}

RETURNS DESCRIPTION
DataFrame

Merged DataFrame.

Source code in src/choregraph/library.py
def join(dfs: Union[list, pd.DataFrame] = None, on: str = None, how: str = 'inner', **kwargs) -> pd.DataFrame:
    """Join multiple DataFrames on a common key.

    Collects inputs from *dfs* (list or single DataFrame) and any DataFrames
    passed as keyword arguments (named ports from the pipeline).  When non-key
    column names conflict, each conflicting column is suffixed with its source
    name (the kwargs key from the pipeline, or ``table_0``, ``table_1``, ...
    for DataFrames passed as a list in *dfs*) instead of pandas' generic
    ``_x`` / ``_y``.

    Args:
        dfs: Primary DataFrame(s) to join.
        on: Column name(s) to join on.
        how: Join type — ``'inner'``, ``'left'``, ``'right'``, or ``'outer'``.
        **kwargs: Additional DataFrames passed by name.

    Returns:
        Merged DataFrame.
    """
    from collections import Counter

    all_dfs = []
    names = []

    # Collect from positional/main argument
    if dfs is not None:
        if isinstance(dfs, list):
            all_dfs.extend(dfs)
            names.extend([f"table_{i}" for i in range(len(dfs))])
        elif isinstance(dfs, pd.DataFrame):
            all_dfs.append(dfs)
            names.append("dfs")

    # Collect any DataFrames passed via keyword arguments (named ports)
    for k, v in kwargs.items():
        if isinstance(v, pd.DataFrame):
            all_dfs.append(v)
            names.append(k)

    if not all_dfs:
        raise ValueError("No DataFrames provided for join.")

    if len(all_dfs) == 1:
        return all_dfs[0].copy()

    # Align merge-key dtypes to avoid Int64/object mismatches.
    # Casting happens on copies so the caller's DataFrames are never mutated.
    if on is not None:
        key_cols = on if isinstance(on, list) else [on]
        for col in key_cols:
            dtypes = {df[col].dtype for df in all_dfs if col in df.columns}
            if len(dtypes) > 1:
                all_dfs = [df.copy() for df in all_dfs]
                for df in all_dfs:
                    if col in df.columns:
                        df[col] = df[col].astype(str)

    # Identify non-key columns that appear in multiple DataFrames
    if on is not None:
        key_col_set = set(on if isinstance(on, list) else [on])
    else:
        # When on=None, pandas merges on all common columns — those are the keys
        key_col_set = set(all_dfs[0].columns)
        for df in all_dfs[1:]:
            key_col_set &= set(df.columns)

    col_counts = Counter()
    for df in all_dfs:
        col_counts.update(set(df.columns) - key_col_set)
    conflicting = {col for col, count in col_counts.items() if count > 1}

    # Pre-rename conflicting columns with source names so all merges are clean
    if conflicting:
        renamed_dfs = []
        for df, name in zip(all_dfs, names):
            rename_map = {col: f"{col}_{name}" for col in conflicting if col in df.columns}
            renamed_dfs.append(df.rename(columns=rename_map) if rename_map else df.copy())
    else:
        renamed_dfs = [df.copy() for df in all_dfs]

    result = renamed_dfs[0]
    for i in range(1, len(renamed_dfs)):
        result = pd.merge(result, renamed_dfs[i], on=on, how=how)

    return result
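
The pre-rename strategy used by `join` can be sketched for the two-input case as follows. This is a simplified illustration, not the library code: the `cape` and `panama` frames and the `sources` mapping are hypothetical stand-ins for named ports.

```python
import pandas as pd

cape = pd.DataFrame({"date": ["2024-01", "2024-02"], "price": [100, 110]})
panama = pd.DataFrame({"date": ["2024-01", "2024-02"], "price": [200, 210]})

sources = {"cape": cape, "panama": panama}  # name -> DataFrame, like named ports
key = "date"

# Non-key columns that appear in both inputs
conflicting = (set(cape.columns) & set(panama.columns)) - {key}

# Suffix each conflicting column with its source name before merging
renamed = [
    df.rename(columns={c: f"{c}_{name}" for c in conflicting})
    for name, df in sources.items()
]
result = pd.merge(renamed[0], renamed[1], on=key, how="inner")

print(result)
```

Renaming before the merge means pandas never sees a name clash, so its own suffixing is not triggered.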

union

union(dfs=None, ignore_index=True, **kwargs)

Vertically stack (union) multiple DataFrames.

Collects inputs from dfs (list or single DataFrame) and any DataFrames passed as keyword arguments.

PARAMETER DESCRIPTION
dfs

Primary DataFrame(s) to concatenate.

TYPE: Union[list, DataFrame] DEFAULT: None

ignore_index

If True, reset the index in the result.

TYPE: bool DEFAULT: True

**kwargs

Additional DataFrames passed by name.

DEFAULT: {}

RETURNS DESCRIPTION
DataFrame

Concatenated DataFrame.

Source code in src/choregraph/library.py
def union(dfs: Union[list, pd.DataFrame] = None, ignore_index: bool = True, **kwargs) -> pd.DataFrame:
    """Vertically stack (union) multiple DataFrames.

    Collects inputs from *dfs* (list or single DataFrame) and any DataFrames
    passed as keyword arguments.

    Args:
        dfs: Primary DataFrame(s) to concatenate.
        ignore_index: If True, reset the index in the result.
        **kwargs: Additional DataFrames passed by name.

    Returns:
        Concatenated DataFrame.
    """
    all_dfs = []

    # Collect from positional/main argument
    if dfs is not None:
        if isinstance(dfs, list):
            all_dfs.extend(dfs)
        elif isinstance(dfs, pd.DataFrame):
            all_dfs.append(dfs)

    # Collect any DataFrames passed via keyword arguments (named ports)
    for v in kwargs.values():
        if isinstance(v, pd.DataFrame):
            all_dfs.append(v)

    if not all_dfs:
        return pd.DataFrame()

    return pd.concat(all_dfs, ignore_index=ignore_index)

melt

melt(df, id_columns=None, value_columns=None, var_name='variable', value_name='value')

Unpivot a wide DataFrame into long format.

Converts columns into rows, turning a wide table (one column per metric) into a long table with a variable column and a value column.

PARAMETER DESCRIPTION
df

Input DataFrame in wide format.

TYPE: DataFrame

id_columns

Column(s) to keep as identifiers (not melted). Accepts a list or a comma-separated string. If None, no identifier columns are kept.

TYPE: Union[list, str] DEFAULT: None

value_columns

Column(s) to unpivot. Accepts a single string or a list. If None, all columns not in id_columns are melted.

TYPE: Union[list, str] DEFAULT: None

var_name

Name for the new column holding the former column headers.

TYPE: str DEFAULT: 'variable'

value_name

Name for the new column holding the values.

TYPE: str DEFAULT: 'value'

RETURNS DESCRIPTION
DataFrame

Long-format DataFrame.

Examples:

Wide input:

| date    | price_cape | price_panama |
| 2024-01 | 100        | 200          |

melt(df, id_columns="date", var_name="source", value_name="price"):

| date    | source       | price |
| 2024-01 | price_cape   | 100   |
| 2024-01 | price_panama | 200   |

Source code in src/choregraph/library.py
def melt(df: pd.DataFrame, id_columns: Union[list, str] = None,
         value_columns: Union[list, str] = None,
         var_name: str = "variable", value_name: str = "value") -> pd.DataFrame:
    """Unpivot a wide DataFrame into long format.

    Converts columns into rows, turning a wide table (one column per metric)
    into a long table with a ``variable`` column and a ``value`` column.

    Args:
        df: Input DataFrame in wide format.
        id_columns: Column(s) to keep as identifiers (not melted).
            Accepts a list or a comma-separated string. If None, no
            identifier columns are kept.
        value_columns: Column(s) to unpivot. Accepts a single string or a list.
            If None, all columns not in *id_columns* are melted.
        var_name: Name for the new column holding the former column headers.
        value_name: Name for the new column holding the values.

    Returns:
        Long-format DataFrame.

    Examples:
        Wide input::

            | date    | price_cape | price_panama |
            | 2024-01 | 100        | 200          |

        ``melt(df, id_columns="date", var_name="source", value_name="price")``::

            | date    | source       | price |
            | 2024-01 | price_cape   | 100   |
            | 2024-01 | price_panama | 200   |
    """
    if isinstance(id_columns, str):
        id_columns = [c.strip() for c in id_columns.split(",")]
    if isinstance(value_columns, str):
        value_columns = [c.strip() for c in value_columns.split(",")]

    if id_columns:
        missing = [c for c in id_columns if c not in df.columns]
        if missing:
            raise ValueError(f"id_columns not found: {missing}. Available: {list(df.columns)}")

    if value_columns:
        missing = [c for c in value_columns if c not in df.columns]
        if missing:
            raise ValueError(f"value_columns not found: {missing}. Available: {list(df.columns)}")

    return pd.melt(
        df,
        id_vars=id_columns,
        value_vars=value_columns,
        var_name=var_name,
        value_name=value_name,
    )
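
The following sketch calls `pd.melt` directly to show two behaviours the wrapper relies on: the comma-separated string parsing done before the call, and what pandas returns when no identifier columns are supplied. The sample frame mirrors the docstring example; the variable names are illustrative.

```python
import pandas as pd

wide = pd.DataFrame(
    {"date": ["2024-01"], "price_cape": [100], "price_panama": [200]}
)

# A comma-separated string is split and stripped, as in the wrapper
value_columns = [c.strip() for c in "price_cape, price_panama".split(",")]

with_id = pd.melt(wide, id_vars=["date"], var_name="source", value_name="price")
without_id = pd.melt(wide, id_vars=None, value_vars=value_columns)

print(with_id)
print(without_id)
```

With `id_vars=None`, pandas keeps no identifier columns, so the result holds only the `variable` and `value` columns.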

arithmetic_op

arithmetic_op(df, left_column, right_column=None, constant=None, operator='ADD', output_column='result')

Apply an arithmetic operation between a column and another column or constant.

PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

left_column

Column name for the left operand.

TYPE: str

right_column

Column name for the right operand (mutually exclusive with constant).

TYPE: Optional[str] DEFAULT: None

constant

Scalar value for the right operand.

TYPE: Optional[float] DEFAULT: None

operator

One of 'ADD', 'SUB', 'PROD', 'DIV'.

TYPE: str DEFAULT: 'ADD'

output_column

Name of the result column.

TYPE: str DEFAULT: 'result'

RETURNS DESCRIPTION
DataFrame

DataFrame with the computed column added.

Source code in src/choregraph/library.py
def arithmetic_op(df: pd.DataFrame, left_column: str, right_column: Optional[str] = None,
                  constant: Optional[float] = None, operator: str = 'ADD',
                  output_column: str = 'result') -> pd.DataFrame:
    """Apply an arithmetic operation between a column and another column or constant.

    Args:
        df: Input DataFrame.
        left_column: Column name for the left operand.
        right_column: Column name for the right operand (mutually exclusive
            with *constant*).
        constant: Scalar value for the right operand.
        operator: One of ``'ADD'``, ``'SUB'``, ``'PROD'``, ``'DIV'``.
        output_column: Name of the result column.

    Returns:
        DataFrame with the computed column added.
    """

    if df is None or df.empty:
        return pd.DataFrame()

    res_df = df.copy()

    # 1. Validate and convert the LEFT column
    if left_column not in res_df.columns:
        raise ValueError(f"Column '{left_column}' not found")

    # Force numeric conversion; with errors='coerce', unparseable values become NaN instead of raising
    left_val = pd.to_numeric(res_df[left_column], errors='coerce')

    # 2. Validate and convert the RIGHT operand
    if right_column:
        if right_column not in res_df.columns:
            raise ValueError(f"Column '{right_column}' not found")
        right_val = pd.to_numeric(res_df[right_column], errors='coerce')
    elif constant is not None:
        right_val = constant
    else:
        raise ValueError("Either right_column or constant must be provided")

    # 3. Apply the operation via pandas methods (safer than the bare operators)
    if operator == 'ADD':
        res_df[output_column] = left_val.add(right_val)
    elif operator == 'SUB':
        res_df[output_column] = left_val.sub(right_val)
    elif operator == 'PROD':
        res_df[output_column] = left_val.mul(right_val)
    elif operator == 'DIV':
        res_df[output_column] = left_val.div(right_val)
    else:
        raise ValueError(f"Unsupported operator: {operator}. Use ADD, SUB, PROD, or DIV.")

    return res_df
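
A practical consequence of the `pd.to_numeric(..., errors='coerce')` step is that text columns holding numbers work, while unparseable cells turn into `NaN` and propagate into the result. The sketch below reproduces the `'PROD'` branch on made-up data; the column names are assumptions.

```python
import pandas as pd

df = pd.DataFrame({"qty": ["2", "3", "n/a"], "unit_price": [1.5, 2.0, 4.0]})

# Same coercion as in arithmetic_op: strings become numbers, "n/a" becomes NaN
qty = pd.to_numeric(df["qty"], errors="coerce")
price = pd.to_numeric(df["unit_price"], errors="coerce")

df["total"] = qty.mul(price)

print(df)
```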

normalize_column

normalize_column(df, column, method='minmax', output_column=None)

Normalize a numeric column using min-max scaling or z-score standardization.

  • 'minmax': (x - min) / (max - min)
  • 'zscore': (x - mean) / std
PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column to normalize.

TYPE: str

method

Normalization method — 'minmax' or 'zscore'.

TYPE: str DEFAULT: 'minmax'

output_column

Name of the result column (defaults to "{column}_norm").

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with the normalized column added.

Source code in src/choregraph/library.py
def normalize_column(df: pd.DataFrame, column: str, method: str = 'minmax',
                     output_column: Optional[str] = None) -> pd.DataFrame:
    """Normalize a numeric column using min-max scaling or z-score standardization.

    - ``'minmax'``: ``(x - min) / (max - min)``
    - ``'zscore'``: ``(x - mean) / std``

    Args:
        df: Input DataFrame.
        column: Column to normalize.
        method: Normalization method — ``'minmax'`` or ``'zscore'``.
        output_column: Name of the result column (defaults to
            ``"{column}_norm"``).

    Returns:
        DataFrame with the normalized column added.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    res_df = df.copy()
    col_name = output_column if output_column else f"{column}_norm"

    if column not in res_df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame")

    if method == 'minmax':
        mi = res_df[column].min()
        ma = res_df[column].max()
        if ma - mi == 0:
            res_df[col_name] = 0.0
        else:
            res_df[col_name] = (res_df[column] - mi) / (ma - mi)
    elif method == 'zscore':
        mu = res_df[column].mean()
        sigma = res_df[column].std()
        if sigma == 0 or pd.isna(sigma):
            res_df[col_name] = 0.0
        else:
            res_df[col_name] = (res_df[column] - mu) / sigma
    else:
        raise ValueError(f"Unsupported normalization method: {method}")

    return res_df
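
The two formulas can be checked by hand with a small standard-library sketch. The helper names `minmax` and `zscore` are hypothetical; the zero-range and zero-deviation fallbacks to `0.0` mirror the source, and `statistics.stdev` is used because it computes the sample standard deviation (n - 1), the same default as pandas' `Series.std()`.

```python
import statistics


def minmax(values):
    lo, hi = min(values), max(values)
    if hi - lo == 0:
        return [0.0 for _ in values]  # constant column: avoid division by zero
    return [(v - lo) / (hi - lo) for v in values]


def zscore(values):
    if len(values) < 2:
        return [0.0 for _ in values]  # sample std is undefined for one value
    mu = statistics.mean(values)
    sigma = statistics.stdev(values)  # sample standard deviation (n - 1)
    if sigma == 0:
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]


data = [2.0, 4.0, 6.0]
print(minmax(data))        # [0.0, 0.5, 1.0]
print(zscore(data))        # [-1.0, 0.0, 1.0]
print(minmax([7.0, 7.0]))  # [0.0, 0.0]
```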

discretize

discretize(df, column, bins=5, strategy='uniform', output_column=None, labels=None)

Discretize a continuous column into bins.

  • 'uniform': Equal-width bins.
  • 'quantile': Equal-frequency bins.
PARAMETER DESCRIPTION
df

Input DataFrame.

TYPE: DataFrame

column

Column to discretize.

TYPE: str

bins

Number of bins.

TYPE: int DEFAULT: 5

strategy

Binning strategy — 'uniform' or 'quantile'.

TYPE: str DEFAULT: 'uniform'

output_column

Name of the result column (defaults to "{column}_bin").

TYPE: Optional[str] DEFAULT: None

labels

Optional list of label names for the bins (e.g. ["low", "medium", "high"]). Must have length equal to bins. When omitted the bins are labelled with integers.

TYPE: Optional[list] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with the binned column added.

Source code in src/choregraph/library.py
def discretize(df: pd.DataFrame, column: str, bins: int = 5,
               strategy: str = 'uniform', output_column: Optional[str] = None,
               labels: Optional[list] = None) -> pd.DataFrame:
    """Discretize a continuous column into bins.

    - ``'uniform'``: Equal-width bins.
    - ``'quantile'``: Equal-frequency bins.

    Args:
        df: Input DataFrame.
        column: Column to discretize.
        bins: Number of bins.
        strategy: Binning strategy — ``'uniform'`` or ``'quantile'``.
        output_column: Name of the result column (defaults to
            ``"{column}_bin"``).
        labels: Optional list of label names for the bins (e.g.
            ``["low", "medium", "high"]``).  Must have length equal to
            *bins*.  When omitted the bins are labelled with integers.

    Returns:
        DataFrame with the binned column added.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    res_df = df.copy()
    col_name = output_column if output_column else f"{column}_bin"

    if column not in res_df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame")

    if labels is not None and len(labels) != bins:
        raise ValueError(
            f"labels length ({len(labels)}) must equal bins ({bins})"
        )

    bin_labels = labels if labels is not None else False

    if strategy == 'uniform':
        res_df[col_name] = pd.cut(res_df[column], bins=bins, labels=bin_labels)
    elif strategy == 'quantile':
        res_df[col_name] = pd.qcut(res_df[column], q=bins, labels=bin_labels, duplicates='drop')
    else:
        raise ValueError(f"Unsupported discretization strategy: {strategy}")

    return res_df
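
The difference between the two strategies is easiest to see on skewed data. The sketch below calls `pd.cut` and `pd.qcut` the same way `discretize` does when no labels are given (`labels=False`, which yields integer bin codes); the sample values are made up.

```python
import pandas as pd

values = pd.Series([1, 2, 3, 4, 100])  # one large outlier

# Equal-width bins: the range 1..100 is cut in half
uniform_codes = pd.cut(values, bins=2, labels=False)

# Equal-frequency bins: the split point is the median
quantile_codes = pd.qcut(values, q=2, labels=False, duplicates="drop")

print(uniform_codes.tolist())
print(quantile_codes.tolist())
```

With equal-width bins the outlier ends up alone in the upper bin, whereas quantile bins split the rows roughly evenly.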

flatten_json

flatten_json(data, root_key=None, columns=None)

Convert arbitrary JSON structures into a flat DataFrame.

Auto-detects common JSON-to-table patterns and applies the best flattening strategy:

  1. Array of objects [{col: val, ...}, ...] → pd.DataFrame(data) directly.
  2. Dict of paired arrays {key: [[x, y], ...], ...} (e.g. CoinGecko market data) → join arrays on shared first column, one column per key.
  3. Dict of simple arrays {key: [v1, v2, ...], ...} → one column per key (all same length).
  4. Keyed array of objects (when root_key is provided) {root_key: [{...}, ...]} → flattens the inner list.
  5. Nested / complex → pd.json_normalize() as fallback.

PARAMETER DESCRIPTION
data

Loaded JSON data (dict or list).

TYPE: Union[dict, list]

root_key

Optional key to drill into before flattening. Dotted paths such as "result.XXBTZUSD" descend through nested dicts.

TYPE: str DEFAULT: None

columns

Optional comma-separated column names to assign to the resulting DataFrame (useful for unnamed arrays).

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
DataFrame

A flat pandas.DataFrame.

Examples:

>>> flatten_json([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
   a  b
0  1  2
1  3  4

>>> flatten_json({"prices": [[1, 100], [2, 200]],
...               "volumes": [[1, 50], [2, 60]]})
   timestamp  prices  volumes
0          1     100       50
1          2     200       60

Source code in src/choregraph/library.py
def flatten_json(
    data: Union[dict, list],
    root_key: str = None,
    columns: str = None,
) -> pd.DataFrame:
    """Convert arbitrary JSON structures into a flat DataFrame.

    Auto-detects common JSON-to-table patterns and applies the best
    flattening strategy:

    1. **Array of objects** ``[{col: val, ...}, ...]``
       → ``pd.DataFrame(data)`` directly.
    2. **Dict of paired arrays** ``{key: [[x, y], ...], ...}``
       (e.g. CoinGecko market data) → join arrays on shared first column,
       one column per key.
    3. **Dict of simple arrays** ``{key: [v1, v2, ...], ...}``
       → one column per key (all same length).
    4. **Keyed array of objects** (when *root_key* is provided)
       ``{root_key: [{...}, ...]}`` → flattens the inner list.
    5. **Nested / complex** → ``pd.json_normalize()`` as fallback.

    Args:
        data: Loaded JSON data (dict or list).
        root_key: Optional key to drill into before flattening. Dotted paths
            such as ``"result.XXBTZUSD"`` descend through nested dicts.
        columns: Optional comma-separated column names to assign to the
            resulting DataFrame (useful for unnamed arrays).

    Returns:
        A flat :class:`~pandas.DataFrame`.

    Examples:
        >>> flatten_json([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
           a  b
        0  1  2
        1  3  4

        >>> flatten_json({"prices": [[1, 100], [2, 200]],
        ...               "volumes": [[1, 50], [2, 60]]})
           timestamp  prices  volumes
        0          1     100       50
        1          2     200       60
    """
    # --- drill into root_key if given (supports dotted paths like "result.XXBTZUSD") ---
    if root_key and isinstance(data, dict):
        for part in root_key.split("."):
            if isinstance(data, dict) and part in data:
                data = data[part]
            else:
                break

    # --- Pattern 1: list at the top level ---
    if isinstance(data, list):
        df = _flatten_list(data)
        return _apply_column_names(df, columns)

    # --- dict at the top level ---
    if isinstance(data, dict):
        # Pattern 2: dict of paired arrays {key: [[x,y], ...], ...}
        # All values are lists of equal-length lists (>=2 elements each)
        values = list(data.values())
        keys = list(data.keys())

        if len(values) > 0 and all(
            isinstance(v, list) and len(v) > 0 and isinstance(v[0], list)
            for v in values
        ):
            return _flatten_dict_of_paired_arrays(keys, values, columns)

        # Pattern 3: dict of simple (flat) arrays {key: [v1, v2, ...], ...}
        if len(values) > 0 and all(
            isinstance(v, list)
            and len(v) > 0
            and all(_is_primitive(item) for item in v)
            for v in values
        ):
            lengths = {len(v) for v in values}
            if len(lengths) == 1:
                return pd.DataFrame(data)

        # Pattern 4: dict with a single key whose value is a list
        if len(keys) == 1 and isinstance(values[0], list):
            df = _flatten_list(values[0])
            return _apply_column_names(df, columns)

        # Pattern 5: nested / complex dict → json_normalize fallback
        try:
            df = pd.json_normalize(data, sep="_")
            if not df.empty:
                return _apply_column_names(df, columns)
        except Exception:
            pass

    # Last resort: wrap scalar / unhandled in single-cell DataFrame
    return pd.DataFrame([{"value": json.dumps(data) if not _is_primitive(data) else data}])
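
The dotted `root_key` traversal at the top of `flatten_json` can be isolated in a few lines of plain Python. The `drill` helper and the sample payload are hypothetical; the loop body is the same logic as in the source.

```python
def drill(data, root_key):
    """Follow a dotted path through nested dicts, stopping at the first miss."""
    for part in root_key.split("."):
        if isinstance(data, dict) and part in data:
            data = data[part]
        else:
            break  # unknown key: keep whatever level was reached so far
    return data


payload = {"result": {"XXBTZUSD": [[1, 100], [2, 200]], "last": 2}}

found = drill(payload, "result.XXBTZUSD")
partial = drill(payload, "result.missing")

print(found)
print(partial)
```

A path that does not resolve fully is not an error: traversal stops silently and flattening proceeds on the deepest level that was reached.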

remove_required_keys

remove_required_keys(data)

Recursively walk the dictionary and remove every 'required' key.

Source code in src/choregraph/library.py
def remove_required_keys(data):
    """Recursively walk the dictionary and remove every 'required' key."""
    if isinstance(data, dict):
        return {k: remove_required_keys(v) for k, v in data.items() if k != 'required'}
    elif isinstance(data, list):
        return [remove_required_keys(item) for item in data]
    else:
        return data
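
Here is a small usage sketch on a hand-written JSON Schema fragment. The function body is copied from the source above so the block runs on its own; the schema is made up for illustration.

```python
def remove_required_keys(data):
    """Recursively walk the structure and remove every 'required' key."""
    if isinstance(data, dict):
        return {k: remove_required_keys(v) for k, v in data.items() if k != 'required'}
    elif isinstance(data, list):
        return [remove_required_keys(item) for item in data]
    else:
        return data


schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "tags": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {"name": {"type": "string"}},
                "required": ["name"],
            },
        },
    },
    "required": ["id"],
}

stripped = remove_required_keys(schema)
print(stripped)
```

The filter matches on the key name alone, so a data field that happens to be called `required` (for example under `properties`) would be dropped as well. The input is left untouched because new dicts and lists are built at every level.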

cartograph_json

cartograph_json(data, max_chars=5000, max_items=200)

Produce a structural cartography of a JSON document for the LLM.

Uses genson (https://github.com/wolverdude/genson) to infer a JSON Schema from the loaded data (one "skeleton" merging every record), then renders it, with the 'required' keys stripped, as a compact single-line JSON string that the planning LLM embeds via DatasetStats.info["extract_with"] (rendered by MetadataResult._to_markdown).

PARAMETER DESCRIPTION
data

Loaded JSON value (dict, list, or primitive).

TYPE: Any

max_chars

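Upper bound on the rendered tree length. Truncated with ... when exceeded.

TYPE: int DEFAULT: 5000

max_items

Sampling limit passed to _sample_for_schema before schema inference. A falsy value (0 or None) skips sampling and uses the full input.

TYPE: int DEFAULT: 200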

RETURNS DESCRIPTION
dict

{"schema": <genson schema dict>, "rendered": <str>, "length": <int>, "is_tabular": <bool>, "tabular_fields": [{"name": str, "dtype": str}, ...], "leaf_fields": [{"name": str, "dtype": str, "required": bool}, ...]}

Source code in src/choregraph/library.py
def cartograph_json(data: Any, max_chars: int = 5000, max_items: int = 200) -> dict:
    """Produce a structural cartography of a JSON document for the LLM.

    Uses `genson <https://github.com/wolverdude/genson>`_ to infer a JSON
    Schema from the loaded data (one "skeleton" merging every record), then
    renders it as a compact ASCII hierarchy that the planning LLM embeds via
    :attr:`DatasetStats.info["extract_with"]` (rendered by
    :meth:`MetadataResult._to_markdown`).

    Args:
        data: Loaded JSON value (dict, list, or primitive).
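        max_chars: Upper bound on the rendered tree length. Truncated with
            ``...`` when exceeded.
        max_items: Cap passed to ``_sample_for_schema`` when sampling the
            data for schema inference. A falsy value disables sampling.

    Returns:
        ``{"schema": <genson schema dict>, "rendered": <str>,
           "length": <int>, "is_tabular": <bool>,
           "tabular_fields": [{"name": str, "dtype": str}, ...],
           "leaf_fields": [{"name": str, "dtype": str, "required": bool}, ...]}``

    Raises:
        JsonTooDeepError: If ``data`` nests deeper than ``MAX_JSON_DEPTH``.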
    """
    if _exceeds_json_depth(data, MAX_JSON_DEPTH):
        raise JsonTooDeepError(
            f"JSON nests deeper than {MAX_JSON_DEPTH} levels"
        )

    from genson import SchemaBuilder
    builder = SchemaBuilder()
    sampled = _sample_for_schema(data, max_items) if max_items else data
    if isinstance(sampled, (dict, list)):
        builder.add_object(sampled)
    else:
        builder.add_object({"value": sampled})
    schema = builder.to_schema()

    rendered = json.dumps(remove_required_keys(schema), separators=(",", ":"))
    if len(rendered) > max_chars:
        rendered = rendered[: max_chars - 3].rstrip() + "..."
    length = _infer_root_length(data, schema)
    is_tabular, fields = _detect_tabular(data, schema)
    leaf_fields = _collect_leaf_fields(schema)

    return {
        "schema": schema,
        "rendered": rendered,
        "length": length,
        "is_tabular": is_tabular,
        "tabular_fields": fields,
        "leaf_fields": leaf_fields,
    }
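
The rendering step in the listing above serializes the schema as compact JSON and truncates it to `max_chars`. The following is a minimal sketch of that step in isolation, using a hand-written schema instead of genson output. `render_compact` is a hypothetical name chosen for this example and is not part of the library.

```python
import json


def render_compact(schema, max_chars=5000):
    """Serialize a schema compactly and cap its length (hypothetical helper)."""
    # separators=(",", ":") removes the spaces json.dumps inserts by default.
    rendered = json.dumps(schema, separators=(",", ":"))
    if len(rendered) > max_chars:
        # Reserve three characters for the "..." marker.
        rendered = rendered[: max_chars - 3].rstrip() + "..."
    return rendered


# Hypothetical schema, assumed to have had its 'required' keys stripped already.
schema = {
    "type": "object",
    "properties": {"id": {"type": "integer"}, "name": {"type": "string"}},
}

full = render_compact(schema)                 # fits within the default limit
short = render_compact(schema, max_chars=40)  # cut off and marked with "..."
```

A truncated rendering is no longer valid JSON, so it is suitable for display in a prompt but should not be parsed back; the untruncated schema remains available under the "schema" key of the return value.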

execute_code

execute_code(code, **kwargs)

Execute Python code with one or more DataFrame inputs.

All input DataFrames are available in the code by their port name. The code must assign its result to a variable named result. All scientific Python libraries installed in the environment are available. System, IO, and network modules are blocked.

Source code in src/choregraph/library.py
def execute_code(code: str, **kwargs) -> pd.DataFrame:
    """Execute Python code with one or more DataFrame inputs.

    All input DataFrames are available in the code by their port name.
    The code must assign its result to a variable named ``result``.
    All scientific Python libraries installed in the environment are available.
    System, IO, and network modules are blocked.
    """
    _validate_code_safety(code)

    # Use a single namespace for both globals and locals so that functions
    # defined in the code can access top-level variables (pd, np, inputs).
    # With separate dicts, nested def/class scopes only search globals, not
    # the exec locals — causing "name 'pd' is not defined" errors.
    namespace = {'__builtins__': __builtins__, 'pd': pd, 'np': np,
                 'true': True, 'false': False, 'null': None}
    for name, value in kwargs.items():
        if isinstance(value, pd.DataFrame):
            namespace[name] = value.copy()
        else:
            namespace[name] = wrap_json_input(name, value)

    exec(code, namespace)

    result = namespace.get('result')
    if result is None:
        raise ValueError(
            "Code must assign output to a variable named 'result'. "
            "Example: result = df[df['x'] > 5]"
        )
    if not isinstance(result, pd.DataFrame):
        if isinstance(result, pd.Series):
            result = result.to_frame()
        else:
            raise TypeError(f"'result' must be a DataFrame, got {type(result).__name__}")
    return result
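
The comment about using a single namespace can be checked with plain Python, without any DataFrames. The sketch below runs the same snippet twice: once with separate globals and locals dicts, and once with one shared dict. The snippet and the variable names are assumptions made up for this illustration.

```python
code = """
factor = 3

def scale(x):
    # 'factor' is resolved through the function's globals at call time.
    return x * factor

result = scale(2)
"""

# Separate dicts: top-level assignments land in the locals dict,
# which scale() never searches, so the lookup of 'factor' fails.
try:
    exec(code, {}, {})
    separate_outcome = "ok"
except NameError as exc:
    separate_outcome = f"NameError: {exc}"

# Single dict: top-level names and function globals are the same mapping.
namespace = {}
exec(code, namespace)
shared_result = namespace["result"]
```

With separate dicts the call fails with a NameError, while the shared namespace yields a `result` of 6. This is the same failure mode the listing's comment describes for `pd` and `np` inside user-defined functions.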

concat_partitions

concat_partitions(partitioned)

Concatenate a PartitionedDataset into a single DataFrame.

Loads every partition in sorted key order, tags each row with a __partition__ column (float index: 0.0, 1.0, …), and concatenates them into one DataFrame. Use this before applying transforms that need global context (consistent bin edges, global aggregates, etc.).

Pair with :func:split_partitions to restore the partitioned structure after the transform.

PARAMETER DESCRIPTION
partitioned

Kedro PartitionedDataset dict {key: callable_or_df}.

TYPE: dict

RETURNS DESCRIPTION
DataFrame

Single DataFrame with an added __partition__ column.

Source code in src/choregraph/library.py
def concat_partitions(partitioned: dict) -> pd.DataFrame:
    """Concatenate a PartitionedDataset into a single DataFrame.

    Loads every partition in sorted key order, tags each row with a
    ``__partition__`` column (float index: 0.0, 1.0, …), and concatenates
    them into one DataFrame.  Use this before applying transforms that need
    global context (consistent bin edges, global aggregates, etc.).

    Pair with :func:`split_partitions` to restore the partitioned structure
    after the transform.

    Args:
        partitioned: Kedro PartitionedDataset dict ``{key: callable_or_df}``.

    Returns:
        Single DataFrame with an added ``__partition__`` column.
    """
    sorted_keys = sorted(partitioned.keys())
    frames = []
    for i, key in enumerate(sorted_keys):
        loader = partitioned[key]
        df = (loader() if callable(loader) else loader).copy()
        df["__partition__"] = float(i)
        frames.append(df)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
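
To show the ordering and tagging behaviour without depending on pandas, the sketch below reproduces the same loop over lists of row dicts. `concat_rows` and the sample partitions are hypothetical stand-ins for this example; the real function operates on DataFrames as shown in the listing.

```python
def concat_rows(partitioned):
    """List-of-dicts analog of concat_partitions (hypothetical helper)."""
    rows = []
    for i, key in enumerate(sorted(partitioned)):
        loader = partitioned[key]
        # Partitions may be lazy (a callable returning data) or already loaded.
        part = loader() if callable(loader) else loader
        for row in part:
            # Copy each row and tag it with the partition's float index.
            rows.append({**row, "__partition__": float(i)})
    return rows


# Keys are deliberately out of order; one partition is lazy, one is eager.
partitioned = {
    "2024-02": lambda: [{"x": 3}],
    "2024-01": [{"x": 1}, {"x": 2}],
}

combined = concat_rows(partitioned)
```

The index reflects sorted key order rather than insertion order, so the rows from "2024-01" are tagged 0.0 even though that key was added second. Because keys are sorted as strings, a key such as "part_10" sorts before "part_2"; zero-padded keys avoid that surprise.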