Support for node/edge "properties" field + same rel type between different node types #9

Open
@daaain

I've been working on a proof-of-concept graph RAG using Kuzu and this library, but found two features missing.

First, I'd like the library to save the properties fields that are defined here, so it's possible to attach some metadata instead of only getting node ids back when querying.

Second, I've been testing LLM-based entity and relationship extraction (see prompt below), so I ended up with the same relationship types extracted between different pairs of node types. The existing _create_entity_relationship_table only creates the rel table for the first pair; after that it's a no-op.

I've managed to vibe code a naive implementation of both, but had trouble with the Kuzu db somehow getting corrupted even when trying to ingest only 60 documents. Once it crashed with a segfault after 15 docs, and on the second and third attempts it crashed with Error during recovery: unordered_map::at after about 30-40 docs.

So it would be great to have proper support for these.
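
To make the second point concrete, here is a minimal sketch (not the library's code) of grouping extracted relationships by type and emitting one CREATE REL TABLE statement per type that lists every source/target pair. `collect_patterns` and `build_rel_table_ddl` are hypothetical helper names, relationships are represented as plain tuples for illustration, and the multi-pair `FROM ... TO ...` syntax is simply the one my implementation in this issue relies on.

```python
from collections import defaultdict


def collect_patterns(relationships):
    """Group (source_type, rel_type, target_type) triples by relationship type."""
    patterns = defaultdict(set)
    for source_type, rel_type, target_type in relationships:
        patterns[rel_type].add((source_type, target_type))
    return dict(patterns)


def build_rel_table_ddl(rel_type, connections):
    """Build one CREATE REL TABLE statement covering every (source, target) pair."""
    # Sort the pairs so the generated statement is deterministic
    pairs = ", ".join(f"FROM {src} TO {tgt}" for src, tgt in sorted(connections))
    return f"CREATE REL TABLE IF NOT EXISTS {rel_type} ({pairs}, properties STRING)"


# Hypothetical extraction result: the same relationship type between two node type pairs
relationships = [
    ("LegalEntity", "subjectTo", "Condition"),
    ("CreditFacility", "subjectTo", "Covenant"),
    ("LegalEntity", "subjectTo", "Condition"),  # duplicate pair, collapsed by the set
]
for rel_type, connections in collect_patterns(relationships).items():
    print(build_rel_table_ddl(rel_type, connections))
```

Building the full set of pairs before issuing any DDL is what avoids the "first pair wins" behaviour described above.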

import json
from typing import Any

from langchain_kuzu.graphs.graph_document import GraphDocument
from langchain_kuzu.graphs.kuzu_graph import KuzuGraph


class EnhancedKuzuGraph(KuzuGraph):
    """
    Enhanced KuzuGraph that properly handles relationship tables with multiple connection types
    and preserves node and edge properties.

    The standard KuzuGraph implementation creates relationship tables with single source and target
    node types, which causes errors when the same relationship type is used between different node
    type pairs. It also ignores the properties on nodes and edges.

    This enhanced implementation:
    1. Gathers all required source-target combinations for each relationship type
    2. Creates relationship tables with all needed connection patterns
    3. Preserves node and edge properties in a 'properties' column
    """

    def add_graph_documents(
        self,
        graph_documents: list[GraphDocument],
        include_source: bool = False,  # noqa: FBT001, FBT002: inherited from library
    ) -> None:
        """
        Adds a list of `GraphDocument` objects to the Kuzu graph with enhanced relationship handling
        and property preservation.

        This overridden method first analyzes all relationships to ensure relationship tables
        support all required connection patterns before adding any relationships.

        Args:
          graph_documents (List[GraphDocument]): A list of `GraphDocument` objects
            that contain the nodes and relationships to be added to the graph.
          include_source (bool): If True, stores the source document
            and links it to nodes in the graph using the `MENTIONS` relationship.

        """
        # We have to create node tables first as Kuzu does a reference check when creating
        # relationship tables, so unfortunately there's a bit of duplication here
        node_labels = list({node.type for document in graph_documents for node in document.nodes})
        for node_label in node_labels:
            self._create_entity_node_table_with_properties(node_label)

        # First pass: Gather all relationship connection patterns
        relationship_patterns: dict[str, set[tuple[str, str]]] = {}

        for document in graph_documents:
            # Collect relationship patterns
            for rel in document.relationships:
                # TODO: make sure type is one word, otherwise the table won't be created:
                # Parser exception: mismatched input '":"' expecting {'(', SP} (line: 1, offset: 21)
                # "CREATE REL TABLE type":"requires (FROM Definition TO Clause, properties STRING)"
                rel_type = rel.type
                source_type = rel.source.type
                target_type = rel.target.type

                if rel_type not in relationship_patterns:
                    relationship_patterns[rel_type] = set()

                relationship_patterns[rel_type].add((source_type, target_type))

        # Create or update relationship tables with all needed connections
        for rel_type, connections in relationship_patterns.items():
            self._ensure_relationship_table_with_properties(rel_type, connections)

        # Now add all the nodes with their properties
        for document in graph_documents:
            for node in document.nodes:
                self._add_entity_node_with_properties(node)

        # Add all relationships with their properties
        for document in graph_documents:
            for rel in document.relationships:
                self._add_relationship_with_properties(rel)

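        # Process source documents if requested
        if include_source:
            if node_labels:
                # Create MENTIONS once for every node label: the per-node call in
                # _link_entity_to_source uses IF NOT EXISTS, so on a fresh database only
                # its first call would take effect
                self._create_chunk_node_table()
                self._create_mentions_relationship_table(node_labels)
            for document in graph_documents:
                self._process_source_document(document)
                for node in document.nodes:
                    self._link_entity_to_source(document, node)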

    def _create_entity_node_table_with_properties(self, node_label: str) -> None:
        """
        Creates a node table with a properties column.

        Args:
            node_label (str): The label/type of the node table to create

        """
        self.conn.execute(
            f"""
            CREATE NODE TABLE IF NOT EXISTS {node_label} (
                id STRING,
                type STRING,
                properties STRING,
                PRIMARY KEY(id)
            );
            """,
        )

    def _ensure_relationship_table_with_properties(
        self,
        rel_type: str,
        connections: set[tuple[str, str]],
    ) -> None:
        """
        Ensure a relationship table exists with all the required connection patterns and a properties column.

        Args:
            rel_type (str): The relationship type name
            connections (Set[Tuple[str, str]]): Set of (source_type, target_type) tuples

        """
        # Check if the relationship table already exists
        query_results = self.conn.execute("CALL SHOW_TABLES() RETURN *")
        rel_tables = query_results[0] if isinstance(query_results, list) else query_results

        existing_tables = []
        while rel_tables.has_next():
            row = rel_tables.get_next()
            existing_tables.append((row[1], row[2]))  # (name, type)

        table_exists = any(
            name == rel_type and table_type == "REL" for name, table_type in existing_tables
        )

        if table_exists:
            # Get current connections
            query_results = self.conn.execute(f"CALL SHOW_CONNECTION('{rel_type}') RETURN *")
            conn_info = query_results[0] if isinstance(query_results, list) else query_results
            existing_connections = set()

            while conn_info.has_next():
                row = conn_info.get_next()
                existing_connections.add((row[0], row[1]))  # (source, target)

            # Check if we need to add new connections
            all_connections = existing_connections.union(connections)

            if len(all_connections) > len(existing_connections):
                # We need to recreate the table with all connections

                # First, collect all existing relationships to preserve them
                all_relationships = []
                for src_type, tgt_type in existing_connections:
                    # Query to get all relationships of this type and direction with properties
                    query = f"""
                    MATCH (source:{src_type})-[r:{rel_type}]->(target:{tgt_type})
                    RETURN source.id AS source_id, target.id AS target_id, r.properties AS properties
                    """
                    query_results = self.conn.execute(query)
                    all_rels = (
                        query_results[0] if isinstance(query_results, list) else query_results
                    )

                    # Collect all relationships
                    while all_rels.has_next():
                        row = all_rels.get_next()
                        source_id = row[0]
                        target_id = row[1]
                        properties = row[2] if len(row) > 2 else None
                        all_relationships.append(
                            (src_type, source_id, tgt_type, target_id, properties),
                        )

                # Now drop and recreate the relationship table with expanded schema
                self.conn.execute(f"DROP TABLE {rel_type}")

                # Create new table with all connections and properties column
                connection_strs = [f"FROM {src} TO {tgt}" for src, tgt in all_connections]
                create_stmt = (
                    f"CREATE REL TABLE {rel_type} ({', '.join(connection_strs)}, properties STRING)"
                )
                self.conn.execute(create_stmt)

                # Restore all relationships, passing values as parameters so that quotes
                # in ids or in the JSON properties cannot break the query
                for src_type, source_id, tgt_type, target_id, properties in all_relationships:
                    restore_query = f"""
                    MATCH (source:{src_type} {{id: $source_id}}),
                          (target:{tgt_type} {{id: $target_id}})
                    MERGE (source)-[r:{rel_type}]->(target)
                    """
                    parameters = {"source_id": source_id, "target_id": target_id}
                    if properties:
                        restore_query += "SET r.properties = $properties"
                        parameters["properties"] = properties
                    self.conn.execute(restore_query, parameters=parameters)
        else:
            # Table doesn't exist, create it with all connections and properties column
            connection_strs = [f"FROM {src} TO {tgt}" for src, tgt in connections]
            create_stmt = (
                f"CREATE REL TABLE {rel_type} ({', '.join(connection_strs)}, properties STRING)"
            )
            self.conn.execute(create_stmt)

        # Refresh the schema after making changes
        self.refresh_schema()

    def _add_entity_node_with_properties(self, node: Any) -> None:
        """
        Add an entity node to the graph with its properties.

        Args:
            node: The node to add

        """
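        # Serialise properties to JSON and pass them as a query parameter; interpolating
        # the JSON text into the query would make Kuzu parse it as Cypher, not as a string.
        # The properties column is left untouched when the node has no properties.
        statement = f"""
            MERGE (e:{node.type} {{id: $id}})
                SET e.type = "entity"
            """
        parameters: dict[str, Any] = {"id": node.id}
        if getattr(node, "properties", None):
            statement += ", e.properties = $properties"
            parameters["properties"] = json.dumps(node.properties)

        self.conn.execute(statement, parameters=parameters)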

    def _add_relationship_with_properties(self, rel: Any) -> None:
        """
        Add a relationship to the graph with its properties.

        Args:
            rel: The relationship to add

        """
        source_label = rel.source.type
        source_id = rel.source.id
        target_label = rel.target.type
        target_id = rel.target.id

        statement = f"""
            MATCH (e1:{source_label} {{id: $source_id}}),
                  (e2:{target_label} {{id: $target_id}})
            MERGE (e1)-[r:{rel.type}]->(e2)
        """
        parameters: dict[str, Any] = {
            "source_id": source_id,
            "target_id": target_id,
        }

        # Serialise properties to JSON and pass them as a query parameter; interpolating
        # the JSON text into the query would make Kuzu parse it as Cypher, not as a string
        if getattr(rel, "properties", None):
            statement += "SET r.properties = $properties"
            parameters["properties"] = json.dumps(rel.properties)

        self.conn.execute(statement, parameters=parameters)

    def _process_source_document(self, document: GraphDocument) -> None:
        """Process a source document, creating the chunk node."""
        from hashlib import md5

        self._create_chunk_node_table()
        if not document.source.metadata.get("id"):
            # Add a unique id to each document chunk via an md5 hash
            document.source.metadata["id"] = md5(
                document.source.page_content.encode("utf-8"),
            ).hexdigest()

        self.conn.execute(
            """
            MERGE (c:Chunk {id: $id}) 
                SET c.text = $text,
                    c.type = "text_chunk"
            """,
            parameters={
                "id": document.source.metadata["id"],
                "text": document.source.page_content,
            },
        )

    def _link_entity_to_source(self, document: GraphDocument, node: Any) -> None:
        """Link an entity node to its source document."""
        self._create_mentions_relationship_table([node.type])

        # Only allow relationships that exist in the schema
        self.conn.execute(
            f"""
            MATCH (c:Chunk {{id: $id}}),
                  (e:{node.type} {{id: $node_id}})
            MERGE (c)-[m:MENTIONS]->(e)
            SET m.triplet_source_id = $id
            """,
            parameters={
                "id": document.source.metadata["id"],
                "node_id": node.id,
            },
        )

    def _create_mentions_relationship_table(self, node_labels: list[str]) -> None:
        """Create the MENTIONS relationship table that connects Chunk nodes to entity nodes."""
        # De-duplicate and sort the connection pairs so the statement is deterministic
        connections = sorted({f"FROM Chunk TO {node_label}" for node_label in node_labels})
        # label and triplet_source_id are common properties for all the connections
        ddl = (
            f"CREATE REL TABLE IF NOT EXISTS MENTIONS ({', '.join(connections)}, "
            "label STRING, triplet_source_id STRING)"
        )
        self.conn.execute(ddl)
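
The TODO in add_graph_documents notes that a relationship type that isn't a single word makes CREATE REL TABLE fail. One possible guard, sketched here under the assumption that names made only of letters, digits and underscores (and not starting with a digit) are safe as table names, is to normalise LLM-produced labels before using them. `to_table_name` is a hypothetical helper, not part of langchain_kuzu, and it does not handle reserved keywords.

```python
import re


def to_table_name(label: str) -> str:
    """Turn an arbitrary LLM-produced label into a single-word table name."""
    # Replace every run of characters other than letters, digits and underscores
    name = re.sub(r"[^0-9A-Za-z_]+", "_", label.strip()).strip("_")
    if not name:
        raise ValueError(f"cannot derive a table name from {label!r}")
    # Assumption: a leading digit is not allowed, so prefix such names
    if name[0].isdigit():
        name = f"T_{name}"
    return name


for raw in ["securedBy", "subject to", "type:requires", "3rd-party"]:
    print(raw, "->", to_table_name(raw))
```

The same normalisation would need to be applied consistently to node types and relationship types, both when creating tables and when merging rows, otherwise the MATCH patterns would reference tables that don't exist.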

Graph extraction prompt:

<task>Extract entities and relationships from the following financial/legal document. Identify specific financial and legal entities that appear throughout the document.</task>

<entity_types>
# Business Entities
- LegalEntity (borrowers, lenders, guarantors, subsidiaries, parent companies, agents, trustees)
- Person (individuals named in the agreement with roles or responsibilities)
- CreditFacility (specific credit lines or facilities being offered)
- Collateral (assets pledged as security)
- FinancialProduct (loans, revolving credits, term loans)

# Legal Elements
- Clause (specific contractual provisions)
- Section (document organisation units)
- Definition (formally defined terms)
- Obligation (duties, requirements)
- Right (entitlements, permissions)
- Condition (precedent or subsequent conditions)
- Representation (statements of fact)
- Warranty (assurances, promises)
- Covenant (agreements to do or not do something)
- EventOfDefault (circumstances triggering default)

# Financial Elements
- Amount (monetary values)
- Currency (types of currency)
- InterestRate (including base rates and margins)
- Fee (commitment fees, arrangement fees, prepayment fees)
- FinancialRatio (financial covenants like debt-to-EBITDA)
- PaymentSchedule (repayment terms)
- Account (bank accounts referenced)

# Temporal Elements
- Date (effective dates, maturity dates, reporting dates)
- Period (durations, terms, fiscal periods)
- Deadline (time-bound requirements)

# Geographical Elements
- Location (jurisdictions, addresses)
- GoverningLaw (applicable legal systems)
</entity_types>

<relationship_types>
- partyTo (entity is party to agreement)
- provides (entity provides facility/product/service)
- receives (entity receives facility/product/service)
- securedBy (facility secured by collateral)
- governs (law governs agreement/entity)
- guaranteedBy (obligation guaranteed by entity)
- definedIn (term defined in section)
- contains (section contains clause/provision)
- subjectTo (entity subject to condition)
- requires (entity requires action/condition)
- prohibits (entity prohibits action)
- calculatedAs (amount calculated using formula)
- effectiveOn (agreement effective on date)
- terminatesOn (agreement terminates on date)
- denominatedIn (amount denominated in currency)
- locatedAt (entity located at address)
- reportsTo (entity reports to another)
- triggers (event triggers consequence)
- assigns (entity assigns right to another)
</relationship_types>

<rules>
- Output *only* valid JSON. Do not include any explanations or text outside the JSON structure. Write the objects inside the top level arrays without whitespace to save output tokens.
- Each node must have an 'id' (use a very short name, but ensuring uniqueness *within the entire document*) and a 'type'. 
- Include relevant extracted text or values in a 'properties' dictionary (e.g., {"value": "USD 50,000,000", "context": "maximum facility amount"} for an Amount node).
- For LegalEntity nodes, include a "role" property (e.g., "Borrower", "Lender", "Guarantor").
- For Date nodes, standardise the date format when possible (YYYY-MM-DD).
- Each edge must have a 'from' (referencing a node id), a 'to' (referencing a node id), and a 'type' (the relationship label).
- Focus on explicit mentions and clear relationships within the text. Avoid inferring relationships not directly stated.
- Ensure node 'id' values used in 'from' and 'to' correspond to nodes defined in the 'nodes' list.
- Cross-reference entities across the entire document to ensure consistent identification.
</rules>

<output_example>
{
  "nodes": [
    {"id":"company1","type":"LegalEntity","properties":{"name":"OpCo Ltd.","role":"Borrower",...}},
    ...
  ],
  "edges": [
    {"from":"company1","to":"address1","type":"locatedAt","properties":{"since":"2015-01-02"}},
    ...
  ]
}
</output_example>

<document_text>
{{document_text}}
</document_text>
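
Since the model doesn't always follow the rules above, it can help to check the response before ingestion. A minimal sketch, assuming the response follows the output example (top-level "nodes" and "edges" arrays); `parse_extraction` is a hypothetical helper and the sample payload is made up for illustration.

```python
import json


def parse_extraction(raw: str):
    """Parse the model's JSON output and separate out edges that reference unknown nodes."""
    data = json.loads(raw)
    nodes = {node["id"]: node for node in data.get("nodes", [])}
    valid_edges, dangling_edges = [], []
    for edge in data.get("edges", []):
        # Rule from the prompt: 'from' and 'to' must refer to ids in the 'nodes' list
        if edge["from"] in nodes and edge["to"] in nodes:
            valid_edges.append(edge)
        else:
            dangling_edges.append(edge)
    return nodes, valid_edges, dangling_edges


# Made-up response: the second edge points at a node that was never defined
sample = json.dumps(
    {
        "nodes": [
            {"id": "company1", "type": "LegalEntity", "properties": {"role": "Borrower"}},
            {"id": "address1", "type": "Location", "properties": {}},
        ],
        "edges": [
            {"from": "company1", "to": "address1", "type": "locatedAt"},
            {"from": "company1", "to": "facility1", "type": "receives"},
        ],
    }
)

nodes, valid_edges, dangling_edges = parse_extraction(sample)
for edge in valid_edges:
    # The (source type, relationship type, target type) triple is what drives table creation
    print(nodes[edge["from"]]["type"], edge["type"], nodes[edge["to"]]["type"])
print("dangling edges:", len(dangling_edges))
```

Dropping dangling edges up front matters because the MATCH ... MERGE statements silently create nothing when one endpoint is missing.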
