Skip to content

Choregraph (Facade)

The main entry point for pipeline lifecycle management. Orchestrates XML spec parsing, Kedro project generation, pipeline execution, data caching, and DIVE VisuSpec export.

This is the only class exported from the choregraph package — all interaction starts here.

choregraph

Choregraph facade -- the main entry point for pipeline lifecycle management.

This module provides the `Choregraph` class, which orchestrates XML spec parsing, Kedro project generation, pipeline execution, data caching, and DIVE VisuSpec export. Internally it delegates to the parser, builder, wrapper, and connectors modules.

Choregraph

Choregraph(xml_spec=None, external_inputs=None, workspace_path=None, kedro_viz=True)

Main facade for Choregraph pipeline lifecycle management.

Orchestrates XML spec parsing, Kedro project generation, pipeline execution, data caching, and DIVE VisuSpec export. Supports both programmatic pipeline construction (via `add_input()` / `add_node()`) and loading from XML.

Can be used as a context manager:

with Choregraph(xml_spec="pipeline.xml") as cg:
    cg.run()
    df = cg.get_dataset("my_output")
Source code in src/choregraph/choregraph.py
def __init__(self, xml_spec: Union[str, Path] = None, external_inputs: Dict[str, Any] = None, workspace_path: Union[str, Path] = None, kedro_viz: bool = True):
    """Initialise in-memory state, load the spec and build the wrapper if missing.

    Args:
        xml_spec: Spec handed to ``ChoregraphSpecParser.parse``. When it is
            ``None`` and the workspace contains a ``choregraph.xml``, that
            file is used instead. A falsy value yields an empty
            ``ChoregraphSpec``.
        external_inputs: Mapping of input ID to in-memory data. A falsy
            value is replaced by a fresh empty dict.
        workspace_path: Workspace directory. A falsy value leaves the
            instance without a workspace.
        kedro_viz: Stored on the instance as ``kedro_viz``; not otherwise
            used by the constructor.
    """
    if workspace_path:
        self.workspace_path = Path(workspace_path)
    else:
        self.workspace_path = None
    self.external_inputs = external_inputs if external_inputs else {}
    self.kedro_viz = kedro_viz
    self._listeners = []

    # Read-through RAM cache plus the hashes consulted for lazy runs.
    self._data_cache = {}
    self._spec_hash = None
    self._last_run_hash = None

    # Internal ID -> name mapping (independent of visuspec).
    self._id_name_map = {}

    # Cached catalog instance, kept for performance.
    self._catalog_instance = None

    # No explicit spec given: fall back to the workspace's own XML if present.
    if xml_spec is None and self.workspace_path:
        default_xml = self.workspace_path / "choregraph.xml"
        if default_xml.exists():
            logger.info(f"Auto-detected existing spec at {default_xml}")
            xml_spec = default_xml

    self.spec = ChoregraphSpecParser.parse(xml_spec) if xml_spec else ChoregraphSpec()

    self.transform_registry = TRANSFORM_REGISTRY.copy()

    # The wrapper is otherwise assumed to exist, or to be built on run/add.
    if self.workspace_path and not (self.workspace_path / "pipeline").exists():
        self._ensure_wrapper()

get_xsd

get_xsd()

Get the XSD content as a string (bundled with the package).

Source code in src/choregraph/choregraph.py
def get_xsd(self) -> str:
    """Return the text of ``TransformGraph.xsd``, read as UTF-8 from this module's directory."""
    package_dir = Path(__file__).parent
    return (package_dir / "TransformGraph.xsd").read_text(encoding="utf-8")

run

run(lazy=True)

Execute the pipeline using a Kedro session.

Generates Kedro project files, dumps external inputs to disk, and runs the pipeline via SequentialRunner. Supports lazy evaluation — if the spec and input files haven't changed, cached results are returned.

PARAMETER DESCRIPTION
lazy

If True, skip execution when the spec hash is unchanged.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Tuple[bool, str]

A tuple (success, error_message) where success is True when the pipeline executed (or was skipped) without error, and error_message contains the failure description otherwise.

RAISES DESCRIPTION
ValueError

If workspace_path is not set.

Source code in src/choregraph/choregraph.py
def run(self, lazy: bool = True) -> Tuple[bool, str]:
    """Execute the pipeline using a Kedro session.

    Generates Kedro project files, dumps external inputs to disk, and runs
    the pipeline via ``SequentialRunner``. Supports lazy evaluation — if the
    project hash is unchanged since the last successful run, execution is
    skipped.

    Listeners receive ``"status"`` events (``running``, ``completed`` or an
    ``error`` payload) and a ``"graph_update"`` event after a non-skipped,
    successful run.

    When the spec has no nodes, no Kedro session is created; instead the
    input statistics are populated directly so that metadata is available.

    Args:
        lazy: If True, skip execution when the project hash is unchanged.

    Returns:
        A tuple ``(success, error_message)`` where *success* is ``True``
        when the pipeline executed (or was skipped) without error, and
        *error_message* contains the failure description otherwise
        (empty string on success).

    Raises:
        ValueError: If ``workspace_path`` is not set.
    """
    if not self.workspace_path:
        raise ValueError("Workspace path required for execution.")

    current_hash = self._get_project_hash()
    # print(f"Current project hash: {current_hash}")
    # print(f"Last run hash: {self._last_run_hash}")
    # 1. Lazy Check
    if lazy and self._last_run_hash == current_hash:
        logger.info("Pipeline inputs/spec unchanged. Skipping run.")
        # self._emit("graph_update", {})
        self._emit("status", {"status": "completed"})
        return (True, "")

    # # 1b. Hash changed - purge cached parquet files to prevent stale data
    # data_dir = self.workspace_path / "pipeline" / "data"
    # if data_dir.exists():
    #     for parquet_file in data_dir.glob("*.parquet"):
    #         try:
    #             parquet_file.unlink()
    #             logger.info(f"Purged stale parquet: {parquet_file.name}")
    #         except Exception as e:
    #             logger.warning(f"Failed to delete {parquet_file}: {e}")

    self._emit("status", {"status": "running"})

    # 2. Sync Wrapper (Source of Truth)
    self._ensure_wrapper()

    # 3. Dump file-backed inputs to disk; collect MEMORY inputs for hook injection
    inputs_dir = self._inputs_dir
    inputs_dir.mkdir(parents=True, exist_ok=True)

    memory_datasets = {}  # {catalog_name: DataFrame} — injected via hook
    for input_id, data in self.external_inputs.items():
        # Match the external data to its declared input (IDs compared as strings).
        inp = next((i for i in self.spec.inputs if str(i.id) == str(input_id)), None)
        # NOTE(review): getattr's default only covers a *missing* attribute; a
        # format of None would make .upper() raise — confirm format is always a str.
        is_memory = inp and getattr(inp, "format", "").upper() == "MEMORY"

        if is_memory and isinstance(data, pd.DataFrame):
            catalog_name = self.spec.get_name(input_id)
            memory_datasets[catalog_name] = data
        elif isinstance(data, pd.DataFrame):
            # NOTE(review): unlike the JSON branch below, this path is not
            # passed through safe_path() — confirm that is intentional.
            file_path = inputs_dir / f"{input_id}.parquet"
            data.to_parquet(file_path)
        elif isinstance(data, (dict, list)):
            file_path = safe_path(inputs_dir / f"{input_id}.json")
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=4)
        else:
            logger.warning(f"Unsupported input data type for {input_id}: {type(data)}")

    # 4. HOT RELOAD FIX
    self._purge_wrapper_modules()

    # 5. Run Session
    if not self.spec.nodes:
        logger.info("Pipeline is empty (no nodes). Skipping Kedro run.")
        self._last_run_hash = current_hash
        # Invalidate catalog because wrapper might have changed inputs
        self._catalog_instance = None

        self._data_cache.clear()

        # No Kedro session means MetadataStatsHook won't fire.
        # Populate stats for each input so catalogue_stats.json is filled.
        # Also populate _data_cache for MEMORY inputs so get_dataset() works.
        #
        # Use cached stats when available: history switches reset the run
        # hash (spec changed) but the underlying input data is the same.
        # Only recompute for inputs missing from the cache or MEMORY inputs
        # (which can change between runs).
        metadata = self._datasets_metadata
        if metadata is not None:
            cached = metadata.read_from_cache()
            catalog = self._get_catalog()
            for inp in self.spec.inputs:
                name = self.spec.get_name(inp.id)
                fmt = getattr(inp, "format", "").upper()

                # MHD/ZRAW: not in catalog — parse header directly
                if fmt in ("MHD",):
                    if name not in cached or not getattr(cached[name], "fields", None):
                        loc = inp.location
                        # Relative locations are resolved against the workspace.
                        if loc and not Path(loc).is_absolute() and self.workspace_path:
                            loc = str((Path(self.workspace_path) / loc).resolve())
                        fields = metadata._describe_mhd(loc)
                        metadata.store_stats(name, fields, 0, dataset_id=str(inp.id))
                    continue
                if fmt == "ZRAW":
                    continue  # companion file, skip entirely

                # MEMORY inputs: always recompute (data can change between runs)
                if fmt == "MEMORY" and str(inp.id) in self.external_inputs:
                    data = self.external_inputs[str(inp.id)]
                    self._data_cache[name] = data
                    metadata.update_stats(name, data, dataset_id=str(inp.id))
                    continue

                # Skip inputs that already have cached stats
                has_cached = name in cached and getattr(cached[name], "fields", None)
                if has_cached:
                    # Ensure partition field is present for temporal inputs
                    temporal_files = inp.options.get("temporalFiles")
                    if temporal_files:
                        # temporalFiles is a "|"-separated list; n is its entry count.
                        n = len(temporal_files.split("|"))
                        label = "time" if inp.options.get("collectionTimeMode") else "partition"
                        metadata.add_partition_field(name, n, partition_label=label)
                    continue

                try:
                    temporal_files = inp.options.get("temporalFiles")
                    if temporal_files:
                        n = len(temporal_files.split("|"))
                        # Load the PartitionedDataset ONCE and iterate
                        # partitions directly.  The old code called
                        # get_dataset(id, time=t) per timestep which
                        # triggered N redundant catalog.load() calls —
                        # each rescanning the directory and rebuilding
                        # the full lazy dict.
                        partitioned = catalog.load(name)
                        if isinstance(partitioned, dict) and partitioned and all(callable(v) for v in partitioned.values()):
                            dfs = []
                            for key in sorted(partitioned.keys()):
                                try:
                                    df_t = partitioned[key]()
                                    if isinstance(df_t, pd.DataFrame):
                                        dfs.append(df_t)
                                except Exception:
                                    # NOTE(review): a partition that fails to load is
                                    # silently skipped, so the stats only cover the
                                    # partitions that could be read.
                                    pass
                            if dfs:
                                full_df = pd.concat(dfs, ignore_index=True)
                                metadata.update_stats(name, full_df, dataset_id=str(inp.id))
                        else:
                            metadata.update_stats(name, partitioned, dataset_id=str(inp.id))
                        label = "time" if inp.options.get("collectionTimeMode") else "partition"
                        metadata.add_partition_field(name, n, partition_label=label)
                    else:
                        data = self.get_dataset(str(inp.id))
                        metadata.update_stats(name, data, dataset_id=str(inp.id))
                except Exception as e:
                    # Best effort: a failure for one input must not abort the others.
                    logger.warning(f"Could not extract stats for input '{name}': {e}")

        self._emit("graph_update", {})
        self._emit("status", {"status": "completed"})
        return (True, "")

    wrapper_path = self.workspace_path / "pipeline"


    # Run with the wrapper project as the working directory.
    with pushd(wrapper_path):
        try:
            # Bootstrap and settings.py (with hooks) are loaded here
            bootstrap_project(wrapper_path)

            with KedroSession.create(project_path=wrapper_path, env="local") as session:
                # Inject in-memory datasets before pipeline runs
                if memory_datasets:
                    from .hooks import DataInjectionHook
                    session._hook_manager.register(DataInjectionHook(memory_datasets))

                # Clean dtypes on load so union/concat never mixes types
                from .hooks import DtypeInferenceHook
                session._hook_manager.register(DtypeInferenceHook())

                # Register MetadataStatsHook to capture stats during execution
                if self._datasets_metadata is not None:
                    from .hooks import MetadataStatsHook
                    stats_hook = MetadataStatsHook(self._datasets_metadata, self.spec)
                    session._hook_manager.register(stats_hook)

                # Register ExecutionStatusHook for UI updates (if listeners registered)
                # if self._listeners:
                #     from .hooks import ExecutionStatusHook
                #     excluded_nodes = set()
                #     # for n in self.spec.nodes:
                    #     if not n.visibility:
                    #         label_text = n.label if n.label else n.type
                    #         sanitized_label = "".join(c if c.isalnum() or c in ("-", "_", ".") else "_" for c in label_text)
                    #         excluded_nodes.add(sanitized_label)

                    # status_hook = ExecutionStatusHook(
                    #     on_update=lambda status: self._emit("node_status", status),
                    #     excluded_nodes=excluded_nodes
                    # )
                    # session._hook_manager.register(status_hook)
                # NOTE(review): is_async is forwarded to Kedro's SequentialRunner;
                # confirm its exact effect against the Kedro version in use.
                runner=SequentialRunner(is_async=True)
                session.run(runner=runner)

        except Exception as e:
            # Any failure is reported to listeners and returned, not raised.
            # _last_run_hash is left untouched, so the next run() is not skipped.
            logger.error(f"Kedro Run failed: {e}")
            self._emit("status", {"error": str(e)})
            return (False, str(e))


    # 8. Update State
    self._last_run_hash = current_hash

    self._emit("graph_update", {})
    self._emit("status", {"status": "completed"})

    # Clear data cache to prevent stale data (metadata is persisted in JSON).
    # Keep _catalog_instance alive — catalog.yml hasn't changed since
    # _ensure_wrapper() was called at the start of run().
    self._data_cache.clear()

    # Re-populate cache for MEMORY inputs — Kedro's MemoryDataset is
    # per-session, so data is lost when the session closes.
    for inp in self.spec.inputs:
        if getattr(inp, "format", "").upper() == "MEMORY":
            input_id = str(inp.id)
            if input_id in self.external_inputs:
                name = self.spec.get_name(input_id)
                self._data_cache[name] = self.external_inputs[input_id]

    return (True, "")

get_dataset

get_dataset(data_id, time=None)

Load a dataset by ID.

For PartitionedDataset entries (temporal collections, etc.):

- time=None: loads the first partition (representative).
- time=N: loads the Nth partition.

Returns whatever the underlying dataset produces (DataFrame, Image, dict, etc.).

Source code in src/choregraph/choregraph.py
def get_dataset(self, data_id: str, time: int | None = None) -> Any:
    """Load a dataset by ID.

    For ``PartitionedDataset`` entries (temporal collections, etc.):
    - ``time=None``: loads the first partition (representative).
    - ``time=N``: loads the Nth partition.

    Returns whatever the underlying dataset produces (DataFrame, Image, dict, etc.).
    """
    # 1. Rebuild mapping if empty (lazy init)
    if not self._id_name_map:
        self._rebuild_id_map()

    # 2. Resolve ID -> name
    real_name = self.spec.get_name(data_id)
    if real_name == data_id and data_id in self._id_name_map:
        real_name = self._id_name_map[data_id]

    # 3. Check cache
    cache_key = f"{real_name}:t{time}" if time is not None else real_name
    if cache_key in self._data_cache:
        return self._data_cache[cache_key]

    # 4. Load via DataCatalog
    logger.info(f"Loading {real_name} (ID: {data_id}, time={time}) via catalog...")
    catalog = self._get_catalog()

    names_to_try = [real_name, f"params:{real_name}"]
    for sfx in self._CATALOG_SUFFIXES:
        names_to_try.append(f"{real_name}{sfx}")

    last_err = None
    for name in names_to_try:
        try:
            data = catalog.load(name)

            # Temporal PartitionedDataset: Dict[str, Callable] — load specific partition
            if time is not None and isinstance(data, dict) and data and all(callable(v) for v in data.values()):
                sorted_keys = sorted(data.keys())
                idx = max(0, min(time, len(sorted_keys) - 1))
                result = data[sorted_keys[idx]]()
                self._data_cache[cache_key] = result
                return result

            # PartitionedDataset without time param: load first partition
            # (only for temporal inputs — other PartitionedDatasets like Excel
            # multi-sheet return the full dict for downstream pipeline handling)
            if isinstance(data, dict) and data and all(callable(v) for v in data.values()):
                # Check if this input has temporalFiles (temporal collection)
                inp = next((i for i in self.spec.inputs if self.spec.get_name(i.id) == real_name), None)
                if inp and inp.options.get("temporalFiles"):
                    sorted_keys = sorted(data.keys())
                    result = data[sorted_keys[0]]()
                    self._data_cache[cache_key] = result
                    return result

            self._data_cache[cache_key] = data
            return data
        except Exception as e:
            last_err = e
            continue

    logger.error(f"Failed to load dataset {real_name}: {last_err}")
    raise last_err

list_data

list_data()

List all available datasets including dynamically generated multi-table outputs.

Source code in src/choregraph/choregraph.py
def list_data(self) -> List[str]:
    """List dataset names from the catalog plus parquet files found on disk.

    Starts from the catalog's own listing, then appends the stem of every
    ``*.parquet`` file in the inputs directory and in each ``*_partitioned``
    sub-directory of the data directory, skipping names already listed.
    """
    catalog = self._get_catalog()
    # Use list() when the catalog offers it; otherwise iterate the catalog.
    names = catalog.list() if hasattr(catalog, "list") else list(catalog)

    def _append_parquet_stems(folder):
        # Add each parquet file's stem unless the name is already known.
        for parquet_file in folder.glob("*.parquet"):
            stem = parquet_file.stem
            if stem not in names:
                names.append(stem)

    # Promoted/external inputs written to the inputs directory.
    if self._inputs_dir.exists():
        _append_parquet_stems(self._inputs_dir)

    # Partitioned folders that haven't been processed yet.
    if self._data_dir.exists():
        for partitioned_dir in self._data_dir.glob("*_partitioned"):
            if partitioned_dir.is_dir():
                _append_parquet_stems(partitioned_dir)

    return names

get_id_for_name

get_id_for_name(name)

Reverse lookup: get ID for a dataset name.

Source code in src/choregraph/choregraph.py
def get_id_for_name(self, name: str) -> Optional[str]:
    """Return the first ID mapped to ``name``, or ``None`` when no ID maps to it."""
    # Build the mapping on demand, exactly like the forward lookup does.
    if not self._id_name_map:
        self._rebuild_id_map()
    for dataset_id, dataset_name in self._id_name_map.items():
        if dataset_name == name:
            return dataset_id
    return None

get_datasets_metadata

get_datasets_metadata()

Get full datasets metadata from catalogue_stats.json.

Delegates to `MetadataResult.to_api_format()`.

Source code in src/choregraph/choregraph.py
def get_datasets_metadata(self) -> List[Dict[str, Any]]:
    """Return the cached dataset metadata in API format.

    Reads the metadata cache and delegates the conversion to
    :meth:`MetadataResult.to_api_format`. Returns an empty list when no
    metadata store is available.
    """
    metadata = self._datasets_metadata
    if metadata is None:
        return []
    cached = metadata.read_from_cache()
    return cached.to_api_format()

update_from_spec

update_from_spec(xml_spec)

Replace the current pipeline specification by parsing new XML.

PARAMETER DESCRIPTION
xml_spec

Path to an XML file or an XML string.

TYPE: Union[str, Path]

Source code in src/choregraph/choregraph.py
def update_from_spec(self, xml_spec: Union[str, Path]):
    """Replace the current pipeline specification by parsing new XML.

    Besides swapping the spec, this forces the next :meth:`run` to execute
    and drops the lookups derived from the previous spec (the ID -> name
    map and the cached catalog instance) so they are rebuilt on demand.
    Listeners receive a ``"graph_update"`` event with a ``None`` payload.

    Args:
        xml_spec: Path to an XML file or an XML string.
    """
    # Parse first: if the XML is invalid the exception propagates before
    # any state has been modified.
    self.spec = ChoregraphSpecParser.parse(xml_spec)
    self._last_run_hash = None
    # The ID map is only rebuilt when empty, so it must be reset explicitly
    # or lookups would keep resolving against the previous spec.
    self._id_name_map = {}
    # Same invalidation the other spec mutators (add_input, remove_input,
    # reset_spec) perform.
    self._catalog_instance = None
    self._emit("graph_update", None)

export_to_xml

export_to_xml(save_to_path)

Serialize the current pipeline specification to an XML file.

PARAMETER DESCRIPTION
save_to_path

Destination file path for the XML output.

TYPE: Union[str, Path]

Source code in src/choregraph/choregraph.py
def export_to_xml(self, save_to_path: Union[str, Path]):
    """Write the current pipeline specification to an XML file.

    The document has a ``choregraph`` root carrying the schema version,
    an ``inputs`` section (one ``input`` per spec input, with its options
    flattened into attributes) and a ``pipeline`` section (one ``node`` per
    spec node, with ``inputPort`` / ``outputPort`` children).

    Args:
        save_to_path: Destination file path for the XML output.
    """
    from .versions import TRANSFORMGRAPH_SCHEMA_VERSION

    def _text(value):
        # Missing values are serialised as an empty attribute.
        return "" if value is None else str(value)

    def _flag(value):
        # Booleans use the lowercase XML spelling.
        return "true" if value else "false"

    root = etree.Element("choregraph")
    root.set("schemaVersion", TRANSFORMGRAPH_SCHEMA_VERSION)
    inputs_elem = etree.SubElement(root, "inputs")
    pipeline_elem = etree.SubElement(root, "pipeline")

    for inp in self.spec.inputs:
        input_attrs = {
            "id": str(inp.id),
            "label": _text(inp.label),
            "location": _text(inp.location),
            "format": _text(inp.format),
            "visibility": _flag(inp.visibility),
        }
        if inp.url:
            input_attrs["url"] = str(inp.url)
        # Format-specific options become plain attributes (later keys win).
        input_attrs.update((str(k), str(v)) for k, v in inp.options.items())
        etree.SubElement(inputs_elem, "input", **input_attrs)

    for node in self.spec.nodes:
        node_elem = etree.SubElement(
            pipeline_elem,
            "node",
            **{
                "id": str(node.id),
                "label": _text(node.label),
                "type": _text(node.type),
            },
        )

        # Input ports sit directly under the node element.
        for port in node.input_ports:
            in_attrs = {"name": _text(port.name)}
            if port.source_ref is not None:
                in_attrs["sourceRef"] = str(port.source_ref)
            if port.value is not None:
                in_attrs["value"] = str(port.value)
            if port.type:
                in_attrs["type"] = str(port.type)
            etree.SubElement(node_elem, "inputPort", **in_attrs)

        # Output ports sit directly under the node element as well.
        for port in node.output_ports:
            out_attrs = {
                "id": str(port.id),
                "name": _text(port.name),
                "visibility": _flag(port.visibility),
            }
            if port.type:
                out_attrs["type"] = str(port.type)
            if port.label:
                out_attrs["label"] = str(port.label)
            etree.SubElement(node_elem, "outputPort", **out_attrs)

    document = etree.ElementTree(root)
    document.write(str(save_to_path), pretty_print=True, xml_declaration=True, encoding="utf-8")

add_input

add_input(id, location='', format='CSV', label=None, visibility=False, url=None, data=None, **options)

Add an input data source.

PARAMETER DESCRIPTION
id

Unique input ID (string).

TYPE: str

location

File path or URL. Not required for in-memory data.

TYPE: str DEFAULT: ''

format

Data format (CSV, JSON, MEMORY, etc.). Set automatically to "MEMORY" when data is provided.

TYPE: str DEFAULT: 'CSV'

label

Human-readable label (auto-generated if None).

TYPE: str DEFAULT: None

visibility

Whether input is visible in visualization.

TYPE: bool DEFAULT: False

url

Origin URL for URL-based data sources.

TYPE: str DEFAULT: None

data

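Optional in-memory data (pandas DataFrame, dict, or list). When provided, the input is stored in external_inputs and format is set to "MEMORY". The caller does not need to supply a file: at run time a DataFrame is injected into the Kedro session in memory, while dict/list data is written as JSON to the pipeline inputs directory.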

DEFAULT: None

**options

Additional format-specific options.

DEFAULT: {}

Source code in src/choregraph/choregraph.py
def add_input(self, id: str, location: str = "", format: str = "CSV", label: str = None, visibility: bool = False, url: str = None, data=None,
              **options):
    """Add an input data source, replacing any existing input with the same ID.

    Args:
        id: Unique input ID (compared as a string).
        location: File path or URL. Not required for in-memory data.
        format: Data format (CSV, JSON, MEMORY, etc.).
            Forced to ``"MEMORY"`` when ``data`` is provided.
        label: Human-readable label. Defaults to the stem of ``location``,
            or ``"Input <id>"`` when no location is given.
        visibility: Whether input is visible in visualization. A visible
            input is also registered in the spec's outputs.
        url: Origin URL for URL-based data sources.
        data: Optional in-memory data (pandas DataFrame, dict, or list).
            When provided, it is stored in ``external_inputs`` under the
            input ID and ``format`` is set to ``"MEMORY"``. No source file
            is required: at run time a DataFrame is injected into the
            Kedro session in memory, while dict/list data is written as
            JSON to the inputs directory.
        **options: Additional format-specific options.

    Raises:
        ValueError: If ``id`` is already used by a node or by a node's
            output port.
    """
    id_str = str(id)

    # IDs share one namespace: reject clashes with nodes and their output ports.
    for n in self.spec.nodes:
        if str(n.id) == id_str:
            raise ValueError(f"ID {id} is already used by a Node. IDs must be unique across all entities.")
        if any(str(op.id) == id_str for op in n.output_ports):
            raise ValueError(f"ID {id} is already used by an output port. IDs must be unique across all entities.")

    # In-memory data: keep it for run() and mark the input as MEMORY.
    if data is not None:
        self.external_inputs[id_str] = data
        format = "MEMORY"

    if label is None:
        label = Path(location).stem if location else f"Input {id}"

    # Upsert: drop the previous definition, if any. Deleting by position
    # (rather than list.remove) cannot hit a different-but-equal element.
    for position, existing in enumerate(self.spec.inputs):
        if str(existing.id) == id_str:
            del self.spec.inputs[position]
            break

    self.spec.inputs.append(InputSpec(
        id=id_str, label=label, location=location, format=format,
        visibility=visibility, url=url, options=options,
    ))
    # Compare IDs as strings, as everywhere else, so an output registered
    # with a non-string ID is still recognised and not duplicated.
    if visibility and not any(str(o.id) == id_str for o in self.spec.outputs):
        self.spec.outputs.append(OutputSpec(id=id_str))

    # Force the next run and regenerate the wrapper for the new input.
    self._last_run_hash = None
    if self.workspace_path:
        self._ensure_wrapper()
    # Invalidate catalog cache so new entries are picked up
    self._catalog_instance = None
    self._emit("graph", {})

add_node

add_node(id, type, input_ports, output_ports=None, label=None)

Add a node to the pipeline.

PARAMETER DESCRIPTION
id

Unique node ID

TYPE: str

type

Transform function name

TYPE: str

input_ports

List of input port specifications

TYPE: List[InputPortSpec]

output_ports

List of output port specifications (auto-generated if None)

TYPE: List[OutputPortSpec] DEFAULT: None

label

Human-readable label (auto-generated if None)

TYPE: str DEFAULT: None

Source code in src/choregraph/choregraph.py
def add_node(self, id: str, type: str, input_ports: List[InputPortSpec], output_ports: List[OutputPortSpec] = None, label: str = None):
    """Add a node to the pipeline, replacing any existing node with the same ID.

    Args:
        id: Unique node ID (compared as a string).
        type: Transform function name.
        input_ports: List of input port specifications.
        output_ports: List of output port specifications. When ``None``, a
            single hidden ``"result"`` DATAFRAME port is generated whose ID
            is one more than the largest integer-like ID among the inputs
            and the existing output ports.
        label: Human-readable label (defaults to ``"<type>_(<id>)"``).

    Raises:
        ValueError: If ``id`` is already used by an input.
    """
    id_str = str(id)

    # IDs share one namespace with the inputs.
    if any(str(inp.id) == id_str for inp in self.spec.inputs):
        raise ValueError(f"ID {id} is already used by an Input. IDs must be unique across all entities.")

    if label is None:
        label = f"{type}_({id})"

    if output_ports is None:
        # Gather every ID that is (or parses as) an integer. Output port IDs
        # are coerced too: they may be strings, and a mixed int/str set would
        # make max() raise TypeError.
        raw_ids = [inp.id for inp in self.spec.inputs]
        raw_ids.extend(op.id for n in self.spec.nodes for op in n.output_ports)
        used_ids = set()
        for raw_id in raw_ids:
            try:
                used_ids.add(int(raw_id))
            except (TypeError, ValueError):
                # Non-numeric IDs cannot clash with a generated integer ID.
                continue
        next_id = max(used_ids, default=0) + 1
        # Default: single "result" output with visibility=False
        output_ports = [OutputPortSpec(id=next_id, name="result", type="DATAFRAME", visibility=False)]

    # Upsert: remove existing node with same ID. Note that remove_node()
    # itself invalidates the run hash and emits a "graph" event.
    self.remove_node(id)

    self.spec.nodes.append(
        NodeSpec(id=id_str, label=label, type=type, input_ports=input_ports, output_ports=output_ports)
    )

    # Visible output ports are published in the spec's outputs (once each).
    for op in output_ports:
        port_id = str(op.id)
        if op.visibility and not any(str(o.id) == port_id for o in self.spec.outputs):
            self.spec.outputs.append(OutputSpec(id=port_id))

    self._last_run_hash = None
    if self.workspace_path:
        self._ensure_wrapper()
    # Same invalidation add_input performs, so the node's new outputs are
    # picked up the next time the catalog is requested.
    self._catalog_instance = None
    self._emit("graph", {})

remove_node

remove_node(id)

Remove a node from the pipeline.

Source code in src/choregraph/choregraph.py
def remove_node(self, id: str):
    """Remove a node from the pipeline.

    Every node whose ID matches (compared as strings) is dropped, and the
    output ports of the removed node(s) are also removed from the spec's
    outputs so that no output refers to a port that no longer exists. The
    next run is forced, the wrapper is regenerated when a workspace is
    set, and a ``"graph"`` event is emitted — even when no node matched.

    Args:
        id: The node ID to remove.
    """
    id_str = str(id)

    remaining = []
    orphaned_port_ids = set()
    for node in self.spec.nodes:
        if str(node.id) == id_str:
            orphaned_port_ids.update(str(op.id) for op in node.output_ports)
        else:
            remaining.append(node)
    self.spec.nodes = remaining

    # Mirror remove_input: published outputs must not outlive their source.
    if orphaned_port_ids:
        self.spec.outputs = [out for out in self.spec.outputs if str(out.id) not in orphaned_port_ids]

    self._last_run_hash = None
    if self.workspace_path:
        self._ensure_wrapper()
    # The removed node's datasets are gone; rebuild the catalog on demand.
    self._catalog_instance = None
    self._emit("graph", {})

remove_input

remove_input(id)

Remove an input from the pipeline.

This removes the input from both the inputs list and outputs list (if visible). It also triggers catalog regeneration to ensure catalog.yml is updated.

PARAMETER DESCRIPTION
id

The input ID to remove

TYPE: str

Source code in src/choregraph/choregraph.py
def remove_input(self, id: str):
    """Remove an input from the pipeline.

    The input is dropped from the spec's inputs and, if it was published,
    from the spec's outputs. Afterwards the next run is forced, the wrapper
    is regenerated (when a workspace is set) so that catalog.yml is updated,
    the cached catalog is discarded and a ``"graph"`` event is emitted.

    Args:
        id: The input ID to remove
    """
    target = str(id)

    def _is_other(entry):
        # True for every spec entry that does not carry the removed ID.
        return str(entry.id) != target

    self.spec.inputs = list(filter(_is_other, self.spec.inputs))
    self.spec.outputs = list(filter(_is_other, self.spec.outputs))

    self._last_run_hash = None
    if self.workspace_path:
        self._ensure_wrapper()
    self._catalog_instance = None
    self._emit("graph", {})

subscribe

subscribe(callback)

Register a listener for pipeline events.

PARAMETER DESCRIPTION
callback

Function called with (event_type, payload) on each event. Event types include "status", "graph", "graph_update", and "node_status".

TYPE: Callable[[str, Any], None]

Source code in src/choregraph/choregraph.py
def subscribe(self, callback: Callable[[str, Any], None]):
    """Register a listener for pipeline events.

    Registering the same callback twice has no effect.

    Args:
        callback: Function called with ``(event_type, payload)`` on each event.
            Event types include ``"status"``, ``"graph"``, ``"graph_update"``,
            and ``"node_status"``.
    """
    if callback in self._listeners:
        return
    self._listeners.append(callback)

unsubscribe

unsubscribe(callback)

Remove a previously registered event listener.

PARAMETER DESCRIPTION
callback

The callback function to remove.

TYPE: Callable[[str, Any], None]

Source code in src/choregraph/choregraph.py
def unsubscribe(self, callback: Callable[[str, Any], None]):
    """Remove a previously registered event listener.

    Unknown callbacks are ignored.

    Args:
        callback: The callback function to remove.
    """
    if callback not in self._listeners:
        return
    self._listeners.remove(callback)

close

close()

Release cached data and catalog resources.

Source code in src/choregraph/choregraph.py
def close(self):
    """Drop the cached catalog and cached data, then purge the wrapper modules."""
    self._catalog_instance = None
    self._data_cache.clear()
    self._purge_wrapper_modules()

reset_spec

reset_spec()

Reset the spec to an empty state, clearing all inputs, nodes, and outputs.

Source code in src/choregraph/choregraph.py
def reset_spec(self):
    """Replace the current spec with a fresh, empty ``ChoregraphSpec``.

    All state derived from the previous spec is discarded: the RAM data
    cache, the datasets metadata (when present), the cached catalog
    instance, and both the spec hash and the last-run hash. When a workspace
    is configured, ``_ensure_wrapper`` is called afterwards. A ``"graph"``
    event with an empty payload is always emitted at the end.
    """
    self.spec = ChoregraphSpec()

    # Discard everything that was computed from the previous spec.
    self._data_cache.clear()
    metadata = self._datasets_metadata
    if metadata is not None:
        metadata.clear()
    self._catalog_instance = None
    self._last_run_hash = self._spec_hash = None

    # The wrapper is only refreshed when there is a workspace to refresh it in.
    if self.workspace_path:
        self._ensure_wrapper()

    self._emit("graph", {})

load

load(xml_spec=None, external_inputs=None, workspace_path=None)

Load or reload a pipeline specification (compatibility layer).

Re-parses the XML spec and regenerates Kedro project files if the spec content has changed since the last call.

PARAMETER DESCRIPTION
xml_spec

Path to an XML file or an XML string.

TYPE: Union[str, Path] DEFAULT: None

external_inputs

Dict mapping input IDs to in-memory data objects.

TYPE: Dict[str, Any] DEFAULT: None

workspace_path

Override the workspace directory.

TYPE: Union[str, Path] DEFAULT: None

Source code in src/choregraph/choregraph.py
def load(self, xml_spec: Union[str, Path] = None, external_inputs: Dict[str, Any] = None, workspace_path: Union[str, Path] = None):
    """Load or reload a pipeline specification (compatibility layer).

    The spec is re-parsed and all caches are reset only when the spec
    content (compared by SHA-256 hash) or the workspace path differs from
    the previous call; otherwise the call only logs that nothing changed.
    When a reload happens and a workspace is set, ``_ensure_wrapper`` is
    called, and a ``"graph"`` event is emitted.

    Args:
        xml_spec: Path to an XML file or an XML string. When falsy, the
            content is treated as empty and a reload installs an empty
            ``ChoregraphSpec``.
        external_inputs: Dict mapping input IDs to in-memory data objects.
            Only replaces the current mapping when truthy.
        workspace_path: Override the workspace directory. Ignored when falsy.
    """
    old_workspace = self.workspace_path
    if workspace_path:
        self.workspace_path = Path(workspace_path)
    workspace_changed = old_workspace != self.workspace_path

    new_spec_content = ""
    if xml_spec:
        is_existing_file = False
        if isinstance(xml_spec, (str, Path)):
            try:
                is_existing_file = Path(xml_spec).exists()
            except (OSError, ValueError):
                # An inline XML string is not a valid filesystem path; on
                # some Python versions probing it raises (e.g. "File name
                # too long") instead of returning False. Treat it as
                # "not a file" and fall through to the inline branch.
                is_existing_file = False

        if is_existing_file:
            new_spec_content = safe_path(xml_spec).read_text()
        else:
            new_spec_content = str(xml_spec)

    new_hash = hashlib.sha256(new_spec_content.encode('utf-8')).hexdigest()
    spec_changed = new_hash != getattr(self, '_spec_hash', None)

    if spec_changed or workspace_changed:
        # Purge cached Kedro wrapper modules to prevent stale imports
        # when switching between rooms/workspaces
        self._purge_wrapper_modules()

        self._spec_hash = new_hash
        if xml_spec:
            self.spec = ChoregraphSpecParser.parse(xml_spec)
        else:
            self.spec = ChoregraphSpec()

        # Clear all caches when spec or workspace changes to prevent stale data
        self._data_cache.clear()
        self._catalog_instance = None

        self._last_run_hash = None
        if self.workspace_path:
            self._ensure_wrapper()
        self._emit("graph", {})
    else:
        logger.info("Choregraph spec unchanged.")

    if external_inputs:
        self.external_inputs = external_inputs

get_inputs

get_inputs()

Get list of input specifications as (id, name) tuples.

Source code in src/choregraph/choregraph.py
def get_inputs(self) -> List[Tuple[str, str]]:
    """Return every declared input as an ``(id, name)`` tuple.

    The id is stringified; the name is resolved through ``spec.get_name``
    using the input's original id. Order follows ``spec.inputs``.
    """
    spec = self.spec
    pairs = []
    for entry in spec.inputs:
        pairs.append((str(entry.id), spec.get_name(entry.id)))
    return pairs

get_visibles

get_visibles()

Get list of datasets marked as visible (visibility=True) as (id, name) tuples.

Source code in src/choregraph/choregraph.py
def get_visibles(self) -> List[Tuple[str, str]]:
    """Return all datasets flagged ``visibility=True`` as ``(id, name)`` tuples.

    Inputs are scanned first, then the output ports of every node, in spec
    order. Ids are compared as strings and only the first occurrence of an
    id is kept.
    """
    spec = self.spec
    # Insertion-ordered mapping of stringified id -> resolved name; doubles
    # as the "already seen" set.
    named = {}

    def _register(items):
        for item in items:
            if not item.visibility:
                continue
            key = str(item.id)
            if key not in named:
                named[key] = spec.get_name(item.id)

    _register(spec.inputs)
    for node in spec.nodes:
        _register(node.output_ports)

    return list(named.items())

get_leaves

get_leaves()

Get list of terminal output ports (not consumed by any downstream node) as (id, name) tuples.

Note: Only output ports of nodes are returned, never inputs. If there are no nodes, the result is an empty list (inputs are already inputs, so they do not need promotion).

Source code in src/choregraph/choregraph.py
def get_leaves(self) -> List[Tuple[str, str]]:
    """Return terminal output ports as ``(id, name)`` tuples.

    An output port is terminal when no node input port references it through
    ``source_ref``. Only node output ports are considered, never spec inputs,
    so a spec without nodes yields an empty list.
    """
    nodes = self.spec.nodes

    # Every port id that some node consumes, stringified for comparison.
    consumed = {
        str(port.source_ref)
        for node in nodes
        for port in node.input_ports
        if port.source_ref is not None
    }

    leaves = []
    for node in nodes:
        for out in node.output_ports:
            port_id = str(out.id)
            if port_id in consumed:
                continue
            # The name is looked up with the stringified id.
            leaves.append((port_id, self.spec.get_name(port_id)))
    return leaves

find_node_for_output_port

find_node_for_output_port(output_port_id)

Find the node that owns a given output port ID.

PARAMETER DESCRIPTION
output_port_id

The ID of the output port

TYPE: str

RETURNS DESCRIPTION
Optional[NodeSpec]

The NodeSpec containing this output port, or None if not found

Source code in src/choregraph/choregraph.py
def find_node_for_output_port(self, output_port_id: str) -> Optional[NodeSpec]:
    """Look up the node that declares the given output port.

    Ids are compared as strings, so ``3`` and ``"3"`` refer to the same port.

    Args:
        output_port_id: The ID of the output port to search for.

    Returns:
        The first node (in spec order) with a matching output port, or
        ``None`` when no node declares it.
    """
    wanted = str(output_port_id)
    matches = (
        node
        for node in self.spec.nodes
        if any(str(port.id) == wanted for port in node.output_ports)
    )
    return next(matches, None)

give_id

give_id()

Give the next available integer ID as a string. This can be used for generating unique IDs for nodes and ports.

Source code in src/choregraph/choregraph.py
def give_id(self) -> str:
    """Return the next available integer ID as a string.

    Scans the ids of all inputs, nodes, and node output ports, keeps those
    that convert to ``int``, and returns the largest one plus one. Ids that
    are not integer-like (non-numeric strings, ``None``) are ignored. When
    no integer id exists, ``"1"`` is returned.

    Returns:
        The next free integer id, formatted as a string.
    """
    def _as_int(raw_id):
        # Non-integer ids are skipped rather than treated as errors;
        # TypeError covers ids such as None that int() rejects outright.
        try:
            return int(raw_id)
        except (TypeError, ValueError):
            return None

    raw_ids = [inp.id for inp in self.spec.inputs]
    for node in self.spec.nodes:
        raw_ids.append(node.id)
        raw_ids.extend(port.id for port in node.output_ports)

    used = [value for value in map(_as_int, raw_ids) if value is not None]
    return str(max(used, default=0) + 1)

promote_leaves

promote_leaves(remove_source_nodes=True)

Promote all leaf outputs as inputs, optionally removing their source nodes.

For each terminal output port (not consumed downstream):

- Single-file outputs are promoted via :meth:`_promote_output`.
- Partitioned outputs are promoted via :meth:`_promote_partitioned`.

Nodes are only removed when all their outputs were successfully promoted.

RETURNS DESCRIPTION
List[Tuple[str, str]]

List of (id, name) tuples for the promoted inputs.

Source code in src/choregraph/choregraph.py
def promote_leaves(self, remove_source_nodes: bool = True) -> List[Tuple[str, str]]:
    """Turn every leaf output into an input, optionally dropping its node.

    Each terminal output port reported by :meth:`get_leaves` is first
    promoted with :meth:`_promote_output`. If that raises ``ValueError``,
    :meth:`_promote_partitioned` is tried instead; an empty result from it
    counts as a failure and is logged as a warning.

    A source node is removed only when ``remove_source_nodes`` is true and
    none of its leaf outputs failed to promote. Removal errors are logged
    and do not abort the operation. When at least one promotion succeeded,
    :meth:`_remove_non_processable_inputs` is called at the end.

    Args:
        remove_source_nodes: Whether to remove nodes whose leaf outputs
            were all promoted.

    Returns:
        List of ``(id, name)`` tuples for the promoted inputs.
    """
    promoted: List[Tuple[str, str]] = []
    removable: set[str] = set()
    blocked: set[str] = set()

    for port_id, port_name in self.get_leaves():
        owner = self.find_node_for_output_port(port_id)
        owner_id = str(owner.id) if owner else None

        try:
            self._promote_output(output_port_id=port_id)
        except ValueError:
            # Fall back to the partitioned promotion path.
            parts = self._promote_partitioned(port_id, port_name)
            ok = bool(parts)
            if ok:
                promoted.extend(parts)
            else:
                logger.warning(f"Could not promote leaf '{port_name}' (ID: {port_id}): no files found")
        else:
            promoted.append((port_id, port_name))
            ok = True

        if owner_id:
            bucket = removable if ok else blocked
            bucket.add(owner_id)

    if remove_source_nodes:
        # A node with even one failed leaf must stay in the graph.
        for node_id in removable - blocked:
            try:
                self.remove_node(node_id)
                logger.info(f"Removed transformation node ID: {node_id}")
            except Exception as e:
                logger.warning(f"Could not remove node {node_id}: {e}")

    # Clean up raw container inputs (Excel, JSON) whose conversion nodes
    # have been promoted and removed above.
    if promoted:
        self._remove_non_processable_inputs()

    return promoted

pushd

pushd(new_dir)

Context manager to temporarily change the working directory. Restores the original directory automatically when exiting the 'with' block.

Source code in src/choregraph/choregraph.py
@contextlib.contextmanager
def pushd(new_dir):
    """Run the ``with`` body inside ``new_dir``, then return to the old directory.

    The current working directory is recorded before switching, and is
    restored on exit even when the body raises. The target is passed through
    ``safe_path`` before being handed to ``os.chdir``.
    """
    origin = os.getcwd()
    target = safe_path(new_dir)
    os.chdir(target)
    try:
        yield
    finally:
        # Always go back, whether the body succeeded or raised.
        os.chdir(origin)