
Graph Operations

This section documents the graph operation endpoints and their functionality.

backend.routes.graph_ops

This module provides API endpoints and helper functions for managing node attributes and relations in the NDF Studio knowledge graph backend.

Key Features:

- All node data is stored as JSON files under graph_data/users/{user_id}/nodes/{node_id}.json.
- Attribute and relation types are validated against the global schema files (attribute_types.json, relation_types.json).
- Endpoints support create, update, and delete operations for both attributes and relations.
- When creating or updating a relation, if the source or target node does not exist, the canonical create_node function from nodes.py is called to ensure proper node creation and registry updates.
- All endpoints are designed for robust frontend integration, supporting both selection of existing nodes/relations/attributes and creation of new ones.

Endpoints:

- POST /users/{user_id}/graphs/{graph_id}/attribute/create
- PUT /users/{user_id}/graphs/{graph_id}/attribute/update/{node_id}/{attr_name}
- DELETE /users/{user_id}/graphs/{graph_id}/attribute/delete/{node_id}/{attr_name}
- POST /users/{user_id}/graphs/{graph_id}/relation/create
- PUT /users/{user_id}/graphs/{graph_id}/relation/update/{source}/{name}/{target}
- DELETE /users/{user_id}/graphs/{graph_id}/relation/delete/{source}/{name}/{target}

Helpers:

- node_path, load_node, save_node: JSON-based node storage helpers.
- load_schema: Loads global schema files for validation.

All logic is designed to be robust, extensible, and consistent with the rest of the backend.
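
For orientation, the morph-related endpoints below all read and write the per-node JSON files described above: each node carries a morphs list, and each morph references its attributes and relations by ID. The sketch below is illustrative only: it shows just the keys these endpoints touch (other node fields are omitted) and all values are hypothetical.

# Illustrative shape of graph_data/users/{user_id}/nodes/{node_id}.json
# (hypothetical values; only the keys used by the morph endpoints are shown)
example_node = {
    "morphs": [
        {
            "morph_id": "liquid_water",         # create_morph derives this as f"{name}_{node_id}"
            "node_id": "water",
            "name": "liquid",
            "attributeNode_ids": ["attr_123"],  # files under attributeNodes/{attr_id}.json
            "relationNode_ids": ["rel_456"],    # files under relationNodes/{rel_id}.json
        }
    ]
}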

Classes

The request models referenced in the source below are MorphOperationRequest (morph_id), MoveMorphRequest (from_morph_id, to_morph_id), and CreateMorphRequest (node_id, name, optional copy_from_morph).

Functions

get_attribute_node(user_id: str, graph_id: str, attribute_id: str)

Get a specific attribute node by its ID

Source code in backend/routes/graph_ops.py
@router.get("/users/{user_id}/graphs/{graph_id}/attributeNodes/{attribute_id}")
def get_attribute_node(user_id: str, graph_id: str, attribute_id: str):
    """Get a specific attribute node by its ID"""
    attr_path = f"graph_data/users/{user_id}/attributeNodes/{attribute_id}.json"
    if not os.path.exists(attr_path):
        raise HTTPException(status_code=404, detail="AttributeNode not found")
    with open(attr_path, "r") as f:
        return json.load(f)
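
A minimal client-side sketch of calling this endpoint with the requests library; the base URL, user, graph, and attribute IDs are placeholders, not values from the module.

import requests

BASE = "http://localhost:8000"  # assumed dev server address

resp = requests.get(f"{BASE}/users/user1/graphs/graph1/attributeNodes/attr_123")
resp.raise_for_status()  # raises on 404 when the AttributeNode file does not exist
print(resp.json())       # contents of attributeNodes/attr_123.json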

unlist_attribute_from_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MorphOperationRequest)

Remove an attribute from a specific morph without deleting the attribute itself. The attribute continues to exist in other morphs.

Source code in backend/routes/graph_ops.py
@router.post("/users/{user_id}/graphs/{graph_id}/attribute/unlist_from_morph/{node_id}/{attr_name}")
def unlist_attribute_from_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MorphOperationRequest):
    """
    Remove an attribute from a specific morph without deleting the attribute itself.
    The attribute continues to exist in other morphs.
    """
    try:
        with graph_transaction(user_id, graph_id, "unlist_attribute_from_morph") as backup_dir:
            morph_id = request.morph_id
            # Find the attributeNode id by node_id and attr_name
            reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
            registry = load_registry(reg_path)
            attr_id = None
            for k, v in registry.items():
                if v.get("source_id") == node_id and v.get("name") == attr_name:
                    attr_id = k
                    break
            if not attr_id:
                raise HTTPException(status_code=404, detail="AttributeNode not found")

            # Update source node to remove attribute from specific morph
            source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
            if not source_node_path.exists():
                raise HTTPException(status_code=404, detail="Source node not found")

            source_node = load_json_file(source_node_path)

            # Find the specific morph and remove the attribute
            morph_found = False
            for morph in source_node.get("morphs", []):
                if morph.get("morph_id") == morph_id:
                    morph_found = True
                    if "attributeNode_ids" in morph and attr_id in morph["attributeNode_ids"]:
                        morph["attributeNode_ids"].remove(attr_id)
                        break

            if not morph_found:
                raise HTTPException(status_code=404, detail=f"Morph {morph_id} not found")

            # Atomically save updated source node
            atomic_node_save(user_id, node_id, source_node)

            # Regenerate composed files atomically
            try:
                node_ids = get_graph_node_ids(user_id, graph_id)
                metadata_path = Path(f"graph_data/users/{user_id}/graphs/{graph_id}/metadata.yaml")
                graph_description = ""
                if metadata_path.exists():
                    import yaml
                    with open(metadata_path, "r") as f:
                        metadata = yaml.safe_load(f) or {}
                        graph_description = metadata.get("description", "")

                composed_data = compose_graph(user_id, graph_id, node_ids, graph_description)
                if composed_data:
                    atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "json")
                    atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "yaml")
                    atomic_composed_save(user_id, graph_id, composed_data["polymorphic"], "polymorphic")
            except Exception as e:
                print(f"Warning: Failed to regenerate composed files: {e}")

            return {"status": "Attribute unlisted from morph", "attribute_id": attr_id, "morph_id": morph_id}

    except AtomicityError as e:
        raise HTTPException(status_code=500, detail=f"Atomic operation failed: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to unlist attribute from morph: {str(e)}")

add_attribute_to_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MorphOperationRequest)

Add an existing attribute to a specific morph.

Source code in backend/routes/graph_ops.py
@router.post("/users/{user_id}/graphs/{graph_id}/attribute/add_to_morph/{node_id}/{attr_name}")
def add_attribute_to_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MorphOperationRequest):
    """
    Add an existing attribute to a specific morph.
    """
    try:
        with graph_transaction(user_id, graph_id, "add_attribute_to_morph") as backup_dir:
            morph_id = request.morph_id
            # Find the attributeNode id by node_id and attr_name
            reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
            registry = load_registry(reg_path)
            attr_id = None
            for k, v in registry.items():
                if v.get("source_id") == node_id and v.get("name") == attr_name:
                    attr_id = k
                    break
            if not attr_id:
                raise HTTPException(status_code=404, detail="AttributeNode not found")

            # Update source node to add attribute to specific morph
            source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
            if not source_node_path.exists():
                raise HTTPException(status_code=404, detail="Source node not found")

            source_node = load_json_file(source_node_path)

            # Find the specific morph and add the attribute
            morph_found = False
            for morph in source_node.get("morphs", []):
                if morph.get("morph_id") == morph_id:
                    morph_found = True
                    if "attributeNode_ids" not in morph:
                        morph["attributeNode_ids"] = []
                    if attr_id not in morph["attributeNode_ids"]:
                        morph["attributeNode_ids"].append(attr_id)
                    break

            if not morph_found:
                raise HTTPException(status_code=404, detail=f"Morph {morph_id} not found")

            # Atomically save updated source node
            atomic_node_save(user_id, node_id, source_node)

            # Regenerate composed files atomically
            try:
                node_ids = get_graph_node_ids(user_id, graph_id)
                metadata_path = Path(f"graph_data/users/{user_id}/graphs/{graph_id}/metadata.yaml")
                graph_description = ""
                if metadata_path.exists():
                    import yaml
                    with open(metadata_path, "r") as f:
                        metadata = yaml.safe_load(f) or {}
                        graph_description = metadata.get("description", "")

                composed_data = compose_graph(user_id, graph_id, node_ids, graph_description)
                if composed_data:
                    atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "json")
                    atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "yaml")
                    atomic_composed_save(user_id, graph_id, composed_data["polymorphic"], "polymorphic")
            except Exception as e:
                print(f"Warning: Failed to regenerate composed files: {e}")

            return {"status": "Attribute added to morph", "attribute_id": attr_id, "morph_id": morph_id}

    except AtomicityError as e:
        raise HTTPException(status_code=500, detail=f"Atomic operation failed: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to add attribute to morph: {str(e)}")

move_attribute_to_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MoveMorphRequest)

Move an attribute from one morph to another.

Source code in backend/routes/graph_ops.py
@router.post("/users/{user_id}/graphs/{graph_id}/attribute/move_to_morph/{node_id}/{attr_name}")
def move_attribute_to_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MoveMorphRequest):
    """
    Move an attribute from one morph to another.
    """
    try:
        print(f"[DEBUG] move_attribute_to_morph called: user_id={user_id}, graph_id={graph_id}, node_id={node_id}, attr_name={attr_name}, from_morph_id={request.from_morph_id}, to_morph_id={request.to_morph_id}")
        with graph_transaction(user_id, graph_id, "move_attribute_to_morph") as backup_dir:
            from_morph_id = request.from_morph_id
            to_morph_id = request.to_morph_id
            # Find the attributeNode id by node_id and attr_name
            reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
            registry = load_registry(reg_path)
            attr_id = None
            for k, v in registry.items():
                if v.get("source_id") == node_id and v.get("name") == attr_name:
                    attr_id = k
                    break
            print(f"[DEBUG] Found attr_id: {attr_id}")
            if not attr_id:
                raise HTTPException(status_code=404, detail="AttributeNode not found")

            # Update source node to move attribute between morphs
            source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
            if not source_node_path.exists():
                raise HTTPException(status_code=404, detail="Source node not found")

            source_node = load_json_file(source_node_path)

            # Find both morphs
            from_morph = None
            to_morph = None
            for morph in source_node.get("morphs", []):
                if morph.get("morph_id") == from_morph_id:
                    from_morph = morph
                if morph.get("morph_id") == to_morph_id:
                    to_morph = morph
            print(f"[DEBUG] from_morph: {from_morph}")
            print(f"[DEBUG] to_morph: {to_morph}")
            if not from_morph:
                raise HTTPException(status_code=404, detail=f"Source morph {from_morph_id} not found")
            if not to_morph:
                raise HTTPException(status_code=404, detail=f"Target morph {to_morph_id} not found")
            print(f"[DEBUG] from_morph attributeNode_ids before: {from_morph.get('attributeNode_ids', [])}")
            print(f"[DEBUG] to_morph attributeNode_ids before: {to_morph.get('attributeNode_ids', [])}")
            # Remove from source morph
            if "attributeNode_ids" in from_morph and attr_id in from_morph["attributeNode_ids"]:
                from_morph["attributeNode_ids"].remove(attr_id)
            # Add to target morph
            if "attributeNode_ids" not in to_morph:
                to_morph["attributeNode_ids"] = []
            if attr_id not in to_morph["attributeNode_ids"]:
                to_morph["attributeNode_ids"].append(attr_id)
            print(f"[DEBUG] from_morph attributeNode_ids after: {from_morph.get('attributeNode_ids', [])}")
            print(f"[DEBUG] to_morph attributeNode_ids after: {to_morph.get('attributeNode_ids', [])}")
            # Atomically save updated source node
            atomic_node_save(user_id, node_id, source_node)
            # Regenerate composed files atomically
            try:
                node_ids = get_graph_node_ids(user_id, graph_id)
                metadata_path = Path(f"graph_data/users/{user_id}/graphs/{graph_id}/metadata.yaml")
                graph_description = ""
                if metadata_path.exists():
                    import yaml
                    with open(metadata_path, "r") as f:
                        metadata = yaml.safe_load(f) or {}
                        graph_description = metadata.get("description", "")
                composed_data = compose_graph(user_id, graph_id, node_ids, graph_description)
                if composed_data:
                    atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "json")
                    atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "yaml")
                    atomic_composed_save(user_id, graph_id, composed_data["polymorphic"], "polymorphic")
            except Exception as e:
                print(f"Warning: Failed to regenerate composed files: {e}")
            return {"status": "Attribute moved between morphs", "attribute_id": attr_id, "from_morph_id": from_morph_id, "to_morph_id": to_morph_id}
    except AtomicityError as e:
        raise HTTPException(status_code=500, detail=f"Atomic operation failed: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to move attribute between morphs: {str(e)}")

copy_attribute_to_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MorphOperationRequest)

Copy an existing attribute to a specific morph (keeps it in all other morphs as well).

Source code in backend/routes/graph_ops.py
@router.post("/users/{user_id}/graphs/{graph_id}/attribute/copy_to_morph/{node_id}/{attr_name}")
def copy_attribute_to_morph(user_id: str, graph_id: str, node_id: str, attr_name: str, request: MorphOperationRequest):
    """
    Copy an existing attribute to a specific morph (keeps it in all other morphs as well).
    """
    try:
        with graph_transaction(user_id, graph_id, "copy_attribute_to_morph") as backup_dir:
            morph_id = request.morph_id
            # Find the attributeNode id by node_id and attr_name
            reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
            registry = load_registry(reg_path)
            attr_id = None
            for k, v in registry.items():
                if v.get("source_id") == node_id and v.get("name") == attr_name:
                    attr_id = k
                    break

            if not attr_id:
                raise HTTPException(status_code=404, detail="Attribute not found")

            # Load the source node
            source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
            if not source_node_path.exists():
                raise HTTPException(status_code=404, detail="Source node not found")

            source_node = load_json_file(source_node_path)

            # Ensure morphs array exists
            if "morphs" not in source_node:
                source_node["morphs"] = []

            # Find the target morph
            target_morph = None
            for morph in source_node["morphs"]:
                if morph.get("morph_id") == morph_id:
                    target_morph = morph
                    break

            if not target_morph:
                raise HTTPException(status_code=404, detail="Target morph not found")

            # Ensure attributeNode_ids array exists
            if "attributeNode_ids" not in target_morph:
                target_morph["attributeNode_ids"] = []

            # Add attribute to the morph if not already present
            if attr_id not in target_morph["attributeNode_ids"]:
                target_morph["attributeNode_ids"].append(attr_id)

                # Atomically save the updated node
                atomic_node_save(user_id, node_id, source_node)

                # Regenerate composed files
                try:
                    node_ids = get_graph_node_ids(user_id, graph_id)
                    metadata_path = Path(f"graph_data/users/{user_id}/graphs/{graph_id}/metadata.yaml")
                    graph_description = ""
                    if metadata_path.exists():
                        import yaml
                        with open(metadata_path, "r") as f:
                            metadata = yaml.safe_load(f) or {}
                            graph_description = metadata.get("description", "")

                    composed_data = compose_graph(user_id, graph_id, node_ids, graph_description)
                    if composed_data:
                        atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "json")
                        atomic_composed_save(user_id, graph_id, composed_data["cytoscape"], "yaml")
                        atomic_composed_save(user_id, graph_id, composed_data["polymorphic"], "polymorphic")
                except Exception as e:
                    print(f"Warning: Failed to regenerate composed files: {e}")

                return {"status": "Attribute copied to morph", "attribute_id": attr_id, "morph_id": morph_id}
            else:
                return {"status": "Attribute already exists in target morph", "attribute_id": attr_id, "morph_id": morph_id}

    except AtomicityError as e:
        raise HTTPException(status_code=500, detail=f"Atomic operation failed: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to copy attribute to morph: {str(e)}")

list_attributes_by_morph(user_id: str, graph_id: str, node_id: str)

List all attributes organized by morph for a given node.

Source code in backend/routes/graph_ops.py
@router.get("/users/{user_id}/graphs/{graph_id}/attribute/list_by_morph/{node_id}")
def list_attributes_by_morph(user_id: str, graph_id: str, node_id: str):
    """
    List all attributes organized by morph for a given node.
    """
    try:
        # Load source node
        source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
        if not source_node_path.exists():
            raise HTTPException(status_code=404, detail="Source node not found")

        source_node = load_json_file(source_node_path)

        # Load attribute registry
        reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
        registry = load_registry(reg_path)

        # Organize attributes by morph
        morph_attributes = {}
        for morph in source_node.get("morphs", []):
            morph_id = morph.get("morph_id")
            morph_name = morph.get("name", "Unknown")
            morph_attributes[morph_id] = {
                "morph_id": morph_id,
                "morph_name": morph_name,
                "attributes": []
            }

            for attr_id in morph.get("attributeNode_ids", []):
                if attr_id in registry:
                    attr_info = registry[attr_id]

                    # Load full attribute data from the attribute file
                    attr_file_path = Path(f"graph_data/users/{user_id}/attributeNodes/{attr_id}.json")
                    full_attr_data = {}
                    if attr_file_path.exists():
                        try:
                            full_attr_data = load_json_file(attr_file_path)
                        except Exception as e:
                            print(f"Warning: Failed to load attribute file {attr_id}: {e}")

                    # Combine registry info with full attribute data
                    attribute_data = {
                        "attribute_id": attr_id,
                        "name": attr_info.get("name"),
                        "source_id": attr_info.get("source_id"),
                        "value": full_attr_data.get("value"),
                        "unit": full_attr_data.get("unit"),
                        "adverb": full_attr_data.get("adverb"),
                        "modality": full_attr_data.get("modality")
                    }

                    morph_attributes[morph_id]["attributes"].append(attribute_data)

        return {"node_id": node_id, "morphs": morph_attributes}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list attributes by morph: {str(e)}")

list_relations_by_morph(user_id: str, graph_id: str, node_id: str)

List all relations organized by morph for a given node.

Source code in backend/routes/graph_ops.py
@router.get("/users/{user_id}/graphs/{graph_id}/relation/list_by_morph/{node_id}")
def list_relations_by_morph(user_id: str, graph_id: str, node_id: str):
    """
    List all relations organized by morph for a given node.
    """
    try:
        # Load source node
        source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
        if not source_node_path.exists():
            raise HTTPException(status_code=404, detail="Source node not found")

        source_node = load_json_file(source_node_path)

        # Load relation registry
        reg_path = Path(f"graph_data/users/{user_id}/relation_registry.json")
        registry = load_registry(reg_path)

        # Organize relations by morph
        morph_relations = {}
        for morph in source_node.get("morphs", []):
            morph_id = morph.get("morph_id")
            morph_name = morph.get("name", "Unknown")
            morph_relations[morph_id] = {
                "morph_id": morph_id,
                "morph_name": morph_name,
                "relations": []
            }

            for rel_id in morph.get("relationNode_ids", []):
                if rel_id in registry:
                    rel_info = registry[rel_id]

                    # Load full relation data from the relation file
                    rel_file_path = Path(f"graph_data/users/{user_id}/relationNodes/{rel_id}.json")
                    full_rel_data = {}
                    if rel_file_path.exists():
                        try:
                            full_rel_data = load_json_file(rel_file_path)
                        except Exception as e:
                            print(f"Warning: Failed to load relation file {rel_id}: {e}")

                    # Combine registry info with full relation data
                    relation_data = {
                        "relation_id": rel_id,
                        "name": rel_info.get("name"),
                        "source_id": rel_info.get("source_id"),
                        "target_id": rel_info.get("target_id"),
                        "adverb": full_rel_data.get("adverb"),
                        "modality": full_rel_data.get("modality")
                    }

                    morph_relations[morph_id]["relations"].append(relation_data)

        return {"node_id": node_id, "morphs": morph_relations}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list relations by morph: {str(e)}")

create_morph(user_id: str, graph_id: str, request: CreateMorphRequest)

Create a new morph for a node.

Scenarios:

1. Empty morph: copy_from_morph is None - creates a morph with empty properties.
2. Copy from existing morph: copy_from_morph specifies any existing morph to copy from.
3. Node context is always required - morphs must belong to a node.

Source code in backend/routes/graph_ops.py
@router.post("/users/{user_id}/graphs/{graph_id}/morph/create")
def create_morph(user_id: str, graph_id: str, request: CreateMorphRequest):
    """
    Create a new morph for a node.

    Scenarios:
    1. Empty morph: copy_from_morph is None - creates morph with empty properties
    2. Copy from existing morph: copy_from_morph specifies any existing morph to copy from
    3. Node context is always required - morphs must belong to a node
    """
    try:
        print(f"DEBUG: Morph creation called with:")
        print(f"  user_id: {user_id}")
        print(f"  graph_id: {graph_id}")
        print(f"  node_id: {request.node_id}")
        print(f"  name: {request.name}")
        print(f"  copy_from_morph: {request.copy_from_morph}")
        print(f"  auto-generated morph_id: {request.name}_{request.node_id}")

        with graph_transaction(user_id, graph_id, "create_morph") as backup_dir:
            # Auto-generate morph_id from name and node_id
            morph_id = f"{request.name}_{request.node_id}"
            node_id = request.node_id
            morph_name = request.name
            copy_from_morph = request.copy_from_morph

            # Load the source node (required context)
            source_node_path = Path(f"graph_data/users/{user_id}/nodes/{node_id}.json")
            if not source_node_path.exists():
                raise HTTPException(status_code=404, detail=f"Node {node_id} not found")

            with open(source_node_path, 'r') as f:
                source_node = json.load(f)

            # Check if morph already exists
            existing_morph = None
            for morph in source_node.get("morphs", []):
                if morph.get("morph_id") == morph_id:
                    existing_morph = morph
                    break

            if existing_morph:
                # Morph already exists, but we need to ensure registries are updated
                print(f"DEBUG: Morph {morph_id} already exists, checking registry updates")

                # If copying from another morph, ensure registries are updated
                if copy_from_morph:
                    print(f"DEBUG: Updating registries for existing morph {morph_id}")

                    # Update registries to include the morph_id for all relations and attributes
                    # Update relation registry
                    rel_reg_path = Path(f"graph_data/users/{user_id}/relation_registry.json")
                    if rel_reg_path.exists():
                        rel_registry = load_json_file(rel_reg_path)
                        for rel_id in existing_morph.get("relationNode_ids", []):
                            if rel_id in rel_registry:
                                current_morph_ids = rel_registry[rel_id].get("morph_id", [])
                                if isinstance(current_morph_ids, str):
                                    current_morph_ids = [current_morph_ids]
                                if morph_id not in current_morph_ids:
                                    current_morph_ids.append(morph_id)
                                    rel_registry[rel_id]["morph_id"] = current_morph_ids
                        atomic_registry_save(user_id, "relation", rel_registry)

                    # Update attribute registry
                    attr_reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
                    if attr_reg_path.exists():
                        attr_registry = load_json_file(attr_reg_path)
                        for attr_id in existing_morph.get("attributeNode_ids", []):
                            if attr_id in attr_registry:
                                current_morph_ids = attr_registry[attr_id].get("morph_id", [])
                                if isinstance(current_morph_ids, str):
                                    current_morph_ids = [current_morph_ids]
                                if morph_id not in current_morph_ids:
                                    current_morph_ids.append(morph_id)
                                    attr_registry[attr_id]["morph_id"] = current_morph_ids
                        atomic_registry_save(user_id, "attribute", attr_registry)

                # Return success if morph already exists (idempotent behavior)
                return {
                    "status": "Morph already exists",
                    "morph_id": morph_id,
                    "node_id": node_id,
                    "name": morph_name,
                    "copied_from": copy_from_morph,
                    "relation_count": len(existing_morph.get("relationNode_ids", [])),
                    "attribute_count": len(existing_morph.get("attributeNode_ids", []))
                }

            # Create new morph
            new_morph = {
                "morph_id": morph_id,
                "node_id": node_id,
                "name": morph_name,
                "relationNode_ids": [],
                "attributeNode_ids": []
            }

            # If copying from another morph, copy all properties
            print(f"DEBUG: About to check copy_from_morph condition: {copy_from_morph}")
            print(f"DEBUG: copy_from_morph type: {type(copy_from_morph)}")
            print(f"DEBUG: copy_from_morph truthiness: {bool(copy_from_morph)}")
            if copy_from_morph:
                print(f"DEBUG: Copying from morph: {copy_from_morph}")
                # Reload the node file to ensure latest state
                with open(source_node_path, 'r') as f:
                    source_node = json.load(f)
                print(f"DEBUG: Loaded source node with {len(source_node.get('morphs', []))} morphs")
                source_morph = None
                for morph in source_node.get("morphs", []):
                    print(f"DEBUG: Checking morph: {morph.get('morph_id')}")
                    if morph.get("morph_id") == copy_from_morph:
                        source_morph = morph
                        print(f"DEBUG: Found source morph: {copy_from_morph}")
                        break

                if not source_morph:
                    print(f"DEBUG: Source morph {copy_from_morph} not found!")
                    raise HTTPException(status_code=404, detail=f"Source morph {copy_from_morph} not found in node {node_id}")

                print(f"DEBUG: Copying from source morph {copy_from_morph}")
                print(f"DEBUG: Source morph relationNode_ids: {source_morph.get('relationNode_ids', [])}")
                print(f"DEBUG: Source morph attributeNode_ids: {source_morph.get('attributeNode_ids', [])}")

                # Copy relations and attributes
                new_morph["relationNode_ids"] = source_morph.get("relationNode_ids", []).copy()
                new_morph["attributeNode_ids"] = source_morph.get("attributeNode_ids", []).copy()

                print(f"DEBUG: New morph relationNode_ids: {new_morph['relationNode_ids']}")
                print(f"DEBUG: New morph attributeNode_ids: {new_morph['attributeNode_ids']}")

                # Update registries to include the new morph_id for all copied relations and attributes
                # Update relation registry
                rel_reg_path = Path(f"graph_data/users/{user_id}/relation_registry.json")
                print(f"DEBUG: Checking relation registry at {rel_reg_path}")
                print(f"DEBUG: Relation registry exists: {rel_reg_path.exists()}")
                if rel_reg_path.exists():
                    rel_registry = load_json_file(rel_reg_path)
                    print(f"DEBUG: Current relation registry keys: {list(rel_registry.keys())}")
                    for rel_id in new_morph["relationNode_ids"]:
                        print(f"DEBUG: Processing relation {rel_id}")
                        if rel_id in rel_registry:
                            current_morph_ids = rel_registry[rel_id].get("morph_id", [])
                            print(f"DEBUG: Current morph_ids for {rel_id}: {current_morph_ids}")
                            if isinstance(current_morph_ids, str):
                                current_morph_ids = [current_morph_ids]
                            if morph_id not in current_morph_ids:
                                current_morph_ids.append(morph_id)
                            print(f"DEBUG: Updated morph_ids for {rel_id}: {current_morph_ids}")
                            rel_registry[rel_id]["morph_id"] = current_morph_ids
                        else:
                            print(f"DEBUG: Relation {rel_id} not found in registry")
                    print(f"DEBUG: Saving updated relation registry")
                    atomic_registry_save(user_id, "relation", rel_registry)

                # Update attribute registry
                attr_reg_path = Path(f"graph_data/users/{user_id}/attribute_registry.json")
                print(f"DEBUG: Checking attribute registry at {attr_reg_path}")
                print(f"DEBUG: Attribute registry exists: {attr_reg_path.exists()}")
                if attr_reg_path.exists():
                    attr_registry = load_json_file(attr_reg_path)
                    print(f"DEBUG: Current attribute registry keys: {list(attr_registry.keys())}")
                    for attr_id in new_morph["attributeNode_ids"]:
                        print(f"DEBUG: Processing attribute {attr_id}")
                        if attr_id in attr_registry:
                            current_morph_ids = attr_registry[attr_id].get("morph_id", [])
                            print(f"DEBUG: Current morph_ids for {attr_id}: {current_morph_ids}")
                            if isinstance(current_morph_ids, str):
                                current_morph_ids = [current_morph_ids]
                            if morph_id not in current_morph_ids:
                                current_morph_ids.append(morph_id)
                            print(f"DEBUG: Updated morph_ids for {attr_id}: {current_morph_ids}")
                            attr_registry[attr_id]["morph_id"] = current_morph_ids
                        else:
                            print(f"DEBUG: Attribute {attr_id} not found in registry")
                    print(f"DEBUG: Saving updated attribute registry")
                    atomic_registry_save(user_id, "attribute", attr_registry)
                else:
                    print(f"DEBUG: Attribute registry does not exist")

            # Add new morph to node
            if "morphs" not in source_node:
                source_node["morphs"] = []
            source_node["morphs"].append(new_morph)

            # Save updated node
            with open(source_node_path, 'w') as f:
                json.dump(source_node, f, indent=2)

            return {
                "status": "Morph created successfully",
                "morph_id": morph_id,
                "node_id": node_id,
                "name": morph_name,
                "copied_from": copy_from_morph,
                "relation_count": len(new_morph["relationNode_ids"]),
                "attribute_count": len(new_morph["attributeNode_ids"]),
                "is_empty": copy_from_morph is None
            }

    except AtomicityError as e:
        raise HTTPException(status_code=500, detail=f"Atomic operation failed: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create morph: {str(e)}")

get_graph_node_ids(user_id: str, graph_id: str) -> list[str]

Get list of node IDs that belong to a specific graph

Source code in backend/routes/graph_ops.py
def get_graph_node_ids(user_id: str, graph_id: str) -> list[str]:
    """Get list of node IDs that belong to a specific graph"""
    from backend.core.registry import load_node_registry
    registry = load_node_registry(user_id)
    graph_nodes = []

    for node_id, entry in registry.items():
        if "graphs" in entry and graph_id in entry["graphs"]:
            graph_nodes.append(node_id)

    return graph_nodes

get_relation_node(user_id: str, graph_id: str, relation_id: str)

Get a specific relation node by its ID

Source code in backend/routes/graph_ops.py
@router.get("/users/{user_id}/graphs/{graph_id}/relationNodes/{relation_id}")
def get_relation_node(user_id: str, graph_id: str, relation_id: str):
    """Get a specific relation node by its ID"""
    rel_path = f"graph_data/users/{user_id}/relationNodes/{relation_id}.json"
    if not os.path.exists(rel_path):
        raise HTTPException(status_code=404, detail="RelationNode not found")
    with open(rel_path, "r") as f:
        return json.load(f)

validate_graph_consistency(user_id: str, graph_id: str)

Validate the consistency of a graph's data.

This endpoint performs comprehensive validation of:

- Node registry consistency
- Relation registry consistency
- Attribute registry consistency
- File existence checks
- Reference integrity

Returns validation results with issues and warnings.

Source code in backend/routes/graph_ops.py
@router.get("/users/{user_id}/graphs/{graph_id}/validate")
def validate_graph_consistency(user_id: str, graph_id: str):
    """
    Validate the consistency of a graph's data.

    This endpoint performs comprehensive validation of:
    - Node registry consistency
    - Relation registry consistency  
    - Attribute registry consistency
    - File existence checks
    - Reference integrity

    Returns validation results with issues and warnings.
    """
    try:
        validation_result = validate_consistency(user_id)
        return validation_result
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")

cleanup_old_backups(user_id: str, graph_id: str, max_age_hours: int = 24)

Clean up old backup directories for a user.

Parameters:

- user_id (str): User ID. Required.
- graph_id (str): Graph ID (not used but kept for consistency). Required.
- max_age_hours (int): Maximum age of backups to keep. Default: 24.

Returns:

- Number of backups cleaned up

Source code in backend/routes/graph_ops.py
@router.post("/users/{user_id}/graphs/{graph_id}/cleanup-backups")
def cleanup_old_backups(user_id: str, graph_id: str, max_age_hours: int = 24):
    """
    Clean up old backup directories for a user.

    Args:
        user_id: User ID
        graph_id: Graph ID (not used but kept for consistency)
        max_age_hours: Maximum age of backups to keep (default: 24 hours)

    Returns:
        Number of backups cleaned up
    """
    try:
        cleaned_count = cleanup_backups(user_id, max_age_hours)
        return {
            "status": "Backup cleanup completed",
            "cleaned_count": cleaned_count,
            "max_age_hours": max_age_hours
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Backup cleanup failed: {str(e)}")