Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este cuaderno se muestra cómo usar el SDK de búsqueda de vectores Python, que proporciona una VectorSearchClient como API principal para trabajar con la búsqueda de vectores.
En este cuaderno se usan las API de modelo de Databricks Foundation para acceder al modelo de incrustaciones GTE para generar inserciones.
%pip install --upgrade --force-reinstall databricks-vectorsearch
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient(disable_notice=True)
help(VectorSearchClient)
Carga el conjunto de datos de prueba en la tabla Delta de fuente
A continuación se crea la tabla Delta de origen.
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.
catalog_name = "main"
schema_name = "default"
source_table_name = "wiki_articles_demo"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment if you want to start from scratch.
# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)
Conjunto de datos de muestra de bloque
La división en partes del conjunto de datos de ejemplo le ayuda a evitar superar el límite de contexto del modelo de incrustación. El modelo GTE admite hasta 8192 tokens. Sin embargo, Databricks recomienda dividir los datos en fragmentos de contexto más pequeños para que pueda alimentar una variedad más amplia de ejemplos en el modelo de razonamiento de la aplicación RAG.
import tiktoken
import pandas as pd
# The GTE model has been trained on a max context lenth of 8192 tokens.
max_chunk_tokens = 8192
encoding = tiktoken.get_encoding("cl100k_base")
def chunk_text(text):
# Encode and then decode within the UDF
tokens = encoding.encode(text)
chunks = []
while tokens:
chunk_tokens = tokens[:max_chunk_tokens]
chunk_text = encoding.decode(chunk_tokens)
chunks.append(chunk_text)
tokens = tokens[max_chunk_tokens:]
return chunks
# Process the data and store in a new list
pandas_df = source_df.toPandas()
processed_data = []
for index, row in pandas_df.iterrows():
text_chunks = chunk_text(row['text'])
chunk_no = 0
for chunk in text_chunks:
row_data = row.to_dict()
# replace the id column with a new unique chunk id
# and the text column with the text chunk
row_data['id'] = f"{row['id']}_{chunk_no}"
row_data['text'] = chunk
processed_data.append(row_data)
chunk_no += 1
chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)
# Write the chunked DataFrame to a Delta table
spark.sql(f"DROP TABLE IF EXISTS {source_table_fullname}")
chunked_spark_df.write.format("delta") \
.option("delta.enableChangeDataFeed", "true") \
.saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
Creación de un punto de conexión de búsqueda vectorial
vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
name=vector_search_endpoint_name,
endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
vsc.get_endpoint(
name=vector_search_endpoint_name
)
Creación de un índice vectorial
# Vector index
vs_index = f"{source_table_name}_gte_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
embedding_model_endpoint = "databricks-gte-large-en"
index = vsc.create_delta_sync_index(
endpoint_name=vector_search_endpoint_name,
source_table_name=source_table_fullname,
index_name=vs_index_fullname,
pipeline_type='TRIGGERED',
primary_key="id",
embedding_source_column="text",
embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()['status']['message']
# Wait for index to come online. Expect this command to take several minutes.
# You can also track the status of the index build in Catalog Explorer in the
# Overview tab for the vector index.
import time
index = vsc.get_index(endpoint_name=vector_search_endpoint_name,index_name=vs_index_fullname)
while not index.describe().get('status')['ready']:
print("Waiting for index to be ready...")
time.sleep(30)
print("Index is ready!")
index.describe()
Búsqueda de similitud
Las celdas siguientes muestran cómo consultar el índice de vectores para buscar documentos similares.
results = index.similarity_search(
query_text="Greek myths",
columns=["id", "text", "title"],
num_results=5
)
rows = results['result']['data_array']
for (id, text, title, score) in rows:
if len(text) > 32:
# trim text output for readability
text = text[0:32] + "..."
print(f"id: {id} title: {title} text: '{text}' score: {score}")
# Search with a filter. Note that the syntax depends on the endpoint type.
# Standard endpoint syntax
results = index.similarity_search(
query_text="Greek myths",
columns=["id", "text", "title"],
num_results=5,
filters={"title NOT": "Hercules"}
)
# Storage-optimized endpoint syntax
# results = index.similarity_search(
# query_text="Greek myths",
# columns=["id", "text", "title"],
# num_results=5,
# filters='title != "Hercules"'
# )
rows = results['result']['data_array']
for (id, text, title, score) in rows:
if len(text) > 32:
# trim text output for readability
text = text[0:32] + "..."
print(f"id: {id} title: {title} text: '{text}' score: {score}")
Eliminar índice de vectores
vsc.delete_index(
endpoint_name=vector_search_endpoint_name,
index_name=vs_index_fullname
)