Ejemplo de modelo de inserción externa de búsqueda de vectores (OpenAI)

En este cuaderno se muestra cómo usar el SDK de búsqueda de vectores Python, que proporciona una VectorSearchClient como API principal para trabajar con la búsqueda de vectores.

En este cuaderno se usa la compatibilidad de Databricks con modelos externos para acceder a un modelo de incrustaciones de OpenAI para generar inserciones.

%pip install --upgrade --force-reinstall databricks-vectorsearch tiktoken
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient(disable_notice=True)
# Display help for the Vector Search Client
help(VectorSearchClient)

Carga el conjunto de datos de prueba en la tabla Delta de fuente

A continuación se crea la tabla Delta de origen.

# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.

catalog_name = "main"
schema_name = "default"

source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment the following line if you want to start from scratch.

# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)

Conjunto de datos de muestra de bloque

La división en partes del conjunto de datos de ejemplo le ayuda a evitar superar el límite de contexto del modelo de incrustación. El modelo openAI admite hasta 8192 tokens. Sin embargo, Databricks recomienda dividir los datos en fragmentos de contexto más pequeños para que pueda alimentar una variedad más amplia de ejemplos en el modelo de razonamiento de la aplicación RAG.

import tiktoken
import pandas as pd


max_chunk_tokens = 1024
encoding = tiktoken.get_encoding("cl100k_base")


def chunk_text(text):
    # Encode and then decode within the UDF
    tokens = encoding.encode(text)
    chunks = []
    while tokens:
        chunk_tokens = tokens[:max_chunk_tokens]
        chunk_text = encoding.decode(chunk_tokens)
        chunks.append(chunk_text)
        tokens = tokens[max_chunk_tokens:]
    return chunks

# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
    text_chunks = chunk_text(row['text'])
    chunk_no = 0
    for chunk in text_chunks:
        row_data = row.to_dict()

        # Replace the id column with a new unique chunk id
        # and the text column with the text chunk
        row_data['id'] = f"{row['id']}_{chunk_no}"
        row_data['text'] = chunk

        processed_data.append(row_data)
        chunk_no += 1

chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)

# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
    .option("delta.enableChangeDataFeed", "true") \
    .saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))

Creación de un punto de conexión de búsqueda vectorial

vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
    name=vector_search_endpoint_name,
    endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
vsc.get_endpoint(
  name=vector_search_endpoint_name
)

Registro del punto de conexión del modelo de inserción de OpenAI

Para obtener información detallada sobre el uso, consulte la documentación del modelo externo para configurar un punto de conexión de OpenAI.

Para proporcionar credenciales, use el administrador de secretos de Databricks.

embedding_model_endpoint_name = "openai-embedding-endpoint"
import mlflow.deployments

mlflow_deploy_client = mlflow.deployments.get_deploy_client("databricks")

# Configure the secret manager with the OpenAPI key and provide the
# correct scope and key name below.

mlflow_deploy_client.create_endpoint(
    name=embedding_model_endpoint_name,
    config={
        "served_entities": [{
            "external_model": {
                "name": "text-embedding-ada-002",
                "provider": "openai",
                "task": "llm/v1/embeddings",
                "openai_config": {
                    "openai_api_key": "{{secrets/demo/openai-api-key}}" # CHANGE ME
                }
            }
    }]
    }
)

Creación de un índice vectorial

# Vector index
vs_index = f"{source_table_name}_openai_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
index = vsc.create_delta_sync_index(
  endpoint_name=vector_search_endpoint_name,
  source_table_name=source_table_fullname,
  index_name=vs_index_fullname,
  pipeline_type='TRIGGERED',
  primary_key="id",
  embedding_source_column="text",
  embedding_model_endpoint_name=embedding_model_endpoint_name
)
index.describe()['status']['message']
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the vector index.

import time
index = vsc.get_index(endpoint_name=vector_search_endpoint_name,index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
  print("Waiting for index to be ready...")
  time.sleep(30)
print("Index is ready!")
index.describe()

Las celdas siguientes muestran cómo consultar el índice de vectores para buscar documentos similares.

results = index.similarity_search(
  query_text="Greek myths",
  columns=["id", "text", "title"],
  num_results=5
  )
rows = results['result']['data_array']
for (id, text, title, score) in rows:
  if len(text) > 32:
    # trim text output for readability
    text = text[0:32] + "..."
  print(f"id: {id}  title: {title} text: '{text}' score: {score}")
# Search with a filter. Note that the syntax depends on the endpoint type.

# Standard endpoint syntax
results = index.similarity_search(
  query_text="Greek myths",
  columns=["id", "text", "title"],
  num_results=5,
  filters={"title NOT": "Hercules"}
)

# Storage-optimized endpoint syntax
# results = index.similarity_search(
#   query_text="Greek myths",
#   columns=["id", "text", "title"],
#   num_results=5,
#   filters='title != "Hercules"'
#   )

rows = results['result']['data_array']
for (id, text, title, score) in rows:
  if len(text) > 32:
    # trim text output for readability
    text = text[0:32] + "..."
  print(f"id: {id}  title: {title} text: '{text}' score: {score}")

Eliminar índice de vectores

vsc.delete_index(
  endpoint_name=vector_search_endpoint_name,
  index_name=vs_index_fullname
)

Cuaderno de ejemplo

Ejemplo de modelo de inserción externa de búsqueda de vectores (OpenAI)

Obtener el portátil