Skip to content

Viz Server

Manages the Kedro Viz server lifecycle — starting, stopping, port checking, and readiness polling. Provides custom CSS styling for the Kedro Viz UI and utilities for generating dummy graph structures.

viz

Kedro Viz server management and graph utilities.

Manages the Kedro Viz subprocess lifecycle (start, stop, port checking, readiness polling) and provides custom CSS for UI styling. Also includes a helper to generate dummy graph structures for file-only views.

KedroVizServer

KedroVizServer(port=4141, custom_css=None)

Manages a Kedro Viz server instance.

Source code in src/choregraph/viz.py
def __init__(self, port: int = 4141, custom_css: Optional[str] = None):
    """Set up an idle server manager; no subprocess is launched here.

    Args:
        port: TCP port passed to the Kedro Viz server when it is started.
        custom_css: Stylesheet override. Any falsy value (``None`` or an
            empty string) selects ``DEFAULT_KEDRO_CUSTOM_CSS`` instead.
    """
    # Styling and target port.
    self._custom_css = custom_css if custom_css else DEFAULT_KEDRO_CUSTOM_CSS
    self.port = port

    # Lifecycle state: nothing launched and nothing confirmed ready yet.
    self.process = None
    self._ready = False
    self._atexit_registered = False

    # Filesystem locations; both start unset and are populated elsewhere.
    self._stable_wrapper: Optional[Path] = None
    self._project_path: Optional[Path] = None

    # Shared async HTTP client with a bounded connection pool.
    # Note: TLS certificate verification is switched off for this client.
    pool_limits = httpx.Limits(max_keepalive_connections=20, max_connections=200)
    self._client = httpx.AsyncClient(verify=False, limits=pool_limits)

is_ready async

is_ready()

Check if the Kedro Viz server is ready to accept connections.

Source code in src/choregraph/viz.py
async def is_ready(self) -> bool:
    """Report whether the server is flagged ready and still answers HTTP.

    The HTTP probe is only issued when the cached ``_ready`` flag is set;
    otherwise the flag itself is returned without touching the network.
    """
    ready_flag = self._ready
    if not ready_flag:
        return ready_flag
    return await self._is_http_ready()

wait_until_ready async

wait_until_ready(timeout=30.0, poll_interval=0.5)

Wait until the server is ready or timeout is reached.

PARAMETER DESCRIPTION
timeout

Maximum time to wait in seconds

TYPE: float DEFAULT: 30.0

poll_interval

Time between checks in seconds

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
bool

True if server is ready, False if timeout reached

Source code in src/choregraph/viz.py
async def wait_until_ready(
    self, timeout: float = 30.0, poll_interval: float = 0.5
) -> bool:
    """Poll until the server answers HTTP requests or the timeout elapses.

    Each round first checks that the port is bound (a cheap check) and only
    then issues the HTTP readiness probe. On success ``self._ready`` is set
    to True.

    Elapsed time is measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot shorten or lengthen the wait.

    Args:
        timeout: Maximum time to wait in seconds. A non-positive value
            returns False without probing.
        poll_interval: Time between checks in seconds.

    Returns:
        True if server is ready, False if timeout reached.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Port check first (fast); the HTTP probe only runs once it is bound.
        if self._is_port_in_use(self.port) and await self._is_http_ready():
            self._ready = True
            return True
        await asyncio.sleep(poll_interval)
    return False

get_custom_css

get_custom_css()

Get the custom CSS for Kedro Viz styling.

Source code in src/choregraph/viz.py
def get_custom_css(self) -> str:
    """Return the stylesheet currently configured for the Kedro Viz UI."""
    current_css = self._custom_css
    return current_css

set_custom_css

set_custom_css(css)

Set custom CSS for Kedro Viz styling.

Source code in src/choregraph/viz.py
def set_custom_css(self, css: str):
    """Replace the stylesheet used for Kedro Viz styling.

    Args:
        css: New stylesheet text; stored as given, without validation.
    """
    setattr(self, "_custom_css", css)

switch_project

switch_project(new_wrapper_path)

Sync files from a new room's pipeline to trigger autoreload.

Call `wait_for_switch()` afterwards to wait for the server to restart with the new data.

Source code in src/choregraph/viz.py
def switch_project(self, new_wrapper_path: Union[str, Path]):
    """Sync another room's pipeline files so that autoreload kicks in.

    The given path is passed through ``safe_path`` and handed to
    ``_sync_wrapper``; afterwards the server is flagged as not ready.
    Follow up with :meth:`wait_for_switch` to wait until the server has
    restarted with the new data.

    Args:
        new_wrapper_path: Location of the pipeline wrapper to sync from.
    """
    source_dir = safe_path(new_wrapper_path)
    self._sync_wrapper(source_dir)
    # Readiness has to be re-established after the files changed.
    self._ready = False

trigger_reload

trigger_reload()

Touch a config file in the stable wrapper to trigger autoreload.

Call this after a pipeline run to ensure kedro-viz picks up new output files for previews.

Source code in src/choregraph/viz.py
def trigger_reload(self):
    """Touch ``conf/base/catalog.yml`` in the stable wrapper to trigger autoreload.

    Intended to be called after a pipeline run so that kedro-viz picks up
    new output files for previews. Does nothing when the stable wrapper is
    unset or missing, or when the catalog file does not exist (the file is
    never created here).
    """
    wrapper_dir = self._stable_wrapper
    if not wrapper_dir or not wrapper_dir.exists():
        return

    catalog_file = wrapper_dir / "conf" / "base" / "catalog.yml"
    if catalog_file.exists():
        # Only the modification time changes; the contents stay untouched.
        catalog_file.touch()

wait_for_switch async

wait_for_switch(timeout=20.0)

Wait for autoreload to restart the server after a switch_project().

Phase 1: wait for the old child to die (port becomes free). Phase 2: wait for the new child to become ready.

Source code in src/choregraph/viz.py
async def wait_for_switch(self, timeout: float = 20.0) -> bool:
    """Wait for autoreload to restart the server after a switch_project().

    Phase 1: wait for the old child to die (port becomes free). The port is
    checked up to 6 times, 0.3 s apart; if it never frees up, phase 2 runs
    anyway.
    Phase 2: wait for the new child to become ready.

    Elapsed time is measured with ``time.monotonic()`` so wall-clock
    adjustments do not distort the remaining budget.

    Args:
        timeout: Overall budget in seconds. Phase 2 is always granted at
            least 5 seconds, so the total wait can exceed ``timeout`` when
            it is small.

    Returns:
        The result of :meth:`wait_until_ready` for phase 2.
    """
    start = time.monotonic()

    # Phase 1 — old child should die within ~2s of file change
    for _ in range(6):
        await asyncio.sleep(0.3)
        if not self._is_port_in_use(self.port):
            break

    # Phase 2 — new child binds and responds; keep a 5 s floor so a slow
    # phase 1 cannot starve the readiness wait.
    elapsed = time.monotonic() - start
    remaining = max(timeout - elapsed, 5.0)
    return await self.wait_until_ready(timeout=remaining, poll_interval=0.3)

start

start(project_path)

Start the Kedro Viz server.

Copies the source pipeline into a stable directory and launches `kedro viz run --autoreload` from there. Subsequent room switches only need `switch_project()` (no process restart from our side).

PARAMETER DESCRIPTION
project_path

Path to the room's pipeline directory.

TYPE: Union[str, Path]

RETURNS DESCRIPTION
bool

True if server started successfully or is already running, False on failure.

Source code in src/choregraph/viz.py
def start(self, project_path: Union[str, Path]) -> bool:
    """Start the Kedro Viz server.

    Copies the source pipeline into a stable directory and launches
    ``kedro viz run --autoreload`` from there.  Subsequent room switches
    only need :meth:`switch_project` (no process restart from our side).

    If a previously launched process has exited in the meantime, it is
    discarded and a fresh server is launched instead of being reported as
    running.

    The server is started with ``--host 0.0.0.0`` and its stdout/stderr
    are discarded.

    Args:
        project_path: Path to the room's pipeline directory.

    Returns:
        True if server started successfully or is already running, False on failure.
    """
    project_path = safe_path(project_path)

    # Sync source wrapper into the stable directory
    self._sync_wrapper(project_path)
    stable = self._ensure_stable_wrapper()

    if self.process:
        if self.process.poll() is not None:
            # The child exited on its own (crash or external kill). Forget
            # the stale handle so a new server gets launched below.
            logger.warning(
                "Kedro Viz process on port %s exited with code %s; restarting",
                self.port,
                self.process.returncode,
            )
            self.process = None
            self._ready = False
        elif self._project_path == stable:
            # Already running from the stable dir — nothing to do
            self._ready = True
            return True
        else:
            # Different stable path (shouldn't happen) — stop first
            logger.info(
                "Project path changed, restarting Kedro Viz on port %s", self.port
            )
            self.stop()

    # Kill any orphan process holding the port, then proceed; per the
    # original design note the new server binds with SO_REUSEADDR.
    if self._is_port_in_use(self.port):
        logger.warning("Port %s occupied, killing orphan processes...", self.port)
        self._kill_process_on_port(self.port)
        # Short pause (this blocks the calling thread for 0.3 s) — just
        # enough for the kernel to clean up.
        time.sleep(0.3)

    # Check if kedro viz is available
    if not shutil.which("kedro"):
        logger.error("Kedro CLI not found. Cannot start Kedro Viz.")
        return False

    if not stable.exists():
        logger.error("Stable wrapper path %s does not exist.", stable)
        return False

    self._project_path = stable

    cmd = [
        "kedro",
        "viz",
        "run",
        "--port",
        str(self.port),
        "--no-browser",
        "--host",
        "0.0.0.0",
        "--include-hooks",
        "--autoreload",
    ]

    try:
        # start_new_session=True creates a process group so stop() can
        # kill the entire tree (kedro + uvicorn workers) at once.
        self.process = subprocess.Popen(
            cmd,
            cwd=str(stable),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
    except Exception as e:
        logger.error("Failed to start Kedro Viz: %s", e)
        self.process = None
        return False

    # Register cleanup on exit (once per instance)
    if not self._atexit_registered:
        atexit.register(self.stop)
        self._atexit_registered = True
    return True

generate_dummy_graph

generate_dummy_graph(file_paths)

Generates a dummy Kedro Viz graph structure for a list of files. Each file becomes a data node. No edges or tasks.

Source code in src/choregraph/viz.py
def generate_dummy_graph(file_paths: list[str]) -> dict:
    """
    Generates a dummy Kedro Viz graph structure for a list of files.
    Each file becomes a data node. No edges or tasks.

    Node IDs are derived from the sanitized file name and are guaranteed to
    be unique within the returned graph: when two inputs would map to the
    same ID (same basename in different directories, a repeated path, or
    names differing only in ``.``/``-``/``_``), later ones get a numeric
    suffix (``_2``, ``_3``, ...). The first occurrence keeps the plain ID.

    Args:
        file_paths: File paths; only the final path component is used.

    Returns:
        A dict with a single ``"data"`` key holding the nodes plus the
        fixed pipeline and modular-pipeline scaffolding.
    """
    nodes = []
    used_ids: set[str] = set()

    for fpath in file_paths:
        # Keep only the final path component to prevent directory traversal,
        # then strip any dangerous characters from it.
        fname = sanitize_filename(Path(fpath).name)

        # Deterministic ID based on the file name; disambiguate collisions
        # so that every node in the graph has a distinct ID.
        base_id = f"data_{fname.replace('.', '_').replace('-', '_')}"
        node_id = base_id
        suffix = 2
        while node_id in used_ids:
            node_id = f"{base_id}_{suffix}"
            suffix += 1
        used_ids.add(node_id)

        nodes.append(
            {
                "id": node_id,
                "name": fname,
                "tags": [],
                "pipelines": ["choregraph", "__default__"],
                "type": "data",
                "modular_pipelines": None,
                "node_extras": {
                    "stats": {"rows": 0, "columns": 0, "file_size": 0},
                    "styles": None,
                },
                "layer": None,
                "dataset_type": "pandas.csv_dataset.CSVDataset",
            }
        )

    return {
        "data": {
            "nodes": nodes,
            "edges": [],
            "layers": [],
            "tags": [],
            "pipelines": [
                {"id": "__default__", "name": "__default__"},
                {"id": "choregraph", "name": "choregraph"},
            ],
            "modular_pipelines": {
                "__root__": {
                    "id": "__root__",
                    "name": "__root__",
                    "inputs": [],
                    "outputs": [],
                    "children": [],
                }
            },
            "selected_pipeline": "__default__",
        }
    }