Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
This notebook shows how to use the Vector Search Python SDK, which provides a VectorSearchClient as a primary API for working with Vector Search.
Alternatively, you can call the REST API directly.
Requirements
This notebook assumes that a Model Serving endpoint named databricks-gte-large-en exists. To create that endpoint, see the notebook Call a GTE embeddings model using Mosaic AI Model Serving.
%pip install --upgrade --force-reinstall databricks-vectorsearch langchain
dbutils.library.restartPython()
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()
help(VectorSearchClient)
Load toy dataset into source Delta table
The following creates the source Delta table.
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA and CREATE_TABLE privileges on the schema.
# Change the catalog and schema here if necessary.
catalog_name = "main"
schema_name = "default"
source_table_name = "en_wiki"
source_table_fullname = f"{catalog_name}.{schema_name}.{source_table_name}"
# Uncomment if you want to start from scratch.
# spark.sql(f"DROP TABLE {source_table_fullname}")
source_df = spark.read.parquet("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet").limit(10)
display(source_df)
source_df.write.format("delta").option("delta.enableChangeDataFeed", "true").saveAsTable(source_table_fullname)
display(spark.sql(f"SELECT * FROM {source_table_fullname}"))
Create vector search endpoint
vector_search_endpoint_name = "vector-search-demo-endpoint"
vsc.create_endpoint(
name=vector_search_endpoint_name,
endpoint_type="STANDARD" # or "STORAGE_OPTIMIZED"
)
endpoint = vsc.get_endpoint(
name=vector_search_endpoint_name)
endpoint
Create vector index
# Vector index
vs_index = "en_wiki_index"
vs_index_fullname = f"{catalog_name}.{schema_name}.{vs_index}"
embedding_model_endpoint = "databricks-gte-large-en"
index = vsc.create_delta_sync_index(
endpoint_name=vector_search_endpoint_name,
source_table_name=source_table_fullname,
index_name=vs_index_fullname,
pipeline_type='TRIGGERED',
primary_key="id",
embedding_source_column="text",
embedding_model_endpoint_name=embedding_model_endpoint
)
index.describe()
Get a vector index
Use get_index() to retrieve the vector index object using the vector index name. You can also use describe() on the index object to see a summary of the index's configuration information.
index = vsc.get_index(endpoint_name=vector_search_endpoint_name, index_name=vs_index_fullname)
index.describe()
# Wait for index to come online. Expect this command to take several minutes.
import time
while not index.describe().get('status').get('detailed_state').startswith('ONLINE'):
print("Waiting for index to be ONLINE...")
time.sleep(5)
print("Index is ONLINE")
index.describe()
Similarity search
Query the Vector Index to find similar documents.
# Returns [col1, col2, ...]
# You can set this to any subset of the columns.
all_columns = spark.table(source_table_fullname).columns
results = index.similarity_search(
query_text="Greek myths",
columns=all_columns,
num_results=2)
results
# Search with a filter. Note that the syntax depends on the endpoint type.
# Standard endpoint syntax
results = index.similarity_search(
query_text="Greek myths",
columns=all_columns,
filters={"id NOT": ("13770", "88231")},
num_results=2)
# Storage-optimized endpoint syntax
# results = index.similarity_search(
# query_text="Greek myths",
# columns=all_columns,
# filters='id NOT IN ("13770", "88231")',
# num_results=2)
results
Convert results to LangChain documents
The first column retrieved is loaded into page_content, and the rest into metadata.
from langchain_core.documents import Document
from typing import List
def convert_vector_search_to_documents(results) -> List[Document]:
column_names = []
for column in results["manifest"]["columns"]:
column_names.append(column)
langchain_docs = []
for item in results["result"]["data_array"]:
metadata = {}
score = item[-1]
# print(score)
i = 1
for field in item[1:-1]:
# print(field + "--")
metadata[column_names[i]["name"]] = field
i = i + 1
doc = Document(page_content=item[0], metadata=metadata) # , 9)
langchain_docs.append(doc)
return langchain_docs
langchain_docs = convert_vector_search_to_documents(results)
langchain_docs
Delete vector index
vsc.delete_index(index_name=vs_index_fullname)