Condividi tramite


Connettere agenti a dati non strutturati

Gli agenti di intelligenza artificiale spesso devono eseguire query su dati non strutturati come raccolte di documenti, knowledge base o corpora di testo per rispondere alle domande e fornire risposte in grado di essere consapevoli del contesto.

Databricks offre più approcci per la connessione degli agenti a dati non strutturati negli indici di Ricerca vettoriale e negli archivi vettoriali esterni. Usare i server MCP preconfigurato per l'accesso immediato agli indici di Ricerca vettoriale di Databricks, sviluppare strumenti di recupero in locale con pacchetti AI Bridge o creare funzioni di recupero personalizzate per flussi di lavoro specializzati.

Eseguire query su un indice di ricerca vettoriale

Eseguire query su un indice di ricerca vettoriale di Databricks

Eseguire query su un indice di ricerca vettoriale di Databricks usando il mcp di ricerca vettoriale

Se l'agente deve eseguire query su un indice di ricerca vettoriale di Databricks, usare il server MCP gestito da Databricks per questo tipo:

  1. Creare un indice di ricerca vettoriale usando incorporamenti gestiti di Databricks. Vedere Creare endpoint e indici di ricerca vettoriali.
  2. Creare un agente e connetterlo all'URL MCP gestito preconfigurato per l'indice di ricerca vettoriale: https://<workspace-hostname>/api/2.0/mcp/vector-search/{catalog}/{schema}/{index_name}.

Per informazioni su come creare un agente che si connette ai server MCP gestiti, vedere Usare i server MCP gestiti di Databricks.

Eseguire query su un indice di ricerca vettoriale all'esterno di Databricks

Eseguire query su un indice di ricerca vettoriale ospitato all'esterno di Databricks

Se l'indice vettoriale è ospitato all'esterno di Azure Databricks, è possibile creare una connessione di Unity Catalog per connettersi al servizio esterno e usare la connessione nel codice dell'agente. Vedere Connettere gli strumenti dell'agente di intelligenza artificiale ai servizi esterni.

Nell'esempio seguente viene creato un recuperatore che chiama un indice vettoriale ospitato all'esterno di Databricks per un agente basato su PyFunc.

  1. Creare una connessione del catalogo Unity al servizio esterno, in questo caso Azure.

    CREATE CONNECTION ${connection_name}
    TYPE HTTP
    OPTIONS (
      host 'https://example.search.windows.net',
      base_path '/',
      bearer_token secret ('<secret-scope>','<secret-key>')
    );
    
  2. Definire lo strumento retriever nel codice dell'agente usando la connessione al catalogo Unity. Questo esempio usa i decoratori MLflow per abilitare il tracciamento dell'agente.

    Nota

    Per essere conforme allo schema del retriever MLflow, la funzione retriever deve restituire un List[Document] oggetto e utilizzare il metadata campo nella classe Document per aggiungere altri attributi al documento restituito, ad esempio doc_uri e similarity_score. Vedere Documento MLflow.

    import mlflow
    import json
    
    from mlflow.entities import Document
    from typing import List, Dict, Any
    from dataclasses import asdict
    
    class VectorSearchRetriever:
      """
      Class using Databricks Vector Search to retrieve relevant documents.
      """
    
      def __init__(self):
        self.azure_search_index = "hotels_vector_index"
    
      @mlflow.trace(span_type="RETRIEVER", name="vector_search")
      def __call__(self, query_vector: List[Any], score_threshold=None) -> List[Document]:
        """
        Performs vector search to retrieve relevant chunks.
        Args:
          query: Search query.
          score_threshold: Score threshold to use for the query.
    
        Returns:
          List of retrieved Documents.
        """
        from databricks.sdk import WorkspaceClient
        from databricks.sdk.service.serving import ExternalFunctionRequestHttpMethod
    
        json = {
          "count": true,
          "select": "HotelId, HotelName, Description, Category",
          "vectorQueries": [
            {
              "vector": query_vector,
              "k": 7,
              "fields": "DescriptionVector",
              "kind": "vector",
              "exhaustive": true,
            }
          ],
        }
    
        response = (
          WorkspaceClient()
          .serving_endpoints.http_request(
            conn=connection_name,
            method=ExternalFunctionRequestHttpMethod.POST,
            path=f"indexes/{self.azure_search_index}/docs/search?api-version=2023-07-01-Preview",
            json=json,
          )
          .text
        )
    
        documents = self.convert_vector_search_to_documents(response, score_threshold)
        return [asdict(doc) for doc in documents]
    
      @mlflow.trace(span_type="PARSER")
      def convert_vector_search_to_documents(
        self, vs_results, score_threshold
      ) -> List[Document]:
        docs = []
    
        for item in vs_results.get("value", []):
          score = item.get("@search.score", 0)
    
          if score >= score_threshold:
            metadata = {
              "score": score,
              "HotelName": item.get("HotelName"),
              "Category": item.get("Category"),
            }
    
            doc = Document(
              page_content=item.get("Description", ""),
              metadata=metadata,
              id=item.get("HotelId"),
            )
            docs.append(doc)
    
        return docs
    
  3. Per eseguire il retriever, eseguire il codice Python seguente. Facoltativamente, è possibile includere filtri di ricerca vettoriale nella richiesta per filtrare i risultati.

    retriever = VectorSearchRetriever()
    query = [0.01944167, 0.0040178085 . . .  TRIMMED FOR BREVITY 010858015, -0.017496133]
    results = retriever(query, score_threshold=0.1)
    
Sviluppare un retriever locale

Sviluppare un retriever localmente usando AI Bridge

Per creare localmente uno strumento di recupero per la ricerca vettoriale di Databricks, utilizzare i pacchetti Databricks AI Bridge come databricks-langchain e databricks-openai. Questi pacchetti includono funzioni helper come from_vector_search e from_uc_function per creare recuperatori da risorse di Databricks esistenti.

LangChain/LangGraph

Installare la versione più recente di databricks-langchain che include Databricks AI Bridge.

%pip install --upgrade databricks-langchain

Il codice seguente crea prototipi di uno strumento di recupero che esegue una query su un indice ipotetico di ricerca vettoriale e lo associa a un LLM in locale, in modo da poter testare il comportamento di chiamata allo strumento.

Fornire un descrittivo tool_description per aiutare l'agente a comprendere lo strumento e determinare quando richiamarlo.

from databricks_langchain import VectorSearchRetrieverTool, ChatDatabricks

# Initialize the retriever tool.
vs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.my_databricks_docs_index",
  tool_name="databricks_docs_retriever",
  tool_description="Retrieves information about Databricks products from official Databricks documentation."
)

# Run a query against the vector search index locally for testing
vs_tool.invoke("Databricks Agent Framework?")

# Bind the retriever tool to your Langchain LLM of choice
llm = ChatDatabricks(endpoint="databricks-claude-sonnet-4-5")
llm_with_tools = llm.bind_tools([vs_tool])

# Chat with your LLM to test the tool calling functionality
llm_with_tools.invoke("Based on the Databricks documentation, what is Databricks Agent Framework?")

Per gli scenari che usano indici di accesso diretto o indici di Sincronizzazione Delta tramite incorporamenti autogestiti, è necessario configurare VectorSearchRetrieverTool e specificare un modello di incorporamento personalizzato e una colonna di testo. Consultare le opzioni per fornire incorporazioni.

L'esempio seguente mostra come configurare un VectorSearchRetrieverTool con le chiavi columns e embedding.

from databricks_langchain import VectorSearchRetrieverTool
from databricks_langchain import DatabricksEmbeddings

embedding_model = DatabricksEmbeddings(
    endpoint="databricks-bge-large-en",
)

vs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.index_name", # Index name in the format 'catalog.schema.index'
  num_results=5, # Max number of documents to return
  columns=["primary_key", "text_column"], # List of columns to include in the search
  filters={"text_column LIKE": "Databricks"}, # Filters to apply to the query
  query_type="ANN", # Query type ("ANN" or "HYBRID").
  tool_name="name of the tool", # Used by the LLM to understand the purpose of the tool
  tool_description="Purpose of the tool", # Used by the LLM to understand the purpose of the tool
  text_column="text_column", # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
  embedding=embedding_model # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

Per altri dettagli, vedere la documentazione dell'API per VectorSearchRetrieverTool.

OpenAI

Installare la versione più recente di databricks-openai che include Databricks AI Bridge.

%pip install --upgrade databricks-openai

Il codice seguente crea prototipi di un retriever che esegue una query su un indice ipotetico di ricerca vettoriale e lo integra con i modelli GPT di OpenAI.

Fornire un descrittivo tool_description per aiutare l'agente a comprendere lo strumento e determinare quando richiamarlo.

Per altre informazioni sulle raccomandazioni openAI per gli strumenti, vedere documentazione relativa alla chiamata di funzioni OpenAI.

from databricks_openai import VectorSearchRetrieverTool
from openai import OpenAI
import json

# Initialize OpenAI client
client = OpenAI(api_key=<your_API_key>)

# Initialize the retriever tool
dbvs_tool = VectorSearchRetrieverTool(
  index_name="catalog.schema.my_databricks_docs_index",
  tool_name="databricks_docs_retriever",
  tool_description="Retrieves information about Databricks products from official Databricks documentation"
)

messages = [
  {"role": "system", "content": "You are a helpful assistant."},
  {
    "role": "user",
    "content": "Using the Databricks documentation, answer what is Spark?"
  }
]
first_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

# Execute function code and parse the model's response and handle function calls.
tool_call = first_response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = dbvs_tool.execute(query=args["query"])  # For self-managed embeddings, optionally pass in openai_client=client

# Supply model with results – so it can incorporate them into its final response.
messages.append(first_response.choices[0].message)
messages.append({
  "role": "tool",
  "tool_call_id": tool_call.id,
  "content": json.dumps(result)
})
second_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

Per gli scenari che usano indici di accesso diretto o indici di Sincronizzazione Delta tramite incorporamenti autogestiti, è necessario configurare VectorSearchRetrieverTool e specificare un modello di incorporamento personalizzato e una colonna di testo. Consultare le opzioni per fornire incorporazioni.

L'esempio seguente mostra come configurare un VectorSearchRetrieverTool con le chiavi columns e embedding.

from databricks_openai import VectorSearchRetrieverTool

vs_tool = VectorSearchRetrieverTool(
    index_name="catalog.schema.index_name", # Index name in the format 'catalog.schema.index'
    num_results=5, # Max number of documents to return
    columns=["primary_key", "text_column"], # List of columns to include in the search
    filters={"text_column LIKE": "Databricks"}, # Filters to apply to the query
    query_type="ANN", # Query type ("ANN" or "HYBRID").
    tool_name="name of the tool", # Used by the LLM to understand the purpose of the tool
    tool_description="Purpose of the tool", # Used by the LLM to understand the purpose of the tool
    text_column="text_column", # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
    embedding_model_name="databricks-bge-large-en" # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

Per altri dettagli, vedere la documentazione dell'API per VectorSearchRetrieverTool.

Dopo che lo strumento locale è pronto, è possibile crearlo direttamente in produzione come parte del codice dell'agente o eseguirne la migrazione a una funzione del catalogo Unity, che offre una migliore individuabilità e governance, ma presenta alcune limitazioni.

Eseguire query su Ricerca vettoriale di Databricks usando le funzioni UC (deprecate)

Eseguire query su Vector Search di Databricks usando le funzioni UC (deprecate)

Nota

Databricks consiglia i server MCP per la maggior parte degli strumenti dell'agente, ma la definizione degli strumenti con le funzioni di Unity Catalog rimane disponibile per la creazione di prototipi.

È possibile creare una funzione di Unity Catalog che incapsula una query sull'indice di ricerca a vettori Mosaic AI. Questo approccio:

  • Supporta i casi d'uso di produzione con governance e facilità di individuazione
  • Usa la funzione SQL vector_search() sotto le quinte
  • Supporta il tracciamento automatico di MLflow
    • È necessario allineare l'output della funzione allo schema del retriever MLflow usando gli page_content alias e metadata .
    • Tutte le colonne di metadati aggiuntive devono essere aggiunte alla metadata colonna usando la funzione di mapping SQL, anziché come chiavi di output di primo livello.

Eseguire il codice seguente in un notebook o in un editor SQL per creare la funzione:

CREATE OR REPLACE FUNCTION main.default.databricks_docs_vector_search (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching Databricks documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on Databricks documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
  chunked_text as page_content,
  map('doc_uri', url, 'chunk_id', chunk_id) as metadata
FROM
  vector_search(
    -- Specify your Vector Search index name here
    index => 'catalog.schema.databricks_docs_index',
    query => query,
    num_results => 5
  )

Per usare questo strumento di recupero nel tuo agente di intelligenza artificiale, racchiudilo con UCFunctionToolkit. Questo consente la tracciatura automatica tramite MLflow, generando automaticamente tipi di span nei log di MLflow.

from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

toolkit = UCFunctionToolkit(
    function_names=[
        "main.default.databricks_docs_vector_search"
    ]
)
tools = toolkit.tools

Gli strumenti di recupero del catalogo Unity presentano le avvertenze seguenti:

  • I client SQL potrebbero limitare il numero massimo di righe o byte restituiti. Per evitare il troncamento dei dati, troncate i valori delle colonne restituiti dalla UDF. Ad esempio, è possibile usare substring(chunked_text, 0, 8192) per ridurre le dimensioni delle colonne di contenuto di grandi dimensioni ed evitare il troncamento delle righe durante l'esecuzione.
  • Poiché questo strumento è un wrapper per la vector_search() funzione, è soggetto alle stesse limitazioni della vector_search() funzione. Vedere Limitazioni.

Per altre informazioni sui UCFunctionToolkit, vedere documentazione di Unity Catalog.

Aggiungere il tracciamento ad uno strumento di recupero

Aggiungi il tracciamento MLflow per monitorare e eseguire il debug del retriever. La traccia consente di visualizzare input, output e metadati per ogni passaggio di esecuzione.

Nell'esempio precedente, il decorator @mlflow.trace viene aggiunto sia ai __call__ metodi che a quelli di analisi. Il decoratore crea un intervallo che inizia quando la funzione viene richiamata e termina al suo ritorno. MLflow registra automaticamente l'input e l'output della funzione ed eventuali eccezioni generate.

Nota

Gli utenti della libreria LangChain, LlamaIndex e OpenAI possono usare la registrazione automatica di MLflow oltre a definire manualmente le tracce con l'elemento decorator. Vedere Aggiungere tracce alle applicazioni: traccia automatica e manuale.

import mlflow
from mlflow.entities import Document

# This code snippet has been truncated for brevity. See the full retriever example above.
class VectorSearchRetriever:
  ...

  # Create a RETRIEVER span. The span name must match the retriever schema name.
  @mlflow.trace(span_type="RETRIEVER", name="vector_search")
  def __call__(...) -> List[Document]:
    ...

  # Create a PARSER span.
  @mlflow.trace(span_type="PARSER")
  def parse_results(...) -> List[Document]:
    ...

Per verificare che le applicazioni downstream, come Agent Evaluation e AI Playground, rendano correttamente la traccia del retriever, assicurarsi che il decoratore soddisfi i seguenti requisiti:

  • Usare lo schema dell'intervallo di recupero MLflow e verificare che la funzione restituisca un oggetto List[Document].
  • Il nome della traccia e il retriever_schema nome devono corrispondere per configurare correttamente la traccia. Per informazioni su come impostare lo schema del retriever, vedere la sezione seguente.

Impostare lo schema del retriever per verificare la compatibilità di MLflow

Se la traccia restituita dal retriever o span_type="RETRIEVER" non è conforme allo schema di recupero standard di MLflow, è necessario eseguire manualmente il mapping dello schema restituito ai campi previsti di MLflow. Ciò verifica che MLflow possa tracciare correttamente il retriever e visualizzare le tracce in applicazioni downstream.

Per impostare manualmente lo schema del retriever:

  1. Chiamare mlflow.models.set_retriever_schema quando si definisce l'agente. Usare set_retriever_schema per mappare i nomi delle colonne nella tabella restituita ai campi previsti di MLflow, ad esempio primary_key, text_column e doc_uri.

    # Define the retriever's schema by providing your column names
    mlflow.models.set_retriever_schema(
      name="vector_search",
      primary_key="chunk_id",
      text_column="text_column",
      doc_uri="doc_uri"
      # other_columns=["column1", "column2"],
    )
    
  2. Specificare colonne aggiuntive nello schema del retriever fornendo un elenco di nomi di colonna con il other_columns campo .

  3. Se si dispone di più retriever, è possibile definire più schemi usando nomi univoci per ogni schema di recupero.

Lo schema del retriever impostato durante la creazione dell'agente influisce su applicazioni e flussi di lavoro downstream, ad esempio l'app di revisione e i set di valutazione. In particolare, la colonna doc_uri funge da identificatore primario per i documenti restituiti dal retriever.

  • L'app di revisione visualizza il doc_uri per aiutare i revisori a valutare le risposte e tracciare le origini dei documenti. Vedi Revisione dell'interfaccia utente dell'app.
  • I set di valutazione utilizzano doc_uri per confrontare i risultati del sistema di recupero con i set di dati di valutazione predefiniti, al fine di determinare il recupero e la precisione del sistema. Vedere Set di valutazione (MLflow 2).

Leggere i file da un volume del catalogo Unity

Se l'agente deve leggere file non strutturati (documenti di testo, report, file di configurazione e così via) archiviati in un volume di Catalogo Unity, è possibile creare strumenti che usano l'API File di Databricks SDK per elencare e leggere direttamente i file.

Gli esempi seguenti creano due strumenti che l'agente può usare:

  • list_volume_files: elenca i file e le directory nel volume.
  • read_volume_file: legge il contenuto di un file di testo dal volume.

LangChain/LangGraph

Installare la versione più recente di databricks-langchain che include Databricks AI Bridge.

%pip install --upgrade databricks-langchain
from databricks.sdk import WorkspaceClient
from langchain_core.tools import tool

VOLUME = "<catalog>.<schema>.<volume>"  # TODO: Replace with your volume
w = WorkspaceClient()


@tool
def list_volume_files(directory: str = "") -> str:
    """Lists files and directories in the Unity Catalog volume.
    Provide a relative directory path, or leave empty to list the volume root."""
    base = f"/Volumes/{VOLUME.replace('.', '/')}"
    path = f"{base}/{directory.lstrip('/')}" if directory else base
    entries = []
    for f in w.files.list_directory_contents(path):
        kind = "dir" if f.is_directory else "file"
        size = f" ({f.file_size} bytes)" if not f.is_directory else ""
        entries.append(f"  [{kind}] {f.name}{size}")
    return "\n".join(entries) if entries else "No files found."


@tool
def read_volume_file(file_path: str) -> str:
    """Reads a text file from the Unity Catalog volume.
    Provide the path relative to the volume root, for example 'reports/q1_summary.txt'."""
    base = f"/Volumes/{VOLUME.replace('.', '/')}"
    full_path = f"{base}/{file_path.lstrip('/')}"
    resp = w.files.download(full_path)
    return resp.contents.read().decode("utf-8")

Associare gli strumenti a un LLM ed eseguire un ciclo di chiamata degli strumenti:

from databricks_langchain import ChatDatabricks
from langchain_core.messages import HumanMessage, ToolMessage

llm = ChatDatabricks(endpoint="databricks-claude-sonnet-4-5")
llm_with_tools = llm.bind_tools([list_volume_files, read_volume_file])

messages = [HumanMessage(content="What files are in the volume? Can you read about_databricks.txt and summarize it in 2 sentences?")]
tool_map = {"list_volume_files": list_volume_files, "read_volume_file": read_volume_file}

for _ in range(5):  # max iterations
    response = llm_with_tools.invoke(messages)
    messages.append(response)
    if not response.tool_calls:
        break
    for tc in response.tool_calls:
        result = tool_map[tc["name"]].invoke(tc["args"])
        messages.append(ToolMessage(content=result, tool_call_id=tc["id"]))

print(response.content)

OpenAI

Installare la versione più recente di databricks-openai che include Databricks AI Bridge.

%pip install --upgrade databricks-openai
from databricks.sdk import WorkspaceClient
from databricks_openai import DatabricksOpenAI
import json

VOLUME = "<catalog>.<schema>.<volume>"  # TODO: Replace with your volume
w = WorkspaceClient()
client = DatabricksOpenAI()

# Define the tool specifications
tools = [
    {
        "type": "function",
        "function": {
            "name": "list_volume_files",
            "description": "Lists files and directories in the Unity Catalog volume. Provide a relative directory path, or leave empty to list the volume root.",
            "parameters": {
                "type": "object",
                "properties": {
                    "directory": {
                        "type": "string",
                        "description": "Relative directory path within the volume. Leave empty for root.",
                    }
                },
                "required": [],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "read_volume_file",
            "description": "Reads a text file from the Unity Catalog volume. Provide the path relative to the volume root, for example 'reports/q4_summary.txt'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the file relative to the volume root.",
                    }
                },
                "required": ["file_path"],
            },
        },
    },
]


def execute_tool(name: str, args: dict) -> str:
    base = f"/Volumes/{VOLUME.replace('.', '/')}"
    if name == "list_volume_files":
        directory = args.get("directory", "")
        path = f"{base}/{directory.lstrip('/')}" if directory else base
        entries = []
        for f in w.files.list_directory_contents(path):
            kind = "dir" if f.is_directory else "file"
            size = f" ({f.file_size} bytes)" if not f.is_directory else ""
            entries.append(f"[{kind}] {f.name}{size}")
        return "\n".join(entries) if entries else "No files found."
    elif name == "read_volume_file":
        full_path = f"{base}/{args['file_path'].lstrip('/')}"
        resp = w.files.download(full_path)
        return resp.contents.read().decode("utf-8")
    return f"Unknown tool: {name}"


# Call the model with tools
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "List the files in the volume, then read about_databricks.txt and summarize it."},
]

response = client.chat.completions.create(
    model="databricks-claude-sonnet-4-5", messages=messages, tools=tools
)

# Execute tool calls and send results back
while response.choices[0].finish_reason == "tool_calls":
    messages.append(response.choices[0].message)
    for tool_call in response.choices[0].message.tool_calls:
        args = json.loads(tool_call.function.arguments)
        result = execute_tool(tool_call.function.name, args)
        messages.append(
            {"role": "tool", "tool_call_id": tool_call.id, "content": result}
        )
    response = client.chat.completions.create(
        model="databricks-claude-sonnet-4-5", messages=messages, tools=tools
    )

print(response.choices[0].message.content)